  • 5. Spark Streaming: Source Code Analysis of the Streaming Framework's Runtime Flow (2)

    1 Spark Streaming Program Code Example
    The code is as follows:
     
    object OnlineTheTop3ItemForEachCategory2DB {
      def main(args: Array[String]){
        val conf = new SparkConf() // Create the SparkConf object
        // Set the application name; it is visible in the monitoring UI while the program runs
        conf.setAppName("OnlineTheTop3ItemForEachCategory2DB")
        conf.setMaster("spark://Master:7077") // The program runs on the Spark cluster
        // Set the batchDuration interval, which controls how often Jobs are generated,
        // and create the entry point for Spark Streaming execution
        val ssc = new StreamingContext(conf, Seconds(5))
        ssc.checkpoint("/root/Documents/SparkApps/checkpoint")
        val socketDStream = ssc.socketTextStream("Master", 9999)
        // Business logic elided ..... (a hypothetical sketch follows after this block)

        ssc.start()
        ssc.awaitTermination()
      }
    }
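    The business logic itself is elided in the original post. Purely for orientation, here is a hypothetical sketch of what such per-batch logic might look like; the input format ("category item" per line), the window sizes, and the top-3 aggregation are all assumptions for illustration, not the original program's code.

    // Hypothetical sketch only: assumes each socket line is "category item"
    val categoryItemPairs = socketDStream.map { line =>
      val Array(category, item) = line.split(" ")   // assumed input format
      ((category, item), 1)
    }
    // Count occurrences per (category, item) over a 60s window sliding every 20s
    val windowedCounts =
      categoryItemPairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60), Seconds(20))
    // Keep the top 3 items per category; the real program would write these to a database
    windowedCounts
      .map { case ((category, item), count) => (category, (item, count)) }
      .groupByKey()
      .map { case (category, items) => (category, items.toList.sortBy(-_._2).take(3)) }
      .print()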
     
    2 Source Code Analysis of the Spark Streaming Runtime

    2.1 Creating the StreamingContext

     
    Based on the example above, we will roughly walk through the Spark source code, pointing out the relevant pieces so as to understand the main runtime flow.
    1) The code does not use SparkContext directly; it uses StreamingContext.
    Let's look at a snippet of the StreamingContext source:
     
    /**
     * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
     * @param conf a org.apache.spark.SparkConf object specifying Spark parameters
     * @param batchDuration the time interval at which streaming data will be divided into batches
     */
    def this(conf: SparkConf, batchDuration: Duration) = {
      this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
    }
    Indeed, createNewSparkContext creates a SparkContext:

    private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
      new SparkContext(conf)
    }
     This shows that Spark Streaming is itself an application running on top of Spark.

     2) At the very beginning of the example, an InputDStream has to be created from the data stream.

    val socketDStream = ssc.socketTextStream("Master", 9999)
    The socketTextStream method is defined as follows:
     
    /**
     * Create a input stream from TCP source hostname:port. Data is received using
     * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
     * lines.
     * @param hostname      Hostname to connect to for receiving data
     * @param port          Port to connect to for receiving data
     * @param storageLevel  Storage level to use for storing the received objects
     *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
     */
    def socketTextStream(
        hostname: String,
        port: Int,
        storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
      ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
      socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
    }
    3) We can see that at the end it calls socketStream.
    socketStream is defined as follows:
     
    /**
     * Create a input stream from TCP source hostname:port. Data is received using
     * a TCP socket and the receive bytes it interepreted as object using the given
     * converter.
     * @param hostname      Hostname to connect to for receiving data
     * @param port          Port to connect to for receiving data
     * @param converter     Function to convert the byte stream to objects
     * @param storageLevel  Storage level to use for storing the received objects
     * @tparam T            Type of the objects received (after converting bytes to objects)
     */
    def socketStream[T: ClassTag](
        hostname: String,
        port: Int,
        converter: (InputStream) => Iterator[T],
        storageLevel: StorageLevel
      ): ReceiverInputDStream[T] = {
      new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
    }
    4) This actually creates a SocketInputDStream.
    The SocketInputDStream class is as follows:
     
    private[streaming]
    class SocketInputDStream[T: ClassTag](
        ssc_ : StreamingContext,
        host: String,
        port: Int,
        bytesToObjects: InputStream => Iterator[T],
        storageLevel: StorageLevel
      ) extends ReceiverInputDStream[T](ssc_) {

      def getReceiver(): Receiver[T] = {
        new SocketReceiver(host, port, bytesToObjects, storageLevel)
      }
    }
    SocketInputDStream extends ReceiverInputDStream.
    It implements the getReceiver method, which returns a SocketReceiver object.
    To summarize SocketInputDStream's inheritance chain:
    SocketInputDStream -> ReceiverInputDStream -> InputDStream -> DStream.
     
    5) A DStream is the template from which RDDs are generated; it exists purely at the logical level. When each interval is reached, these templates are instantiated with that batch's data into RDDs and a DAG.
    DStream's generatedRDDs:
     
    // RDDs generated, marked as private[streaming] so that testsuites can access it
    @transient
    private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
    DStream's getOrCompute:
     
    /**
     * Get the RDD corresponding to the given time; either retrieve it from cache
     * or compute-and-cache it.
     */
    private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
      // If RDD was already generated, then retrieve it from HashMap,
      // or else compute the RDD
      generatedRDDs.get(time).orElse {
        // Compute the RDD if time is valid (e.g. correct time in a sliding window)
        // of RDD generation, else generate nothing.
        if (isTimeValid(time)) {
          val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
            // Disable checks for existing output directories in jobs launched by the streaming
            // scheduler, since we may need to write output to an existing directory during checkpoint
            // recovery; see SPARK-4835 for more details. We need to have this call here because
            // compute() might cause Spark jobs to be launched.
            PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
              compute(time)
            }
          }
          rddOption.foreach { case newRDD =>
            // Register the generated RDD for caching and checkpointing
            if (storageLevel != StorageLevel.NONE) {
              newRDD.persist(storageLevel)
              logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
            }
            if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
              newRDD.checkpoint()
              logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
            }
            generatedRDDs.put(time, newRDD)
          }
          rddOption
        } else {
          None
        }
      }
    }
    The main job here is to generate the RDD and then put it into the HashMap; the concrete RDD generation process will be dissected later.
    So far we have roughly covered how the core concepts DStream and RDD are used in Spark Streaming; a short illustrative sketch of the "template" idea follows.
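    To make the "template" idea concrete, here is a minimal illustrative sketch (not taken from the analyzed source): the lines below only declare a lineage of DStreams, and no data is processed when they execute.

    val lines  = ssc.socketTextStream("Master", 9999)   // ReceiverInputDStream[String]
    val words  = lines.flatMap(_.split(" "))
    val counts = words.map((_, 1)).reduceByKey(_ + _)
    counts.print()   // registers an output operation on the DStreamGraph

    // At every batchDuration the framework walks this template: for each DStream it calls
    // getOrCompute(batchTime), which calls compute(batchTime) and yields a brand-new RDD,
    // so each batch produces a fresh RDD lineage mirroring flatMap -> map -> reduceByKey.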
     

    2.2 Starting the StreamingContext

    In the code example, ssc.start() starts the StreamingContext; the main logic happens inside this start method:

          *  StreamingContext.start() internally calls JobScheduler's start method, which begins the message loop. Inside JobScheduler.start(), a JobGenerator and a ReceiverTracker are constructed and their start methods are called:

          *  1. Once started, the JobGenerator keeps generating Jobs according to the batchDuration. A Job here is not a Job in the Spark Core sense; it is merely the DAG of RDDs generated from the DStreamGraph, something like an instance of the Runnable interface in Java terms. To actually run, a Job has to be submitted to the JobScheduler, which uses a thread pool to pick a dedicated thread and submit the Job to the cluster (the real job execution is triggered inside that thread by an RDD action).

          *  2. Once started, the ReceiverTracker first launches the Receivers in the Spark cluster (more precisely, a ReceiverSupervisor is started on the Executor first). After a Receiver receives data, the ReceiverSupervisor stores it on the Executor and sends the metadata of that data to the ReceiverTracker on the Driver, where the received metadata is managed by a ReceivedBlockTracker.

     

    The key classes involved in the runtime flow of a Spark Streaming application are shown in the figure below (figure not reproduced here).

    Now let's begin the magical journey through the source code. The process is painful, but after the pain comes the exhilaration of real understanding...
     
     
    1) First, let's look at StreamingContext's start().
    The start() method starts the StreamingContext. Since a Spark application cannot have multiple active SparkContext instances, the Spark Streaming framework checks the current state at startup. The code is as follows:
     
    /**
     * Start the execution of the streams.
     *
     * @throws IllegalStateException if the StreamingContext is already stopped.
     */
    def start(): Unit = synchronized {
      state match {
        case INITIALIZED =>
          startSite.set(DStream.getCreationSite())
          StreamingContext.ACTIVATION_LOCK.synchronized {
            StreamingContext.assertNoOtherContextIsActive()
            try {
              validate()
              // Start the streaming scheduler in a new thread, so that thread local properties
              // like call sites and job groups can be reset without affecting those of the
              // current thread.
              // (Thread-local storage: each thread has its own private properties, so setting
              // them here does not affect other threads.)
              ThreadUtils.runInNewThread("streaming-start") {
                sparkContext.setCallSite(startSite.get)
                sparkContext.clearJobGroup()
                sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
                // Start the JobScheduler
                scheduler.start()
              }
              state = StreamingContextState.ACTIVE
            } catch {
              case NonFatal(e) =>
                logError("Error starting the context, marking it as stopped", e)
                scheduler.stop(false)
                state = StreamingContextState.STOPPED
                throw e
            }
            StreamingContext.setActiveContext(this)
          }
          shutdownHookRef = ShutdownHookManager.addShutdownHook(
            StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
          // Registering Streaming Metrics at the start of the StreamingContext
          assert(env.metricsSystem != null)
          env.metricsSystem.registerSource(streamingSource)
          uiTab.foreach(_.attach())
          logInfo("StreamingContext started")
        case ACTIVE =>
          logWarning("StreamingContext has already been started")
        case STOPPED =>
          throw new IllegalStateException("StreamingContext has already been stopped")
      }
    }
    When the context is in the INITIALIZED state, the JobScheduler gets started.
     
    2) Next, let's look at JobScheduler's startup process, start().
    It kicks off several pieces of work, including the EventLoop, the StreamingListenerBus, the ReceiverTracker and the JobGenerator.
     
    def start(): Unit = synchronized {
      if (eventLoop != null) return // scheduler has already been started

      logDebug("Starting JobScheduler")
      eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
        override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
        override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
      }
      // Start the message-loop thread, which handles the various events of the JobScheduler.
      eventLoop.start()

      // attach rate controllers of input streams to receive batch completion updates
      for {
        inputDStream <- ssc.graph.getInputStreams
        // the rateController can throttle the input rate
        rateController <- inputDStream.rateController
      } ssc.addStreamingListener(rateController)

      // Start the listener bus, used to update the Streaming tab in the Spark UI.
      listenerBus.start(ssc.sparkContext)
      receiverTracker = new ReceiverTracker(ssc)
      // Create the InputInfoTracker, which manages all input streams and the statistics of the
      // data they receive; this information is exposed through StreamingListener.
      inputInfoTracker = new InputInfoTracker(ssc)
      // Start the ReceiverTracker, which handles data reception, data caching and block generation.
      receiverTracker.start()
      // Start the JobGenerator, which initializes the DStreamGraph, turns DStreams into RDDs,
      // generates Jobs and submits them for execution.
      jobGenerator.start()
      logInfo("Started JobScheduler")
    }
    3) The message-handling function processEvent in JobScheduler.
    It handles three kinds of messages: job started, job completed, and error reported.
     
    private def processEvent(event: JobSchedulerEvent) {
      try {
        event match {
          case JobStarted(job, startTime) => handleJobStart(job, startTime)
          case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
          case ErrorReported(m, e) => handleError(m, e)
        }
      } catch {
        case e: Throwable =>
          reportError("Error in job scheduler", e)
      }
    }
    4) Let's now take a rough look at the work launched in JobScheduler.start().
    4.1) First, the first piece of work started by JobScheduler.start(): the EventLoop.
    The EventLoop is used to process the various events of the JobScheduler.
    EventLoop holds an event queue:
     
    private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
    and a thread that processes the events in the queue:
     
    private val eventThread = new Thread(name) {
      setDaemon(true)
      override def run(): Unit = {
        try {
          while (!stopped.get) {
            val event = eventQueue.take()
            try {
              onReceive(event)
            } catch {
              case NonFatal(e) => {
                try {
                  onError(e)
                } catch {
                  case NonFatal(e) => logError("Unexpected error in " + name, e)
                }
              }
            }
          }
        } catch {
          case ie: InterruptedException => // exit even if eventQueue is not empty
          case NonFatal(e) => logError("Unexpected error in " + name, e)
        }
      }
    }
    The onReceive and onError used by this thread were defined when the EventLoop was instantiated in JobScheduler.
    4.2) The second piece of work started by JobScheduler.start(): the StreamingListenerBus.
    - It asynchronously delivers StreamingListenerEvents to the registered StreamingListeners.
    - It keeps the Streaming tab of the Spark UI up to date. (A small usage sketch follows below.)
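    As a small usage sketch (assumed user code, not part of the analyzed source), a custom StreamingListener registered via ssc.addStreamingListener rides on this same bus and receives the per-batch events asynchronously:

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    class BatchStatsListener extends StreamingListener {
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
        val info = batchCompleted.batchInfo
        // The same per-batch statistics that back the Streaming tab of the Spark UI
        println(s"Batch ${info.batchTime}: records=${info.numRecords}, " +
          s"totalDelay=${info.totalDelay.getOrElse(-1L)} ms")
      }
    }

    // ssc.addStreamingListener(new BatchStatsListener())  // events are posted asynchronously by the bus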
     
     
    4.3) The third piece of work started by JobScheduler.start(): the ReceiverTracker.
    The ReceiverTracker manages the Receivers of all the receiver input streams and the data they report back.
    In ReceiverTracker's start(), a ReceiverTrackerEndpoint RPC endpoint is instantiated internally.
     
    def start(): Unit = synchronized {
      if (isTrackerStarted) {
        throw new SparkException("ReceiverTracker already started")
      }

      if (!receiverInputStreams.isEmpty) {
        endpoint = ssc.env.rpcEnv.setupEndpoint(
          "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
        if (!skipReceiverLaunch) launchReceivers()
        logInfo("ReceiverTracker started")
        trackerState = Started
      }
    }
     
    During ReceiverTracker's startup, its launchReceivers method is called:
     
    /**
     * Get the receivers from the ReceiverInputDStreams, distributes them to the
     * worker nodes as a parallel collection, and runs them.
     */
    private def launchReceivers(): Unit = {
      val receivers = receiverInputStreams.map(nis => {
        val rcvr = nis.getReceiver()
        rcvr.setReceiverId(nis.id)
        rcvr
      })
      runDummySparkJob()
      logInfo("Starting " + receivers.length + " receivers")
      endpoint.send(StartAllReceivers(receivers))
    }
     
    It calls the runDummySparkJob method to launch the first Job of the Spark Streaming framework; the collect action triggers the execution of that Spark Job. The purpose of this method is to make sure every slave has registered, so that the Receivers do not all end up on the same node and the subsequent computation can be load-balanced.
     
    /**
     * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
     * receivers to be scheduled on the same node.
     *
     * TODO Should poll the executor number and wait for executors according to
     * "spark.scheduler.minRegisteredResourcesRatio" and
     * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
     */
    private def runDummySparkJob(): Unit = {
      if (!ssc.sparkContext.isLocal) {
        ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
      }
      assert(getExecutors.nonEmpty)
    }
     
    
    ReceiverTracker.launchReceivers() also calls endpoint.send(StartAllReceivers(receivers)), which sends the StartAllReceivers message to the RPC endpoint.
    When the ReceiverTrackerEndpoint itself receives the message, it first uses the scheduling policy to determine which Executors each Receiver should run on, and then calls startReceiver(receiver, executors) to start the Receivers:
    override def receive: PartialFunction[Any, Unit] = {
      // Local messages
      case StartAllReceivers(receivers) =>
        val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
        for (receiver <- receivers) {
          val executors = scheduledLocations(receiver.streamId)
          updateReceiverScheduledExecutors(receiver.streamId, executors)
          receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
          startReceiver(receiver, executors)
        }
     
    In the startReceiver method, ssc.sparkContext.submitJob submits a Job with startReceiverFunc passed in, because startReceiverFunc is the function that gets executed on the Executor. Inside startReceiverFunc, a ReceiverSupervisorImpl object is instantiated; this object manages and monitors the Receiver. This Job is the second Job that the Spark Streaming framework starts for us, and it keeps running, because supervisor.awaitTermination() blocks until termination.
     
    /**
     * Start a receiver along with its scheduled executors
     */
    private def startReceiver(
        receiver: Receiver[_],
        scheduledLocations: Seq[TaskLocation]): Unit = {
      def shouldStartReceiver: Boolean = {
     
        // ... a large chunk of unrelated code omitted here ...
     
      // Function to start the receiver on the worker node
      val startReceiverFunc: Iterator[Receiver[_]] => Unit =
        (iterator: Iterator[Receiver[_]]) => {
          if (!iterator.hasNext) {
            throw new SparkException(
              "Could not start receiver as object not found.")
          }
          if (TaskContext.get().attemptNumber() == 0) {
            val receiver = iterator.next()
            assert(iterator.hasNext == false)
            // Instantiate the supervisor that manages and monitors the Receiver
            val supervisor = new ReceiverSupervisorImpl(
              receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
            supervisor.start()
            supervisor.awaitTermination()
          } else {
            // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
          }
        }
     
      // Create the RDD using the scheduledLocations to run the receiver in a Spark job
      val receiverRDD: RDD[Receiver[_]] =
        if (scheduledLocations.isEmpty) {
          ssc.sc.makeRDD(Seq(receiver), 1)
        } else {
          val preferredLocations = scheduledLocations.map(_.toString).distinct
          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
        }
     
      receiverRDD.setName(s"Receiver $receiverId")
      ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
      ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
      val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
        receiverRDD,
        startReceiverFunc, // startReceiverFunc is passed in here because it is the function executed on the Executor
        Seq(0), (_, _) => Unit, ())
     
      // Keep restarting the receiver job until the ReceiverTracker is stopped
      future.onComplete {
        case Success(_) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
        case Failure(e) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logError("Receiver has been stopped. Try to restart it.", e)
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
      }(submitJobThreadPool)
      logInfo(s"Receiver ${receiver.streamId} started")
    }
     
    Next, let's look at how ReceiverSupervisorImpl starts: it first starts all the registered BlockGenerator objects, then sends a RegisterReceiver message to the ReceiverTrackerEndpoint, and then calls the receiver's onStart method.
     
    /** Start the supervisor */
    def start() {
      onStart()
      startReceiver()
    }
     
    Its onStart() starts all the registered BlockGenerator objects:
    override protected def onStart() {
      registeredBlockGenerators.foreach { _.start() }
    }
     
    In its startReceiver() method, onReceiverStart() is called first, and then the receiver's onStart method:
     
    /** Start receiver */
    def startReceiver(): Unit = synchronized {
      try {
        if (onReceiverStart()) {
          logInfo("Starting receiver")
          receiverState = Started
          receiver.onStart()
          logInfo("Called receiver onStart")
        } else {
          // The driver refused us
          stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
        }
      } catch {
        case NonFatal(t) =>
          stop("Error starting receiver " + streamId, Some(t))
      }
    }
     
    In onReceiverStart(), a RegisterReceiver message is sent to the ReceiverTrackerEndpoint:
     
    override protected def onReceiverStart(): Boolean = {
      val msg = RegisterReceiver(
        streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
      trackerEndpoint.askWithRetry[Boolean](msg)
    }
     
    When the ReceiverTrackerEndpoint object running on the Driver receives the RegisterReceiver message, it wraps streamId, typ, host, executorId and receiverEndpoint into a ReceiverTrackingInfo and stores it in the in-memory HashMap receiverTrackingInfos.
     
    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      // Remote messages
      case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
        val successful =
          registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
        context.reply(successful)
     
     
    The source of the registerReceiver method:
    /** Register a receiver */
    private def registerReceiver(
        streamId: Int,
        typ: String,
        host: String,
        executorId: String,
        receiverEndpoint: RpcEndpointRef,
        senderAddress: RpcAddress
      ): Boolean = {
      if (!receiverInputStreamIds.contains(streamId)) {
        throw new SparkException("Register received for unexpected id " + streamId)
      }
     
        // ... a large chunk of unrelated code omitted here ...
     
      if (!isAcceptable) {
        // Refuse it since it's scheduled to a wrong executor
        false
      } else {
        val name = s"${typ}-${streamId}"
        val receiverTrackingInfo = ReceiverTrackingInfo(
          streamId,
          ReceiverState.ACTIVE,
          scheduledLocations = None,
          runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
          name = Some(name),
          endpoint = Some(receiverEndpoint))
        receiverTrackingInfos.put(streamId, receiverTrackingInfo)
        listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
        logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
        true
      }
    }
     
    As for starting the Receiver itself, take ssc.socketTextStream("localhost", 9999) as an example: the object created is a SocketReceiver. Internally it starts a thread that connects to the socket server, reads the socket data and stores it. (A minimal custom-receiver sketch follows the excerpt below.)
     
    private[streaming]
    class SocketReceiver[T: ClassTag](
        host: String,
        port: Int,
        bytesToObjects: InputStream => Iterator[T],
        storageLevel: StorageLevel
      ) extends Receiver[T](storageLevel) with Logging {
     
      def onStart() {
        // Start the thread that receives data over a connection
        new Thread("Socket Receiver") {
          setDaemon(true)
          override def run() { receive() }
        }.start()
      }
     
     
      /** Create a socket connection and receive data until receiver is stopped */
      def receive() {
        var socket: Socket = null
        try {
          logInfo("Connecting to " + host + ":" + port)
          socket = new Socket(host, port)
          logInfo("Connected to " + host + ":" + port)
          val iterator = bytesToObjects(socket.getInputStream())
          while(!isStopped && iterator.hasNext) {
            store(iterator.next)
          }
          if (!isStopped()) {
            restart("Socket data stream had no more data")
          } else {
            logInfo("Stopped receiving")
          }
        } catch {
            // ... unrelated code omitted here ...
      }
    }
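    The same Receiver contract applies to user-defined sources. As a minimal, illustrative sketch (assumed names, not from the analyzed source), a custom receiver only needs onStart/onStop and hands data to Spark with store(), exactly as SocketReceiver does:

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    class ConstantReceiver(msg: String, intervalMs: Long)
      extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {

      def onStart(): Unit = {
        // Start a background thread, just like SocketReceiver's "Socket Receiver" thread
        new Thread("Constant Receiver") {
          setDaemon(true)
          override def run(): Unit = {
            while (!isStopped()) {
              store(msg)            // handed to the ReceiverSupervisor, which turns it into blocks
              Thread.sleep(intervalMs)
            }
          }
        }.start()
      }

      def onStop(): Unit = { /* the loop above exits once isStopped() returns true */ }
    }

    // Usage (hypothetical): val constants = ssc.receiverStream(new ConstantReceiver("ping", 1000))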
     
    4.4) Next, the fourth piece of work started in JobScheduler.start(): the JobGenerator.
    JobGenerator has a RecurringTimer member that drives its message system and timer: it periodically posts a GenerateJobs message, once per batch interval.
     
    // Periodically send GenerateJobs messages according to the batchDuration passed in when the StreamingContext was created
    private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
      longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
     
    The start() method of JobGenerator:
    /** Start generation of jobs */
    def start(): Unit = synchronized {
      if (eventLoop != null) return // generator has already been started
     
      // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
      // See SPARK-10125
      checkpointWriter
     
      eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
        override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
     
        override protected def onError(e: Throwable): Unit = {
          jobScheduler.reportError("Error in job generator", e)
        }
      }
     
      // Start the message-loop thread
      eventLoop.start()
     
      if (ssc.isCheckpointPresent) {
        restart()
      } else {
        // Start the timer that periodically generates Jobs
        startFirstTime()
      }
    }
     
    The definition of startFirstTime(), called from JobGenerator.start():
    /** Starts the generator for the first time */
    private def startFirstTime() {
      val startTime = new Time(timer.getStartTime())
      graph.start(startTime - graph.batchDuration)
      timer.start(startTime.milliseconds)
      logInfo("Started JobGenerator at " + startTime)
    }
    The definition of JobGenerator's processEvent():
     
    /** Processes all events */
    private def processEvent(event: JobGeneratorEvent) {
      logDebug("Got event " + event)
      event match {
        case GenerateJobs(time) => generateJobs(time)
        case ClearMetadata(time) => clearMetadata(time)
        case DoCheckpoint(time, clearCheckpointDataLater) =>
          doCheckpoint(time, clearCheckpointDataLater)
        case ClearCheckpointData(time) => clearCheckpointData(time)
      }
    }
    And the definition of generateJobs:
    /** Generate jobs and perform checkpoint for the given `time`.  */
    private def generateJobs(time: Time) {
      // Set the SparkEnv in this thread, so that job generation code can access the environment
      // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
      // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
      SparkEnv.set(ssc.env)
      Try {
     
        // Get the concrete data (the received blocks) for this specific batch time
        jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
        // Call DStreamGraph's generateJobs to generate the Jobs
        graph.generateJobs(time) // generate jobs using allocated block
      } match {
        case Success(jobs) =>
          val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
          jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
        case Failure(e) =>
          jobScheduler.reportError("Error generating jobs for time " + time, e)
      }
      eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
    }
     
    DStreamGraph's generateJobs method calls the output streams' generateJob method to produce the collection of Jobs.
    // Output streams: the output operations corresponding to concrete actions
    private val outputStreams = new ArrayBuffer[DStream[_]]()
     
    def generateJobs(time: Time): Seq[Job] = {
      logDebug("Generating jobs for time " + time)
      val jobs = this.synchronized {
        outputStreams.flatMap { outputStream =>
          val jobOption = outputStream.generateJob(time)
          jobOption.foreach(_.setCallSite(outputStream.creationSite))
          jobOption
        }
      }
      logDebug("Generated " + jobs.length + " jobs for time " + time)
      jobs
    }
     
    Now look at DStream's generateJob method. It calls getOrCompute to obtain the RDD for that interval (this is where, at each interval, the DStreamGraph is instantiated with batch data into RDDs). If an RDD exists, it wraps it in a jobFunc that contains context.sparkContext.runJob(rdd, emptyFunc), and returns the resulting Job.
     
    /**
     * Generate a SparkStreaming job for the given time. This is an internal method that
     * should not be called directly. This default implementation creates a job
     * that materializes the corresponding RDD. Subclasses of DStream may override this
     * to generate their own jobs.
     */
    private[streaming] def generateJob(time: Time): Option[Job] = {
      getOrCompute(time) match {
        case Some(rdd) => {
          val jobFunc = () => {
            val emptyFunc = { (iterator: Iterator[T]) => {} }
            context.sparkContext.runJob(rdd, emptyFunc)
          }
          Some(new Job(time, jobFunc))
        }
        case None => None
      }
    }
    Next, look at JobScheduler's submitJobSet method, which submits JobHandlers to the thread pool. JobHandler implements the Runnable interface and eventually calls job.run(). Looking at the Job class below, the func invoked in run() is the jobFunc passed in when the Job was constructed; it contains the context.sparkContext.runJob(rdd, emptyFunc) operation, which is what finally submits the job.
    def submitJobSet(jobSet: JobSet) {
      if (jobSet.jobs.isEmpty) {
        logInfo("No jobs added for time " + jobSet.time)
      } else {
        listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
        jobSets.put(jobSet.time, jobSet)
        jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
        logInfo("Added jobs for time " + jobSet.time)
      }
    }
     
    JobHandler implements the Runnable interface and ultimately calls job.run():
    private class JobHandler(job: Job) extends Runnable with Logging {
        import JobScheduler._
     
        def run() {
          try {
        
             // ... unrelated code omitted here ...
     
            // We need to assign `eventLoop` to a temp variable. Otherwise, because
            // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
            // it's possible that when `post` is called, `eventLoop` happens to null.
            var _eventLoop = eventLoop
            if (_eventLoop != null) {
              _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
              // Disable checks for existing output directories in jobs launched by the streaming
              // scheduler, since we may need to write output to an existing directory during checkpoint
              // recovery; see SPARK-4835 for more details.
              PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
                job.run()
              }
              _eventLoop = eventLoop
              if (_eventLoop != null) {
                _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
              }
            } else {
              // JobScheduler has been stopped.
            }
          } finally {
            ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
            ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
          }
        }
      }
    }
    A snippet of the Job class:
     
    private[streaming]
    class Job(val time: Time, func: () => _) {
      private var _id: String = _
      private var _outputOpId: Int = _
      private var isSet = false
      private var _result: Try[_] = null
      private var _callSite: CallSite = null
      private var _startTime: Option[Long] = None
      private var _endTime: Option[Long] = None

      def run() {
        _result = Try(func())
      }
      // ... remaining members omitted ...
    }
     
    That concludes the analysis of the main source code. Exhausting work, but it brings a full sense of accomplishment.
     
     
  • Original article: https://www.cnblogs.com/zhouyf/p/5481212.html