  • Spark Streaming Source Code Study Notes (Part 1)

    1. Spark Streaming code analysis:

    1.1 Demo code:
    A real-time WordCount:

        import org.apache.spark.streaming.{Seconds, StreamingContext}
        import org.apache.spark.streaming.StreamingContext._
        import org.apache.spark.storage.StorageLevel
    
        object NetworkWordCount {
          def main(args: Array[String]) {
            if (args.length < 3) {
              System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
                "In local mode, <master> should be 'local[n]' with n > 1")
              System.exit(1)
            }
    
            // Create the context with a 1 second batch size
            val ssc = new StreamingContext("local[4]", "NetworkWordCount", Seconds(1),
              System.getenv("SPARK_HOME"))
    
            // Create a NetworkInputDStream on target ip:port and count the
            // words in input stream of \n delimited text (eg. generated by 'nc')
            val lines = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_ONLY_SER)
            val words = lines.flatMap(_.split(" "))
            val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
            wordCounts.print()
            ssc.start()
            ssc.awaitTermination()
          }
        }
    

    The code above receives and processes text arriving over the network in real time and counts the occurrences of each word.
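
    To make the per-batch computation concrete, here is a minimal sketch in plain Scala collections, with no Spark dependency. The object name BatchWordCount and the method countWords are hypothetical, and groupBy plus sum is only a local stand-in for what reduceByKey does on a distributed dataset.

```scala
object BatchWordCount {
  // Counts the words in one batch of lines, mirroring flatMap -> map -> reduceByKey.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))      // split each line into words
      .map(word => (word, 1))     // pair every word with a count of 1
      .groupBy(_._1)              // group the pairs by word (local stand-in for reduceByKey)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    val batch = Seq("hello spark", "hello streaming")
    countWords(batch).toSeq.sortBy(_._1).foreach(println)
  }
}
```

    In the streaming demo the same transformation is applied once per one-second batch rather than once to a fixed collection.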


    1.2 Based on the demo above, the following traces the source code of the Spark Streaming startup process and of the byte-stream processing path.


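    The startup sequence triggered by the demo code is as follows. In brief:
    (1) The streaming WordCount example shows that the Spark Streaming programming API closely resembles that of Spark.

    Spark Streaming is built on top of the Spark framework. The interaction between StreamingContext and SparkContext in the sequence diagram shows that StreamingContext wraps a SparkContext. Some of the key code is as follows: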

    // Package-private: visible inside the streaming package, not outside it
    // StreamingContext still wraps a SparkContext internally
    class StreamingContext private[streaming] (
        sc_ : SparkContext,
        cp_ : Checkpoint,
        batchDur_ : Duration
      ) extends Logging {
    
      def this(conf: SparkConf, batchDuration: Duration) = {
        this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
      }
    
    private[streaming] def createNewSparkContext(
        master: String,
        appName: String,
        sparkHome: String,
        jars: Seq[String],
        environment: Map[String, String]
     ): SparkContext = {
        val conf = SparkContext.updatedConf(
         new SparkConf(), master, appName, sparkHome, jars, environment)
        createNewSparkContext(conf)
    }
    ...
    

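    The above is part of the StreamingContext constructor code. It shows how the SparkContext is wrapped and how createNewSparkContext constructs it; from the code's point of view, StreamingContext is a wrapper around SparkContext. In terms of the demo's startup phase, once the StreamingContext has been instantiated, the program's configuration and initialization are complete.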

    (2) In the demo, after the StreamingContext is instantiated, a socket text stream is obtained from it:

    val lines = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_ONLY_SER)
    

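    Here args(1) and args(2) are the IP address and port that the Receiver described later reads from, and the last parameter, as its name suggests, defines the storage level (here, serialized and kept in memory only).
    That is a brief explanation of the demo code; next, follow the sequence diagram and the source code: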

    The StreamingContext.socketTextStream method is as follows:

    def socketTextStream(
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[String] = {
        socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
    }
    

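    The method returns a ReceiverInputDStream, which introduces the concept of a DStream. According to the comments in the DStream source, a DStream is essentially a discretized stream: the stream is discretized into a sequence of RDDs, so the basic operations are still RDD-based.
    The class hierarchy around InputDStream: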

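    The class hierarchy diagram shows InputDStream and its subclasses, and the class names give a rough idea of each one's purpose: for example, FileInputDStream handles file streams and KafkaInputDStream handles Kafka messages. A DStream itself wraps a data stream that has been discretized by time slice, as the following DStream snippet shows: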

    abstract class DStream[T: ClassTag] (
     @transient private[streaming] var ssc: StreamingContext
      ) extends Serializable with Logging {
    
      @transient
      private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
    ...
    

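    As the code shows, generatedRDDs is a HashMap member of DStream whose keys are time slices and whose values are RDDs. This member reveals the essence of a DStream: it stores a series of data batches keyed by time slice.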
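
    The idea of a map from batch time to batch can be sketched without Spark. In the sketch below, a Long in milliseconds stands in for Time and a Vector stands in for an RDD; the names DiscretizedStreamSketch and discretize are hypothetical, and the rule that a batch keyed by time t covers the interval [t - batchMs, t) is an assumption made for illustration.

```scala
import scala.collection.mutable

object DiscretizedStreamSketch {
  // Assigns each (timestampMs, record) pair to the batch whose interval contains it.
  // Assumption: the batch keyed by time t holds records with timestamps in [t - batchMs, t).
  def discretize[T](records: Seq[(Long, T)], batchMs: Long): mutable.HashMap[Long, Vector[T]] = {
    val generated = new mutable.HashMap[Long, Vector[T]]()
    records.foreach { case (ts, record) =>
      val batchTime = (ts / batchMs + 1) * batchMs
      generated.put(batchTime, generated.getOrElse(batchTime, Vector.empty[T]) :+ record)
    }
    generated
  }

  def main(args: Array[String]): Unit = {
    val records = Seq((0L, "a"), (999L, "b"), (1000L, "c"))
    discretize(records, 1000L).toSeq.sortBy(_._1).foreach(println)
  }
}
```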

        /**
         * Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal
          * method that should not be called directly.
        */
    private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
        // If this DStream was not initialized (i.e., zeroTime not set), then do it
        // If RDD was already generated, then retrieve it from HashMap
      generatedRDDs.get(time) match {
    
      // If an RDD was already generated and is being reused, then
      // probably all RDDs in this DStream will be reused and hence should be cached
      case Some(oldRDD) => Some(oldRDD)
    
      // if RDD was not generated, and if the time is valid
      // (based on sliding time of this DStream), then generate the RDD
      case None => {
        if (isTimeValid(time)) {
          compute(time) match {
            case Some(newRDD) =>
              if (storageLevel != StorageLevel.NONE) {
                newRDD.persist(storageLevel)
                logInfo("Persisting RDD " + newRDD.id + " for time " +
                  time + " to " + storageLevel + " at time " + time)
              }
              if (checkpointDuration != null &&
                (time - zeroTime).isMultipleOf(checkpointDuration)) {
                newRDD.checkpoint()
                logInfo("Marking RDD " + newRDD.id + " for time " + time +
                  " for checkpointing at time " + time)
              }
              generatedRDDs.put(time, newRDD)
              Some(newRDD)
            case None =>
              None
          }
        } else {
          None
        }
      }
      }
    }
    

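    The code above is the main function for building and retrieving RDDs by time slice; the logic is fairly simple, so it needs no further comment.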
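
    The get-or-compute pattern in that method can be isolated in a few lines. This is a simplified sketch, not the Spark implementation: the class MemoizedStream is hypothetical, persistence and checkpointing are omitted, and the validity rule (a time is valid if it lies after zeroTime on a slide boundary) is an assumption about what isTimeValid checks.

```scala
import scala.collection.mutable

// Hypothetical simplified stream: `compute` yields the batch for a time, or None if there is no data.
class MemoizedStream[T](zeroTime: Long, slideMs: Long, compute: Long => Option[Vector[T]]) {
  private val generated = new mutable.HashMap[Long, Vector[T]]()

  // Assumed rule: a time is valid only if it lies after zeroTime on a slide boundary.
  def isTimeValid(time: Long): Boolean =
    time > zeroTime && (time - zeroTime) % slideMs == 0

  // Returns the cached batch if present; otherwise computes it once and caches the result.
  def getOrCompute(time: Long): Option[Vector[T]] =
    generated.get(time).orElse {
      if (isTimeValid(time)) {
        val result = compute(time)
        result.foreach(batch => generated.put(time, batch))
        result
      } else None
    }
}
```

    Calling getOrCompute twice with the same valid time runs compute only once, which is the property that lets several downstream operations share one batch.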


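    That is enough about DStream. Back to the main thread: the demo code obtains a ReceiverInputDStream from the StreamingContext. The ReceiverInputDStream wraps a SocketReceiver object that reads the data stream from the network. Following the code further, it in fact obtains a SocketInputDStream, which stores the stream, discretized by time slice, in the HashMap defined by DStream. The code involved is as follows: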

    def socketTextStream(
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[String] = {
    socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
    }
     def socketStream[T: ClassTag](
      hostname: String,
      port: Int,
      converter: (InputStream) => Iterator[T],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[T] = {
    new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
    }
    

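    That is how the client demo (the driver) obtains the socket text stream: in short, it obtains a SocketInputDStream through the API provided by StreamingContext.

    Returning to the demo: apart from the WordCount processing logic, what remains is the startup logic of StreamingContext.

    From knowledge of Spark we know that RDD operations only build up the logic and the dependency graph without actually executing anything; the same holds here. What follows is the key startup logic of StreamingContext.

    The summary continues below.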


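    (3) Back in the sequence diagram, the start method of StreamingContext directly or indirectly triggers, in order, methods of JobScheduler, StreamingListenerBus, ReceiverTracker, and JobGenerator. The source code:
    StreamingContext calls the start method of the core JobScheduler, shown below: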

    def start(): Unit = synchronized {
        if (eventActor != null) return // scheduler has already been started
    
        logDebug("Starting JobScheduler")
        eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
          def receive = {
            case event: JobSchedulerEvent => processEvent(event)
          }
        }), "JobScheduler")
        /**
         * Start the listenerBus
         */
        listenerBus.start()
        receiverTracker = new ReceiverTracker(ssc)
        // Start the receiverTracker
        receiverTracker.start()
        jobGenerator.start()
        logInfo("Started JobScheduler")
    }
    

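    To summarize the code above: it first creates an eventActor object (an Akka Actor; for a short definition of Akka see http://baike.baidu.com/view/5912486.htm?fr=aladdin, and for details study Scala further). The event-handling logic of eventActor calls the processEvent method, whose source is as follows: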

    private def processEvent(event: JobSchedulerEvent) {
        try {
          event match {
            case JobStarted(job) => handleJobStart(job)
            case JobCompleted(job) => handleJobCompletion(job)
            case ErrorReported(m, e) => handleError(m, e)
          }
        } catch {
          case e: Throwable =>
            reportError("Error in job scheduler", e)
        }
    }
    // Called indirectly: handles the job-started event
    private def handleJobStart(job: Job) {
        val jobSet = jobSets.get(job.time)
        if (!jobSet.hasStarted) {
          listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
        }
        jobSet.handleJobStart(job)
        logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
        SparkEnv.set(ssc.env)
    }
    // Handles the job-completed event
    private def handleJobCompletion(job: Job) {
        job.result match {
          case Success(_) =>
            val jobSet = jobSets.get(job.time)
            jobSet.handleJobCompletion(job)
            logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
            if (jobSet.hasCompleted) {
              jobSets.remove(jobSet.time)
              jobGenerator.onBatchCompletion(jobSet.time)
              logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
                jobSet.totalDelay / 1000.0, jobSet.time.toString,
                jobSet.processingDelay / 1000.0
              ))
              listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
            }
          case Failure(e) =>
            reportError("Error running job " + job, e)
        }
    }
    // Handles the ErrorReported event
    private def handleError(msg: String, e: Throwable) {
        logError(msg, e)
        ssc.waiter.notifyError(e)
    }
    

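    At this point it is clear that JobScheduler, as the job scheduler, uses the Akka-based eventActor to receive job start, completion, and error events as they happen.

    The handle* methods listed above also show that, after handling the job start and completion events, each one wraps the event information lightly and posts it to the listener bus, so that every listener registered on the bus can react promptly to changes in job state. The design of this listener bus is quite elegant.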
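
    The dispatch style used by processEvent, a single pattern match over a closed set of event types, can be shown without Akka. The sketch below is an illustration only: the event classes are simplified, hypothetical versions that carry an Int job id instead of a Job, and the handlers just append to an in-memory log.

```scala
import scala.collection.mutable.ArrayBuffer

object SchedulerEventsSketch {
  // Hypothetical simplified events, modelled on JobStarted / JobCompleted / ErrorReported.
  sealed trait SchedulerEvent
  case class JobStarted(jobId: Int) extends SchedulerEvent
  case class JobCompleted(jobId: Int) extends SchedulerEvent
  case class ErrorReported(msg: String, e: Throwable) extends SchedulerEvent

  class Scheduler {
    val log = ArrayBuffer[String]()

    // One pattern match routes each event to its handling code;
    // an exception thrown while handling is recorded instead of being rethrown.
    def processEvent(event: SchedulerEvent): Unit =
      try {
        event match {
          case JobStarted(id)      => log += s"started $id"
          case JobCompleted(id)    => log += s"completed $id"
          case ErrorReported(m, e) => log += s"error $m: ${e.getMessage}"
        }
      } catch {
        case e: Exception => log += s"Error in job scheduler: ${e.getMessage}"
      }
  }

  def main(args: Array[String]): Unit = {
    val scheduler = new Scheduler
    scheduler.processEvent(JobStarted(1))
    scheduler.processEvent(JobCompleted(1))
    scheduler.log.foreach(println)
  }
}
```

    Because the trait is sealed, the compiler can warn when a new event type is added but not handled in the match.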


    Once the eventActor has been created, the StreamingListenerBus object is started.

    The core source of ListenerBus.start is as follows:

    // Start function
    def start() {
        listenerThread.start()
    }
    
    // Start a daemon thread
    val listenerThread = new Thread("StreamingListenerBus") {
    setDaemon(true)
    override def run() {
      while (true) {
        val event = eventQueue.take
        event match {
          case receiverStarted: StreamingListenerReceiverStarted =>
            listeners.foreach(_.onReceiverStarted(receiverStarted))
          case receiverError: StreamingListenerReceiverError =>
            listeners.foreach(_.onReceiverError(receiverError))
          case receiverStopped: StreamingListenerReceiverStopped =>
            listeners.foreach(_.onReceiverStopped(receiverStopped))
          case batchSubmitted: StreamingListenerBatchSubmitted =>
            listeners.foreach(_.onBatchSubmitted(batchSubmitted))
          case batchStarted: StreamingListenerBatchStarted =>
            listeners.foreach(_.onBatchStarted(batchStarted))
          case batchCompleted: StreamingListenerBatchCompleted =>
            listeners.foreach(_.onBatchCompleted(batchCompleted))
          case StreamingListenerShutdown =>
            // Get out of the while loop and shutdown the daemon thread
            return
          case _ =>
        }
      }
    }
    }
    

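    The code above shows that the listener bus lives up to its name: its start method launches a daemon thread that waits for events of various kinds, takes each one off the queue, and dispatches it to the listeners registered on the bus. The elegance of the design is evident.

    The complete source of the listener bus is worth reproducing here: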

    /** Asynchronously passes StreamingListenerEvents to registered StreamingListeners. */
    private[spark] class StreamingListenerBus() extends Logging {
      private val listeners = new ArrayBuffer[StreamingListener]()
        with SynchronizedBuffer[StreamingListener]
    
      /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
       * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
      private val EVENT_QUEUE_CAPACITY = 10000
      private val eventQueue = new LinkedBlockingQueue[StreamingListenerEvent](EVENT_QUEUE_CAPACITY)
      private var queueFullErrorMessageLogged = false
    
      val listenerThread = new Thread("StreamingListenerBus") {
        setDaemon(true)
        override def run() {
          while (true) {
            val event = eventQueue.take
            event match {
              case receiverStarted: StreamingListenerReceiverStarted =>
                listeners.foreach(_.onReceiverStarted(receiverStarted))
              case receiverError: StreamingListenerReceiverError =>
                listeners.foreach(_.onReceiverError(receiverError))
              case receiverStopped: StreamingListenerReceiverStopped =>
                listeners.foreach(_.onReceiverStopped(receiverStopped))
              case batchSubmitted: StreamingListenerBatchSubmitted =>
                listeners.foreach(_.onBatchSubmitted(batchSubmitted))
              case batchStarted: StreamingListenerBatchStarted =>
                listeners.foreach(_.onBatchStarted(batchStarted))
              case batchCompleted: StreamingListenerBatchCompleted =>
                listeners.foreach(_.onBatchCompleted(batchCompleted))
              case StreamingListenerShutdown =>
                // Get out of the while loop and shutdown the daemon thread
                return
              case _ =>
            }
          }
        }
      }
    
      def start() {
        listenerThread.start()
      }
    
      def addListener(listener: StreamingListener) {
        listeners += listener
      }
    
      def post(event: StreamingListenerEvent) {
        val eventAdded = eventQueue.offer(event)
        if (!eventAdded && !queueFullErrorMessageLogged) {
          logError("Dropping StreamingListenerEvent because no remaining room in event queue. " +
            "This likely means one of the StreamingListeners is too slow and cannot keep up with the " +
            "rate at which events are being started by the scheduler.")
          queueFullErrorMessageLogged = true
        }
      }
    
      /**
       * Waits until there are no more events in the queue, or until the specified time has elapsed.
       * Used for testing only. Returns true if the queue has emptied and false is the specified time
       * elapsed before the queue emptied.
       */
      def waitUntilEmpty(timeoutMillis: Int): Boolean = {
        val finishTime = System.currentTimeMillis + timeoutMillis
        while (!eventQueue.isEmpty) {
          if (System.currentTimeMillis > finishTime) {
            return false
          }
          /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
           * add overhead in the general case. */
          Thread.sleep(10)
        }
        true
      }
    
      def stop(): Unit = post(StreamingListenerShutdown)
    }
    

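    The complete source of the listener bus shows a buffer of listeners, a blocking queue of events, and an event handler that notifies every registered listener of each event. The code is a pleasure to read and a very good pattern.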
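
    The same pattern, reduced to its three parts (a listener collection, a bounded blocking queue, and a daemon thread that drains it), can be sketched with only the JDK. All names below (ListenerBusSketch, Bus, BatchCompleted, Shutdown, awaitStop) are hypothetical, and only one event type is modelled.

```scala
import java.util.concurrent.LinkedBlockingQueue

object ListenerBusSketch {
  sealed trait Event
  case class BatchCompleted(time: Long) extends Event
  case object Shutdown extends Event

  trait Listener { def onBatchCompleted(e: BatchCompleted): Unit }

  class Bus(capacity: Int) {
    @volatile private var listeners = Vector.empty[Listener]
    private val queue = new LinkedBlockingQueue[Event](capacity)

    // The daemon thread blocks on take() and fans each event out to every listener.
    private val thread = new Thread("ListenerBusSketch") {
      setDaemon(true)
      override def run(): Unit = {
        var running = true
        while (running) {
          queue.take() match {
            case e: BatchCompleted => listeners.foreach(_.onBatchCompleted(e))
            case Shutdown          => running = false
          }
        }
      }
    }

    def start(): Unit = thread.start()
    def addListener(l: Listener): Unit = synchronized { listeners = listeners :+ l }
    // offer() never blocks: it returns false, dropping the event, when the queue is full.
    def post(e: Event): Boolean = queue.offer(e)
    def stop(): Unit = { post(Shutdown); () }
    def awaitStop(timeoutMs: Long): Unit = thread.join(timeoutMs)
  }
}
```

    Posting with offer rather than put means a slow listener causes dropped events instead of stalling the thread that posts them, which matches the error message in the original post method.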


    Moving on to the startup of ReceiverTracker (the component that oversees the Receivers), first look at its start method:

     /** Start the actor and receiver execution thread. */
    def start() = synchronized {
        if (actor != null) {
          throw new SparkException("ReceiverTracker already started")
        }
    
        if (!receiverInputStreams.isEmpty) {
          actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor),
            "ReceiverTracker")
          receiverExecutor.start()
          logInfo("ReceiverTracker started")
        }
    }
    

    To walk through it: it first creates an Akka Actor, evidently to handle some events. The code:

    /** Actor to receive messages from the receivers. */
    private class ReceiverTrackerActor extends Actor {
        def receive = {
          case RegisterReceiver(streamId, typ, host, receiverActor) =>
            registerReceiver(streamId, typ, host, receiverActor, sender)
            sender ! true
          case AddBlock(receivedBlockInfo) =>
            addBlocks(receivedBlockInfo)
          case ReportError(streamId, message, error) =>
            reportError(streamId, message, error)
          case DeregisterReceiver(streamId, message, error) =>
            deregisterReceiver(streamId, message, error)
            sender ! true
        }
    }
    

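    This Actor handles messages sent from the Receivers, including Receiver registration and deregistration, as well as AddBlock messages reporting stored blocks of the received stream. Once again, ReceiverTracker proves to be a component with a global view.

    After the start method of ReceiverTracker has initialized the Actor that listens to the Receivers, the subsequent call to receiverExecutor.start() is the core logic that actually starts the Receivers. The code:

    This snippet shows that receiverExecutor is an instance of ReceiverLauncher. Its start logic is as follows: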

     def start() {
      thread.start()
    }
    
    /** This thread class runs all the receivers on the cluster.  */
    class ReceiverLauncher {
        @transient val env = ssc.env
        @transient val thread  = new Thread() {
        override def run() {
        try {
          SparkEnv.set(env)
          // Start the Receivers
          startReceivers()
        } catch {
          case ie: InterruptedException => logInfo("ReceiverLauncher interrupted")
        }
      }
    }
    
     /**
     * Get the receivers from the ReceiverInputDStreams, distributes them to the
     * worker nodes as a parallel collection, and runs them.
     */
    private def startReceivers() {
      val receivers = receiverInputStreams.map(nis => {
        val rcvr = nis.getReceiver()
        rcvr.setReceiverId(nis.id)
        rcvr
      })
    
      // Right now, we only honor preferences if all receivers have them
      val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
    
      // Create the parallel collection of receivers to distributed them on the worker nodes
      val tempRDD =
        if (hasLocationPreferences) {
          val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
          ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
        } else {
          ssc.sc.makeRDD(receivers, receivers.size)
        }
    
      // Function to start the receiver on the worker node
      val startReceiver = (iterator: Iterator[Receiver[_]]) => {
        if (!iterator.hasNext) {
          throw new SparkException(
            "Could not start receiver as object not found.")
        }
        val receiver = iterator.next()
        val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
        executor.start()
        executor.awaitTermination()
      }
      // Run the dummy Spark job to ensure that all slaves have registered.
      // This avoids all the receivers to be scheduled on the same node.
      if (!ssc.sparkContext.isLocal) {
        ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
      }
    
      // Distribute the receivers and start them
      logInfo("Starting " + receivers.length + " receivers")
      ssc.sparkContext.runJob(tempRDD, startReceiver)
      logInfo("All of the receivers have been terminated")
    }
    

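    The start method of ReceiverLauncher likewise starts a thread, which asynchronously obtains all the Receivers from the input streams, wraps each Receiver in a ReceiverSupervisor, and starts them one by one. That is only a rough summary; the details follow.
    The method above obtains all the Receivers from receiverInputStreams, which is defined like this: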

    private val inputStreams = new ArrayBuffer[InputDStream[_]]()
    
    def getReceiverInputStreams() = this.synchronized {
    inputStreams.filter(_.isInstanceOf[ReceiverInputDStream[_]])
      .map(_.asInstanceOf[ReceiverInputDStream[_]])
      .toArray
     }
    

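    It is the array obtained by passing the InputDStream buffer through the getReceiverInputStreams function, which mainly keeps only the elements of type ReceiverInputDStream.


    After collecting all the Receivers from the InputDStreams, the Receivers are wrapped in an RDD and distributed to multiple worker nodes (a clever touch in Spark Streaming: Receivers are distributed just like RDD data). The code is as follows: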

      // Wrap the Receivers in an RDD and distribute it to the worker nodes.
      val tempRDD =
        if (hasLocationPreferences) {
          val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
          ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
        } else {
          ssc.sc.makeRDD(receivers, receivers.size)
      }
    
      // On each worker node, take the Receiver from the iterator, wrap it in a ReceiverSupervisor, and start it.
      val startReceiver = (iterator: Iterator[Receiver[_]]) => {
          if (!iterator.hasNext) {
            throw new SparkException(
              "Could not start receiver as object not found.")
          }
          val receiver = iterator.next()
          val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
          executor.start()
          executor.awaitTermination()
      }
    

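    One detail here is that every Receiver is wrapped in a ReceiverSupervisor before it is started. ReceiverSupervisor is the supervisor of a Receiver and handles event logic beyond simply starting it. Here is an additional sequence diagram covering ReceiverTracker and ReceiverSupervisor:

    Since every Receiver is wrapped in a ReceiverSupervisor, look at the start method of ReceiverSupervisor.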

     /** Start the supervisor */
      def start() {
        onStart()
        startReceiver()
      }
    
      override protected def onStart() {
        blockGenerator.start()
      }
       /** Start receiver */
      def startReceiver(): Unit = synchronized {
        try {
          logInfo("Starting receiver")
          receiver.onStart()
          logInfo("Called receiver onStart")
          onReceiverStart()
          receiverState = Started
        } catch {
          case t: Throwable =>
            stop("Error starting receiver " + streamId, Some(t))
        }
      }
    
      /**
        * Send a RegisterReceiver message to the driver to report that this receiver has started.
        */
      override protected def onReceiverStart() {
        val msg = RegisterReceiver(
          streamId, receiver.getClass.getSimpleName, Utils.localHostName(), actor)
        val future = trackerActor.ask(msg)(askTimeout)
        Await.result(future, askTimeout)
      }

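    Combining the sequence diagram with the source, the start method of ReceiverSupervisor triggers the start method of BlockGenerator and the onStart method of the Receiver.

    Below are a call-sequence diagram for BlockGenerator and the class hierarchy diagram of Receiver.

    The processing flow by which SocketReceiver receives and stores data.

    Class hierarchy diagram of Receiver:

    The class hierarchy diagram shows the inheritance chain of Receiver. Only the onStart method relevant to this demo is examined here (because ReceiverSupervisor calls the Receiver's onStart method, and the Receiver involved here is SocketReceiver).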

    private[streaming]
    class SocketReceiver[T: ClassTag](
        host: String,
        port: Int,
        bytesToObjects: InputStream => Iterator[T],
        storageLevel: StorageLevel
      ) extends Receiver[T](storageLevel) with Logging {
    
      def onStart() {
        // Start the thread that receives data over a connection
        new Thread("Socket Receiver") {
          setDaemon(true)
          override def run() { receive() }
        }.start()
      }
    
      def onStop() {
        // There is nothing much to do as the thread calling receive()
        // is designed to stop by itself once isStopped() returns true
      }
    
      /** Create a socket connection and receive data until receiver is stopped */
      def receive() {
        var socket: Socket = null
        try {
          logInfo("Connecting to " + host + ":" + port)
          socket = new Socket(host, port)
          logInfo("Connected to " + host + ":" + port)
          val iterator = bytesToObjects(socket.getInputStream())
          while(!isStopped && iterator.hasNext) {
            store(iterator.next)
          }
          logInfo("Stopped receiving")
          restart("Retrying connecting to " + host + ":" + port)
        } catch {
          case e: java.net.ConnectException =>
            restart("Error connecting to " + host + ":" + port, e)
          case t: Throwable =>
            restart("Error receiving data", t)
        } finally {
          if (socket != null) {
            socket.close()
            logInfo("Closed socket to " + host + ":" + port)
          }
        }
      }
    }
    

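    The code above shows that the onStart method of SocketReceiver also starts a background thread, which loops reading the byte stream sent from the other end of the socket and calls the store method to save it for subsequent actions to process.

    Now look at the store method: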
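
    The two pieces at work in that loop, a converter from InputStream to an iterator of items and a loop that stores items until the receiver is stopped or the stream ends, can be sketched with the JDK alone. The object LineReceiverSketch is hypothetical; its bytesToLines is an assumed line-based converter written for illustration, not the body of SocketReceiver.bytesToLines, and an in-memory stream replaces the socket.

```scala
import java.io.{BufferedReader, ByteArrayInputStream, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

object LineReceiverSketch {
  // Turns a byte stream into an iterator of text lines, ending when readLine() returns null.
  def bytesToLines(in: InputStream): Iterator[String] = {
    val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
  }

  // Mirrors the receive loop: keep storing items until stopped or the stream ends.
  def receive(in: InputStream, isStopped: () => Boolean, store: String => Unit): Unit = {
    val iterator = bytesToLines(in)
    while (!isStopped() && iterator.hasNext) {
      store(iterator.next())
    }
  }

  def main(args: Array[String]): Unit = {
    // An in-memory stream stands in for socket.getInputStream().
    val in = new ByteArrayInputStream("a b\nc\n".getBytes(StandardCharsets.UTF_8))
    receive(in, () => false, line => println(line))
  }
}
```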

      /**
       * Store a single item of received data to Spark's memory.
       * These single items will be aggregated together into data blocks before
       * being pushed into Spark's memory.
       */
      def store(dataItem: T) {
        executor.pushSingle(dataItem)
      }
    

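    Combining the code above with the earlier sequence diagram "SocketReceiver receives and stores data", one can trace how data received by SocketReceiver ends up stored by the BlockManager. Along the way, ReceiverSupervisor uses the operator-style "+=" method of BlockGenerator, which also relies on a blocking queue to buffer the data; finally the received stream data is stored through the BlockManager.

    This process is not broken down in detail here; with the sequence diagram it is fairly easy to follow.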
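
    A rough sketch of that buffering path follows, under stated assumptions: single items are appended to a current buffer through an operator-style method, the buffer is periodically sealed into a block, and sealed blocks wait in a bounded blocking queue until they are handed to storage. The class BlockGeneratorSketch and its methods cutBlock and pushAll are hypothetical; here the caller drives each step explicitly, with no timer or pushing thread.

```scala
import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical simplified block generator: no timers or threads, the caller drives each step.
class BlockGeneratorSketch[T](queueSize: Int) {
  private var currentBuffer = new ArrayBuffer[T]()
  private val blocksForPushing = new ArrayBlockingQueue[Vector[T]](queueSize)

  // Operator-style method so that callers can write `generator += item`.
  def +=(item: T): Unit = synchronized { currentBuffer += item }

  // Seals the current buffer into a block; put() blocks while the queue is full.
  def cutBlock(): Unit = {
    val block = synchronized {
      val sealedItems = currentBuffer.toVector
      currentBuffer = new ArrayBuffer[T]()
      sealedItems
    }
    if (block.nonEmpty) blocksForPushing.put(block)
  }

  // Hands every queued block to `push`, which stands in for storing it via the block manager.
  def pushAll(push: Vector[T] => Unit): Unit = {
    var block = blocksForPushing.poll()
    while (block != null) {
      push(block)
      block = blocksForPushing.poll()
    }
  }
}
```

    The bounded queue is what provides back-pressure: if storage falls behind, sealing a new block waits instead of letting memory grow without limit.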

    (4) Finally, look separately at the start method of JobGenerator:

         /** Start generation of jobs */
        def start(): Unit = synchronized {
            if (eventActor != null) return // generator has already been started
    
            eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
              def receive = {
                case event: JobGeneratorEvent =>  processEvent(event)
              }
            }), "JobGenerator")
            if (ssc.isCheckpointPresent) {
              restart()
            } else {
              startFirstTime()
            }
        }
    
        /** Starts the generator for the first time */
       private def startFirstTime() {
        val startTime = new Time(timer.getStartTime())
        graph.start(startTime - graph.batchDuration)
        timer.start(startTime.milliseconds)
        logInfo("Started JobGenerator at " + startTime)
       }
    
       /** Restarts the generator based on the information in checkpoint */
       private def restart() {
        // If manual clock is being used for testing, then
        // either set the manual clock to the last checkpointed time,
        // or if the property is defined set it to that time
        if (clock.isInstanceOf[ManualClock]) {
          val lastTime = ssc.initialCheckpoint.checkpointTime.milliseconds
          val jumpTime = ssc.sc.conf.getLong("spark.streaming.manualClock.jump", 0)
          clock.asInstanceOf[ManualClock].setTime(lastTime + jumpTime)
        }
    
        val batchDuration = ssc.graph.batchDuration
    
        // Batches when the master was down, that is,
        // between the checkpoint and current restart time
        val checkpointTime = ssc.initialCheckpoint.checkpointTime
        val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
        val downTimes = checkpointTime.until(restartTime, batchDuration)
        logInfo("Batches during down time (" + downTimes.size + " batches): "
          + downTimes.mkString(", "))
    
        // Batches that were unprocessed before failure
        val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)
        logInfo("Batches pending processing (" + pendingTimes.size + " batches): " +
          pendingTimes.mkString(", "))
        // Reschedule jobs for these times
        val timesToReschedule = (pendingTimes ++ downTimes).distinct.sorted(Time.ordering)
        logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
          timesToReschedule.mkString(", "))
        timesToReschedule.foreach(time =>
          jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
        )
    
        // Restart the timer
        timer.start(restartTime.milliseconds)
        logInfo("Restarted JobGenerator at " + restartTime)
      }
    

    The code above covers the generation and startup of jobs; the details are left as a checkpoint to be picked up in a later part.
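
    The rescheduling arithmetic in restart() can be worked through on plain numbers. In this sketch, times are Long milliseconds rather than Time objects, the names RestartTimesSketch, timesBetween, and timesToReschedule are hypothetical, and the range is assumed to include the checkpoint time and exclude the restart time.

```scala
object RestartTimesSketch {
  // Batch times in [start, end), stepping by the batch interval (all in milliseconds).
  def timesBetween(start: Long, end: Long, batchMs: Long): Seq[Long] =
    (start until end by batchMs).toSeq

  // Union of the batches missed while down and the batches still pending at the checkpoint,
  // de-duplicated and sorted so that each batch is rescheduled once, in time order.
  def timesToReschedule(
      checkpointTime: Long,
      restartTime: Long,
      batchMs: Long,
      pendingTimes: Seq[Long]): Seq[Long] = {
    val downTimes = timesBetween(checkpointTime, restartTime, batchMs)
    (pendingTimes ++ downTimes).distinct.sorted
  }

  def main(args: Array[String]): Unit = {
    // Checkpoint at 3000 ms, restart at 6000 ms, 1-second batches, two batches still pending.
    println(timesToReschedule(3000L, 6000L, 1000L, Seq(3000L, 2000L)))
  }
}
```

    With these inputs the batch at 3000 ms appears in both lists, which is why the distinct step is needed before jobs are resubmitted.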

  • Original article: https://www.cnblogs.com/yutingliuyl/p/7107150.html