这篇文章主要介绍“ReceiverSupervisorImpl实例化怎么实现”。在日常操作中,相信很多人在“ReceiverSupervisorImpl实例化怎么实现”这个问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答“ReceiverSupervisorImpl实例化怎么实现”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
先回顾一下在 Executor 上执行的具体方法:
实例化ReceiverSupervisorImpl
调用 start() 之后,通过 awaitTermination() 等待其结束
// ReceiverTracker.scala line 564
// Function executed for a receiver task: it takes the iterator carrying the Receiver,
// wraps the receiver in a ReceiverSupervisorImpl, starts the supervisor and then
// waits on awaitTermination().
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
  (iterator: Iterator[Receiver[_]]) => {
    // An empty iterator means there is no receiver object to start.
    if (!iterator.hasNext) {
      throw new SparkException(
        "Could not start receiver as object not found.")
    }
    // Only attempt number 0 of the task actually starts the receiver.
    if (TaskContext.get().attemptNumber() == 0) {
      val receiver = iterator.next()
      // Exactly one receiver is expected in the iterator.
      assert(iterator.hasNext == false)
      val supervisor = new ReceiverSupervisorImpl(
        receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
      supervisor.start()
      // Wait here until the supervisor terminates (per the method name).
      supervisor.awaitTermination()
    } else {
      // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
    }
  }
看下ReceiverSupervisorImpl的父类 ReceiverSupervisor的构造。
成员变量赋值、将当前supervisor与receiver关联( receiver.attachSupervisor(this) )
注释也很清晰:它在 Worker 上负责监督 Receiver,并提供处理 receiver 所接收数据所需的全部接口。
// ReceiverSupervisor.scala line 31
/**
 * Abstract class that is responsible for supervising a Receiver in the worker.
 * It provides all the necessary interfaces for handling the data received by the receiver.
 *
 * @param receiver the receiver being supervised; this supervisor attaches itself to it
 *                 during construction
 * @param conf     configuration from which the restart delay is read
 */
private[streaming] abstract class ReceiverSupervisor(
    receiver: Receiver[_],
    conf: SparkConf
  ) extends Logging {

  /** Enumeration to identify current state of the Receiver */
  object ReceiverState extends Enumeration {
    type CheckpointState = Value
    val Initialized, Started, Stopped = Value
  }
  import ReceiverState._

  // Attach the supervisor to the receiver
  receiver.attachSupervisor(this) // associates the receiver with this supervisor

  // Execution context backed by a daemon cached thread pool named
  // "receiver-supervisor-future"; 128 is presumably the maximum thread count -- TODO confirm.
  private val futureExecutionContext = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("receiver-supervisor-future", 128))

  /** Receiver id */
  protected val streamId = receiver.streamId

  /** Has the receiver been marked for stop. */
  private val stopLatch = new CountDownLatch(1)

  /**
   * Time between a receiver is stopped and started again.
   * Read from "spark.streaming.receiverRestartDelay", falling back to 2000
   * (presumably milliseconds -- TODO confirm).
   */
  private val defaultRestartDelay = conf.getInt("spark.streaming.receiverRestartDelay", 2000)

  /** The current maximum rate limit for this receiver. Defaults to Long.MaxValue here. */
  private[streaming] def getCurrentRateLimit: Long = Long.MaxValue

  /** Exception associated with the stopping of the receiver */
  @volatile protected var stoppingError: Throwable = null

  /** State of the receiver; starts as Initialized. */
  @volatile private[streaming] var receiverState = Initialized

  // Some methods are omitted here; they are essentially the data-handling interfaces.
}
ReceiverSupervisorImpl的实例化
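实例化 ReceivedBlockHandler:默认(未开启 receiver WAL 时)为 BlockManagerBasedBlockHandler,用于将数据发送到 BlockManager;若开启了 receiver WAL,则为 WriteAheadLogBasedBlockHandler(此时必须已设置 checkpoint 目录)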
实例化RpcEndpoint
实例化 BlockGenerator
实例化 BlockGeneratorListener 监听器
// ReceiverSupervisorImpl.scala line 43
/**
 * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
 * which provides all the necessary functionality for handling the data received by
 * the receiver. Specifically, it creates a [[org.apache.spark.streaming.receiver.BlockGenerator]]
 * object that is used to divide the received data stream into blocks of data.
 *
 * @param receiver            the receiver to supervise
 * @param env                 supplies the conf, block manager and RPC environment used below
 * @param hadoopConf          passed to the write-ahead-log based block handler
 * @param checkpointDirOption checkpoint directory; must be defined when the receiver
 *                            write-ahead log is enabled
 */
private[streaming] class ReceiverSupervisorImpl(
    receiver: Receiver[_],
    env: SparkEnv,
    hadoopConf: Configuration,
    checkpointDirOption: Option[String]
  ) extends ReceiverSupervisor(receiver, env.conf) with Logging {

  // Host and executor id, both taken from the BlockManagerId of the current SparkEnv.
  private val host = SparkEnv.get.blockManager.blockManagerId.host

  private val executorId = SparkEnv.get.blockManager.blockManagerId.executorId

  // Handler used to store received blocks: the write-ahead-log based handler when the
  // receiver log is enabled in the conf, otherwise the BlockManager based handler.
  private val receivedBlockHandler: ReceivedBlockHandler = {
    if (WriteAheadLogUtils.enableReceiverLog(env.conf)) { // not enabled by default
      // The write-ahead log cannot be used without a checkpoint directory.
      if (checkpointDirOption.isEmpty) {
        throw new SparkException(
          "Cannot enable receiver write-ahead log without checkpoint directory set. " +
          "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
          "See documentation for more details.")
      }
      // checkpointDirOption.get cannot fail here because emptiness was rejected above.
      new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
        receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
    } else {
      new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
    }
  }

  /** Remote RpcEndpointRef for the ReceiverTracker */
  private val trackerEndpoint = RpcUtils.makeDriverRef("ReceiverTracker", env.conf, env.rpcEnv)

  /** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
  private val endpoint = env.rpcEnv.setupEndpoint(
    // Endpoint name is built from the stream id and the current time.
    "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
      override val rpcEnv: RpcEnv = env.rpcEnv

      override def receive: PartialFunction[Any, Unit] = {
        // Stop this supervisor.
        case StopReceiver =>
          logInfo("Received stop signal")
          ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
        // Delegate to cleanupOldBlocks with the received threshold time.
        case CleanupOldBlocks(threshTime) =>
          logDebug("Received delete old batch signal")
          cleanupOldBlocks(threshTime)
        // Apply the new rate to every registered block generator.
        case UpdateRateLimit(eps) =>
          logInfo(s"Received a new rate limit: $eps.")
          registeredBlockGenerators.foreach { bg =>
            bg.updateRate(eps)
          }
      }
    })

  /** Unique block ids if one wants to add blocks directly */
  private val newBlockId = new AtomicLong(System.currentTimeMillis())

  private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator] // trait mixin: SynchronizedBuffer is mixed into the ArrayBuffer
    with mutable.SynchronizedBuffer[BlockGenerator]

  /** Divides received data records into data blocks for pushing in BlockManager. */
  private val defaultBlockGeneratorListener = new BlockGeneratorListener {
    // No action on added data.
    def onAddData(data: Any, metadata: Any): Unit = { }

    // No action on block generation.
    def onGenerateBlock(blockId: StreamBlockId): Unit = { }

    // Errors are forwarded to reportError.
    def onError(message: String, throwable: Throwable) {
      reportError(message, throwable)
    }

    // A block ready for pushing is stored via pushArrayBuffer, without metadata.
    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
      pushArrayBuffer(arrayBuffer, None, Some(blockId))
    }
  }
  private val defaultBlockGenerator = createBlockGenerator(defaultBlockGeneratorListener)

  // ... some methods omitted

  /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
  def pushArrayBuffer(
      arrayBuffer: ArrayBuffer[_],
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
  }

  /** Store block and report it to driver */
  def pushAndReportBlock(
      receivedBlock: ReceivedBlock,
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    // Use the supplied block id, or fall back to nextBlockId (defined in the omitted part).
    val blockId = blockIdOption.getOrElse(nextBlockId)
    val time = System.currentTimeMillis
    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
    logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
    val numRecords = blockStoreResult.numRecords
    val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
    // Report the stored block to the tracker endpoint; the Boolean reply is not used here.
    trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
    logDebug(s"Reported block $blockId")
  }
}
看看BlockGenerator
注释很清晰,BlockGenerator 会启动两个线程:
一个线程周期性地将上一批数据封装为一个 block,并开始接收下一个批次的数据;它由 RecurringTimer 类实现,其内部持有一个 Thread
另一个线程负责将 block push 到 BlockManager
// (file citation left blank in the article; presumably BlockGenerator.scala -- TODO confirm)
/**
 * Generates batches of objects received by a
 * [[org.apache.spark.streaming.receiver.Receiver]] and puts them into appropriately
 * named blocks at regular intervals. This class starts two threads,
 * one to periodically start a new batch and prepare the previous batch as a block,
 * the other to push the blocks into the block manager.
 *
 * Note: Do not create BlockGenerator instances directly inside receivers. Use
 * `ReceiverSupervisor.createBlockGenerator` to create a BlockGenerator and use it.
 */
private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf,
    clock: Clock = new SystemClock()
  ) extends RateLimiter(conf) with Logging{

  // A generated block: its id together with the buffered records.
  private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])

  /**
   * The BlockGenerator can be in 5 possible states, in the order as follows.
   *
   *  - Initialized: Nothing has been started
   *  - Active: start() has been called, and it is generating blocks on added data.
   *  - StoppedAddingData: stop() has been called, the adding of data has been stopped,
   *                       but blocks are still being generated and pushed.
   *  - StoppedGeneratingBlocks: Generating of blocks has been stopped, but
   *                             they are still being pushed.
   *  - StoppedAll: Everything has stopped, and the BlockGenerator object can be GCed.
   */
  private object GeneratorState extends Enumeration {
    type GeneratorState = Value
    val Initialized, Active, StoppedAddingData, StoppedGeneratingBlocks, StoppedAll = Value
  }
  import GeneratorState._

  // Block interval read from "spark.streaming.blockInterval" (default "200ms"); must be positive.
  private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
  require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")

  // Timer that invokes updateCurrentBuffer every blockIntervalMs.
  private val blockIntervalTimer =
    new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator") // the periodic thread
  // Capacity of the queue of blocks waiting to be pushed (default 10).
  private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
  private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
  private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } } // responsible for pushing the data

  // Buffer collecting records for the block currently being built.
  @volatile private var currentBuffer = new ArrayBuffer[Any]
  // Current generator state; starts as Initialized.
  @volatile private var state = Initialized

  //...
}
至此,ReceiverSupervisorImpl 实例化完成。不过,到目前为止 Receiver 还未启动。
到此,关于“ReceiverSupervisorImpl实例化怎么实现”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注亿速云网站,小编会继续努力为大家带来更多实用的文章!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。