zoukankan      html  css  js  c++  java
  • Spark Streaming ReceiverTracker架构设计

    本节的主要内容:

    一、ReceiverTracker的架构设计

    二、消息循环系统

    三、ReceiverTracker具体实现

    Spark Streaming作为Spark Core基础 架构之上的一个应用程序,其中的ReceiverTracker接收到数据之后,具体该怎么进行数据处理呢?看源码ReceiverSupervisorImpl这个类:

     /**

     * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]]
    * which provides all the necessary functionality for handling the data received by
    * the receiver. Specifically, it creates a [[org.apache.spark.streaming.receiver.BlockGenerator]]
    * object that is used to divide the received data stream into blocks of data.
    */
    private[streaming] class ReceiverSupervisorImpl(
    receiver: Receiver[_],
    env: SparkEnv,
    hadoopConf: Configuration,
    checkpointDirOption: Option[String]
    ) extends ReceiverSupervisor(receiver, env.conf) with Logging {

    private val host = SparkEnv.get.blockManager.blockManagerId.host
    private val executorId = SparkEnv.get.blockManager.blockManagerId.executorId

    private val receivedBlockHandler: ReceivedBlockHandler = {
    if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    if (checkpointDirOption.isEmpty) {
    throw new SparkException(
    "Cannot enable receiver write-ahead log without checkpoint directory set. " +
    "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
    "See documentation for more details.")
    }
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
    receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
    } else {
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
    }
    }

    从源码中可以看出,写接收后的数据是通过ReceivedBlockHandler的对象receivedBlockHandler写的,写的数据的过程有二种方式:

    1、基于WAL方式进行容错写

    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
    receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)

    2、直接写(相对不安全)

      new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)

    然后存储数据完成后并报告给Driver进行存储,如下所示:

    def pushAndReportBlock(
    receivedBlock: ReceivedBlock,
    metadataOption: Option[Any],
    blockIdOption: Option[StreamBlockId]
    ) {
    val blockId = blockIdOption.getOrElse(nextBlockId)
    val time = System.currentTimeMillis
    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
    logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
    val numRecords = blockStoreResult.numRecords
    val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
    trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
    logDebug(s"Reported block $blockId")
    }

     用于汇报给Driver的消息类、如下所示:

     /** Remote RpcEndpointRef for the ReceiverTracker */

    private val trackerEndpoint = RpcUtils.makeDriverRef("ReceiverTracker", env.conf, env.rpcEnv)

    Driver端接收消息处理类为
    ReceiverTracker,通过如下方法接收元数据信息并存储的代码如下:
    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    // Remote messages
    case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
    val successful =
    registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
    context.reply(successful)
    case AddBlock(receivedBlockInfo) =>
    if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
    walBatchingThreadPool.execute(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
    if (active) {
    context.reply(addBlock(receivedBlockInfo))
    } else {
    throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
    }
    }
    })
    } else {
    context.reply(addBlock(receivedBlockInfo))
    }
    case DeregisterReceiver(streamId, message, error) =>
    deregisterReceiver(streamId, message, error)
    context.reply(true)
    // Local messages
    case AllReceiverIds =>
    context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
    case StopAllReceivers =>
    assert(isTrackerStopping || isTrackerStopped)
    stopReceivers()
    context.reply(true)
    }

    根据上述代码总结如下:

    1、ReceiverSupervisorImpl中有ReceiverTracker的消息通信体,能进行与ReceiverTracker的通信。

    2、ReceiverSupervisorImpl将数据的元数据信息汇报给ReceiverTracker。 

    接下来,我们再进入ReceiverTracker:

    Receiver的三种状态:非活跃状态,正在执行调度任务状态,活跃状态。

    /** Enumeration to identify current state of a Receiver 几种不同的状态 */
    private[streaming] object ReceiverState extends Enumeration {
    type ReceiverState = Value
    val INACTIVE, SCHEDULED, ACTIVE = Value
    }

    调用ReceiverBlockTracker的getReceivedBlockQueue方法,其中streamIdToUnallocatedBlockQueues为HashMap,Key为StreamID,Value为ReceivedBlockQueue。而ReceivedBlockQueue 的定义为private type ReceivedBlockQueue = mutable.Queue[ReceivedBlockInfo]

    ReceiverBlockTracker类,可以从源码中看出,他会记录所有接收到的Block信息,根据需要把Block分配给Batch。如果设置了checkpoint,开启WAL,则会把所有的操作保存到预写日志中,因此当Driver失败后就可以从checkpoint和WAL中恢复ReceiverTracker的状态。

    ReceiverBlockTracker类中重要的方法,allocateBlocksToBatch。private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]存储批处理时刻,分配到的Blocks数据。

    eceiver接收到数据,然后合并并存储数据之后,ReceiverSupervisorImpl会把Block的元数据汇报给ReceiverTracker内部的消息通信体ReceiverTrackerEndpoint。

    ReceiverTracker接收到Block的元数据信息之后,由ReceivedBlockTracker管理Block的元数据的分配,JobGenerator会将每个Batch,从ReceivedBlockTracker中获取属于该Batch的Block元数据信息来生成RDD。

    从设计模式来讲:ReceiverTrackerEndpoint和ReceivedBlockTracker是门面设计模式,内部实际干事情的是ReceivedBlockTracker,外部通信体或者代表者就是ReceiverTrackerEndpoint。

    /**
    * 管理receiver的:启动、执行、重新启动
    * 确定所有的输入流记录,有成员记录所有输入来源
    * 需要输入流,为每个输入流启动一个receiver
    * This class manages the execution of the receivers of ReceiverInputDStreams. Instance of
    * this class must be created after all input streams have been added and StreamingContext.start()
    * has been called because it needs the final set of input streams at the time of instantiation.
    *dirver端
    * @param skipReceiverLaunch Do not launch the receiver. This is useful for testing.
    */
    private[streaming]
    class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {

    private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
    private val receiverInputStreamIds = receiverInputStreams.map { _.id }
    private val receivedBlockTracker = new ReceivedBlockTracker(
    ssc.sparkContext.conf,
    ssc.sparkContext.hadoopConfiguration,
    receiverInputStreamIds,
    ssc.scheduler.clock,
    ssc.isCheckpointPresent,
    Option(ssc.checkpointDir)
    )
    private val listenerBus = ssc.scheduler.listenerBus

    /** Enumeration to identify current state of the ReceiverTracker */
    object TrackerState extends Enumeration {
    type TrackerState = Value
    val Initialized, Started, Stopping, Stopped = Value
    }
    import TrackerState._

    /** State of the tracker. Protected by "trackerStateLock" */
    @volatile private var trackerState = Initialized

    // endpoint is created when generator starts.
    // This not being null means the tracker has been started and not stopped
    private var endpoint: RpcEndpointRef = null

    private val schedulingPolicy = new ReceiverSchedulingPolicy()

    // Track the active receiver job number. When a receiver job exits ultimately, countDown will
    // be called.
    private val receiverJobExitLatch = new CountDownLatch(receiverInputStreams.size)

    /**
    * Track all receivers' information. The key is the receiver id, the value is the receiver info.
    * It's only accessed in ReceiverTrackerEndpoint.
    */
    private val receiverTrackingInfos = new HashMap[Int, ReceiverTrackingInfo]

    /**
    * Store all preferred locations for all receivers. We need this information to schedule
    * receivers. It's only accessed in ReceiverTrackerEndpoint.
    */
    private val receiverPreferredLocations = new HashMap[Int, Option[String]]

    /** Start the endpoint and receiver execution thread. */
    def start(): Unit = synchronized {
    if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
    }

    if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
    "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
    }
    }

    /** Stop the receiver execution thread. */
    def stop(graceful: Boolean): Unit = synchronized {
    if (isTrackerStarted) {
    // First, stop the receivers
    trackerState = Stopping
    if (!skipReceiverLaunch) {
    // Send the stop signal to all the receivers
    endpoint.askWithRetry[Boolean](StopAllReceivers)

    // Wait for the Spark job that runs the receivers to be over
    // That is, for the receivers to quit gracefully.
    receiverJobExitLatch.await(10, TimeUnit.SECONDS)

    if (graceful) {
    logInfo("Waiting for receiver job to terminate gracefully")
    receiverJobExitLatch.await()
    logInfo("Waited for receiver job to terminate gracefully")
    }

    // Check if all the receivers have been deregistered or not
    val receivers = endpoint.askWithRetry[Seq[Int]](AllReceiverIds)
    if (receivers.nonEmpty) {
    logWarning("Not all of the receivers have deregistered, " + receivers)
    } else {
    logInfo("All of the receivers have deregistered successfully")
    }
    }

    // Finally, stop the endpoint
    ssc.env.rpcEnv.stop(endpoint)
    endpoint = null
    receivedBlockTracker.stop()
    logInfo("ReceiverTracker stopped")
    trackerState = Stopped
    }
    }

    /** Allocate all unallocated blocks to the given batch. */
    def allocateBlocksToBatch(batchTime: Time): Unit = {
    if (receiverInputStreams.nonEmpty) {
    receivedBlockTracker.allocateBlocksToBatch(batchTime)
    }
    }

    /** Get the blocks for the given batch and all input streams. */
    def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = {
    receivedBlockTracker.getBlocksOfBatch(batchTime)
    }

    /** Get the blocks allocated to the given batch and stream. */
    def getBlocksOfBatchAndStream(batchTime: Time, streamId: Int): Seq[ReceivedBlockInfo] = {
    receivedBlockTracker.getBlocksOfBatchAndStream(batchTime, streamId)
    }

    /**
    * Clean up the data and metadata of blocks and batches that are strictly
    * older than the threshold time. Note that this does not
    */
    def cleanupOldBlocksAndBatches(cleanupThreshTime: Time) {
    // Clean up old block and batch metadata
    receivedBlockTracker.cleanupOldBatches(cleanupThreshTime, waitForCompletion = false)

    // Signal the receivers to delete old block data
    if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
    logInfo(s"Cleanup old received batch data: $cleanupThreshTime")
    synchronized {
    if (isTrackerStarted) {
    endpoint.send(CleanupOldBlocks(cleanupThreshTime))
    }
    }
    }
    }

    /** Register a receiver */
    private def registerReceiver(
    streamId: Int,
    typ: String,
    host: String,
    executorId: String,
    receiverEndpoint: RpcEndpointRef,
    senderAddress: RpcAddress
    ): Boolean = {
    if (!receiverInputStreamIds.contains(streamId)) {
    throw new SparkException("Register received for unexpected id " + streamId)
    }

    if (isTrackerStopping || isTrackerStopped) {
    return false
    }

    val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations
    val acceptableExecutors = if (scheduledLocations.nonEmpty) {
    // This receiver is registering and it's scheduled by
    // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.
    scheduledLocations.get
    } else {
    // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
    // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
    scheduleReceiver(streamId)
    }

    def isAcceptable: Boolean = acceptableExecutors.exists {
    case loc: ExecutorCacheTaskLocation => loc.executorId == executorId
    case loc: TaskLocation => loc.host == host
    }

    if (!isAcceptable) {
    // Refuse it since it's scheduled to a wrong executor
    false
    } else {
    val name = s"${typ}-${streamId}"
    val receiverTrackingInfo = ReceiverTrackingInfo(
    streamId,
    ReceiverState.ACTIVE,
    scheduledLocations = None,
    runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
    name = Some(name),
    endpoint = Some(receiverEndpoint))
    receiverTrackingInfos.put(streamId, receiverTrackingInfo)
    listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
    logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
    true
    }
    }

    /** Deregister a receiver */
    private def deregisterReceiver(streamId: Int, message: String, error: String) {
    val lastErrorTime =
    if (error == null || error == "") -1 else ssc.scheduler.clock.getTimeMillis()
    val errorInfo = ReceiverErrorInfo(
    lastErrorMessage = message, lastError = error, lastErrorTime = lastErrorTime)
    val newReceiverTrackingInfo = receiverTrackingInfos.get(streamId) match {
    case Some(oldInfo) =>
    oldInfo.copy(state = ReceiverState.INACTIVE, errorInfo = Some(errorInfo))
    case None =>
    logWarning("No prior receiver info")
    ReceiverTrackingInfo(
    streamId, ReceiverState.INACTIVE, None, None, None, None, Some(errorInfo))
    }
    receiverTrackingInfos(streamId) = newReceiverTrackingInfo
    listenerBus.post(StreamingListenerReceiverStopped(newReceiverTrackingInfo.toReceiverInfo))
    val messageWithError = if (error != null && !error.isEmpty) {
    s"$message - $error"
    } else {
    s"$message"
    }
    logError(s"Deregistered receiver for stream $streamId: $messageWithError")
    }

    /** Update a receiver's maximum ingestion rate */
    def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
    if (isTrackerStarted) {
    endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
    }
    }

    /** Add new blocks for the given stream */
    private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    receivedBlockTracker.addBlock(receivedBlockInfo)
    }

    /** Report error sent by a receiver */
    private def reportError(streamId: Int, message: String, error: String) {
    val newReceiverTrackingInfo = receiverTrackingInfos.get(streamId) match {
    case Some(oldInfo) =>
    val errorInfo = ReceiverErrorInfo(lastErrorMessage = message, lastError = error,
    lastErrorTime = oldInfo.errorInfo.map(_.lastErrorTime).getOrElse(-1L))
    oldInfo.copy(errorInfo = Some(errorInfo))
    case None =>
    logWarning("No prior receiver info")
    val errorInfo = ReceiverErrorInfo(lastErrorMessage = message, lastError = error,
    lastErrorTime = ssc.scheduler.clock.getTimeMillis())
    ReceiverTrackingInfo(
    streamId, ReceiverState.INACTIVE, None, None, None, None, Some(errorInfo))
    }

    receiverTrackingInfos(streamId) = newReceiverTrackingInfo
    listenerBus.post(StreamingListenerReceiverError(newReceiverTrackingInfo.toReceiverInfo))
    val messageWithError = if (error != null && !error.isEmpty) {
    s"$message - $error"
    } else {
    s"$message"
    }
    logWarning(s"Error reported by receiver for stream $streamId: $messageWithError")
    }

    private def scheduleReceiver(receiverId: Int): Seq[TaskLocation] = {
    val preferredLocation = receiverPreferredLocations.getOrElse(receiverId, None)
    val scheduledLocations = schedulingPolicy.rescheduleReceiver(
    receiverId, preferredLocation, receiverTrackingInfos, getExecutors)
    updateReceiverScheduledExecutors(receiverId, scheduledLocations)
    scheduledLocations
    }

    private def updateReceiverScheduledExecutors(
    receiverId: Int, scheduledLocations: Seq[TaskLocation]): Unit = {
    val newReceiverTrackingInfo = receiverTrackingInfos.get(receiverId) match {
    case Some(oldInfo) =>
    oldInfo.copy(state = ReceiverState.SCHEDULED,
    scheduledLocations = Some(scheduledLocations))
    case None =>
    ReceiverTrackingInfo(
    receiverId,
    ReceiverState.SCHEDULED,
    Some(scheduledLocations),
    runningExecutor = None)
    }
    receiverTrackingInfos.put(receiverId, newReceiverTrackingInfo)
    }

    /** Check if any blocks are left to be processed */
    def hasUnallocatedBlocks: Boolean = {
    receivedBlockTracker.hasUnallocatedReceivedBlocks
    }

    /**
    * Get the list of executors excluding driver
    */
    private def getExecutors: Seq[ExecutorCacheTaskLocation] = {
    if (ssc.sc.isLocal) {
    val blockManagerId = ssc.sparkContext.env.blockManager.blockManagerId
    Seq(ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId))
    } else {
    ssc.sparkContext.env.blockManager.master.getMemoryStatus.filter { case (blockManagerId, _) =>
    blockManagerId.executorId != SparkContext.DRIVER_IDENTIFIER // Ignore the driver location
    }.map { case (blockManagerId, _) =>
    ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId)
    }.toSeq
    }
    }

    /**
    * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
    * receivers to be scheduled on the same node.
    *
    * TODO Should poll the executor number and wait for executors according to
    * "spark.scheduler.minRegisteredResourcesRatio" and
    * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
    */
    private def runDummySparkJob(): Unit = {
    if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
    }
    assert(getExecutors.nonEmpty)
    }

    /**
    * Get the receivers from the ReceiverInputDStreams, distributes them to the
    * worker nodes as a parallel collection, and runs them.
    */
    private def launchReceivers(): Unit = {
    val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
    })

    runDummySparkJob()

    logInfo("Starting " + receivers.length + " receivers")
    endpoint.send(StartAllReceivers(receivers))
    }

    /** Check if tracker has been marked for starting */
    private def isTrackerStarted: Boolean = trackerState == Started

    /** Check if tracker has been marked for stopping */
    private def isTrackerStopping: Boolean = trackerState == Stopping

    /** Check if tracker has been marked for stopped */
    private def isTrackerStopped: Boolean = trackerState == Stopped

    /** RpcEndpoint to receive messages from the receivers. */
    private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {

    // TODO Remove this thread pool after https://github.com/apache/spark/issues/7385 is merged
    private val submitJobThreadPool = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("submit-job-thread-pool"))

    private val walBatchingThreadPool = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("wal-batching-thread-pool"))

    @volatile private var active: Boolean = true

    override def receive: PartialFunction[Any, Unit] = {
    // Local messages
    case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
    val executors = scheduledLocations(receiver.streamId)
    updateReceiverScheduledExecutors(receiver.streamId, executors)
    receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
    startReceiver(receiver, executors)
    }
    case RestartReceiver(receiver) =>
    // Old scheduled executors minus the ones that are not active any more
    val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
    val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
    // Try global scheduling again
    oldScheduledExecutors
    } else {
    val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
    // Clear "scheduledLocations" to indicate we are going to do local scheduling
    val newReceiverInfo = oldReceiverInfo.copy(
    state = ReceiverState.INACTIVE, scheduledLocations = None)
    receiverTrackingInfos(receiver.streamId) = newReceiverInfo
    schedulingPolicy.rescheduleReceiver(
    receiver.streamId,
    receiver.preferredLocation,
    receiverTrackingInfos,
    getExecutors)
    }
    // Assume there is one receiver restarting at one time, so we don't need to update
    // receiverTrackingInfos
    startReceiver(receiver, scheduledLocations)
    case c: CleanupOldBlocks =>
    receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
    case UpdateReceiverRateLimit(streamUID, newRate) =>
    for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
    eP.send(UpdateRateLimit(newRate))
    }
    // Remote messages
    case ReportError(streamId, message, error) =>
    reportError(streamId, message, error)
    }

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    // Remote messages
    case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
    val successful =
    registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
    context.reply(successful)
    case AddBlock(receivedBlockInfo) =>
    if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
    walBatchingThreadPool.execute(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
    if (active) {
    context.reply(addBlock(receivedBlockInfo))
    } else {
    throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
    }
    }
    })
    } else {
    context.reply(addBlock(receivedBlockInfo))
    }
    case DeregisterReceiver(streamId, message, error) =>
    deregisterReceiver(streamId, message, error)
    context.reply(true)
    // Local messages
    case AllReceiverIds =>
    context.reply(receiverTrackingInfos.filter(_._2.state != ReceiverState.INACTIVE).keys.toSeq)
    case StopAllReceivers =>
    assert(isTrackerStopping || isTrackerStopped)
    stopReceivers()
    context.reply(true)
    }

    /**
    * Return the stored scheduled executors that are still alive.
    */
    private def getStoredScheduledExecutors(receiverId: Int): Seq[TaskLocation] = {
    if (receiverTrackingInfos.contains(receiverId)) {
    val scheduledLocations = receiverTrackingInfos(receiverId).scheduledLocations
    if (scheduledLocations.nonEmpty) {
    val executors = getExecutors.toSet
    // Only return the alive executors
    scheduledLocations.get.filter {
    case loc: ExecutorCacheTaskLocation => executors(loc)
    case loc: TaskLocation => true
    }
    } else {
    Nil
    }
    } else {
    Nil
    }
    }

    /**
    * Start a receiver along with its scheduled executors
    */
    private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
    def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
    }

    val receiverId = receiver.streamId
    if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
    }

    val checkpointDirOption = Option(ssc.checkpointDir)
    val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

    // Function to start the receiver on the worker node
    val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
    if (!iterator.hasNext) {
    throw new SparkException(
    "Could not start receiver as object not found.")
    }
    if (TaskContext.get().attemptNumber() == 0) {
    val receiver = iterator.next()
    assert(iterator.hasNext == false)
    val supervisor = new ReceiverSupervisorImpl(
    receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
    supervisor.start()
    supervisor.awaitTermination()
    } else {
    // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
    }
    }

    // Create the RDD using the scheduledLocations to run the receiver in a Spark job
    val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
    ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
    val preferredLocations = scheduledLocations.map(_.toString).distinct
    ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
    receiverRDD.setName(s"Receiver $receiverId")
    ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
    ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

    val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
    // We will keep restarting the receiver job until ReceiverTracker is stopped
    future.onComplete {
    case Success(_) =>
    if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    } else {
    logInfo(s"Restarting Receiver $receiverId")
    self.send(RestartReceiver(receiver))
    }
    case Failure(e) =>
    if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    } else {
    logError("Receiver has been stopped. Try to restart it.", e)
    logInfo(s"Restarting Receiver $receiverId")
    self.send(RestartReceiver(receiver))
    }
    }(submitJobThreadPool)
    logInfo(s"Receiver ${receiver.streamId} started")
    }

    override def onStop(): Unit = {
    submitJobThreadPool.shutdownNow()
    active = false
    walBatchingThreadPool.shutdown()
    }

    /**
    * Call when a receiver is terminated. It means we won't restart its Spark job.
    */
    private def onReceiverJobFinish(receiverId: Int): Unit = {
    receiverJobExitLatch.countDown()
    receiverTrackingInfos.remove(receiverId).foreach { receiverTrackingInfo =>
    if (receiverTrackingInfo.state == ReceiverState.ACTIVE) {
    logWarning(s"Receiver $receiverId exited but didn't deregister")
    }
    }
    }

    /** Send stop signal to the receivers. */
    private def stopReceivers() {
    receiverTrackingInfos.values.flatMap(_.endpoint).foreach { _.send(StopReceiver) }
    logInfo("Sent stop signal to all " + receiverTrackingInfos.size + " receivers")
    }
    }

     Spark发行版笔记11

  • 相关阅读:
    vue 如何在循环中 "监听" 的绑定v-model数据
    验证正则表达式,同时验证手机号码和固定电话号码
    vue 图片上传功能
    localStorage使用总结
    vue2.0 父子组件通信 兄弟组件通信
    SVN使用教程总结
    详解SVN 的使用
    彻底理解setTimeout()
    如何实现一个HTTP请求库——axios源码阅读与分析 JavaScript
    2、Zookeeper原理及应用汇总
  • 原文地址:https://www.cnblogs.com/sparkbigdata/p/5515939.html
Copyright © 2011-2022 走看看