  • Spark Streaming receives Kafka messages, part 4 -- the receiver running on the worker

    Use a distributed receiver to fetch the data.
    Use the WAL to get at-least-once semantics:
    conf.set("spark.streaming.receiver.writeAheadLog.enable","true") // enable the WAL
    // 1. At most once  - each record is processed at most once (0 or 1 times); data can be lost under this semantic;
    // 2. At least once - each record is processed at least once (1 or more times); no data is lost, but records can be duplicated;
    // 3. Exactly once  - each record is processed exactly once; nothing is lost and nothing is processed twice. This is what everyone wants, and also the hardest to achieve.

    Without fault tolerance, data can be lost: the receiver keeps receiving data, and if the executor suddenly dies (or the driver dies and tells the executors to shut down) after ZooKeeper has already been told the data was received but before it has been processed, the records cached in memory are gone. To address this, Spark 1.2 introduced the WAL (write-ahead log). Enabling the WAL also changes the storage level used by the receiver to StorageLevel.MEMORY_AND_DISK_SER_2.

    // Drawback: you cannot maintain the consumed topic-partition offsets yourself
    // Benefit: with the WAL enabled you get at-least-once semantics
    val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](
        ssc, kafkaParams, map, StorageLevel.MEMORY_AND_DISK_SER_2)
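
    For context, here is a minimal end-to-end sketch of the setup this series assumes (a hedged example: the ZooKeeper quorum, group id, topic map and checkpoint path are placeholders, not taken from the original post). Note that the WAL also needs a checkpoint directory, since that is where the write-ahead log files are written.

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    import org.apache.spark.streaming.kafka.KafkaUtils

    object ReceiverWithWalExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("receiver-with-wal")
        conf.set("spark.streaming.receiver.writeAheadLog.enable", "true") // enable the WAL

        val ssc = new StreamingContext(conf, Seconds(10))
        ssc.checkpoint("/tmp/streaming-checkpoint") // WAL files live under the checkpoint directory

        // placeholder connection settings
        val kafkaParams = Map(
          "zookeeper.connect" -> "zk1:2181,zk2:2181,zk3:2181",
          "group.id" -> "wal-demo-group")
        val topics = Map("demo-topic" -> 2) // topic -> number of consumer threads

        val stream: ReceiverInputDStream[(String, String)] =
          KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
            ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)

        stream.map(_._2).count().print()

        ssc.start()
        ssc.awaitTermination()
      }
    }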

    Reading data from Kafka

    The driver plans where the receivers will run

    org.apache.spark.streaming.StreamingContext#start starts the JobScheduler instance:

    // private[streaming] val scheduler = new JobScheduler(this)

    // Start the streaming scheduler in a new thread, so that thread local properties
    // like call sites and job groups can be reset without affecting those of the
    // current thread.
    ThreadUtils.runInNewThread("streaming-start") { // the block body runs in a separate daemon thread
      sparkContext.setCallSite(startSite.get)
      sparkContext.clearJobGroup()
      sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
      // invoke the scheduler's start method
      scheduler.start()
    }
    state = StreamingContextState.ACTIVE

    The source of org.apache.spark.streaming.scheduler.JobScheduler#start is as follows:

    def start(): Unit = synchronized {
      if (eventLoop != null) return // scheduler has already been started

      logDebug("Starting JobScheduler")
      eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
        override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

        override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
      }
      eventLoop.start()

      // attach rate controllers of input streams to receive batch completion updates
      for {
        inputDStream <- ssc.graph.getInputStreams
        rateController <- inputDStream.rateController
      } ssc.addStreamingListener(rateController)

      listenerBus.start(ssc.sparkContext)
      receiverTracker = new ReceiverTracker(ssc)
      inputInfoTracker = new InputInfoTracker(ssc)
      receiverTracker.start()
      jobGenerator.start()
      logInfo("Started JobScheduler")
    }

    The class documentation of ReceiverTracker reads:

    This class manages the execution of the receivers of ReceiverInputDStreams. Instance of this class must be created after all input streams have been added and StreamingContext.start() has been called because it needs the final set of input streams at the time of instantiation.

    Its start method is as follows:

    /** Start the endpoint and receiver execution thread. */
    def start(): Unit = synchronized {
      if (isTrackerStarted) {
        throw new SparkException("ReceiverTracker already started")
      }

      if (!receiverInputStreams.isEmpty) {
        // set up the RPC endpoint; note that this endpoint lives on the driver
        endpoint = ssc.env.rpcEnv.setupEndpoint(
          "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
        // from the driver, issue the command that launches the receivers
        if (!skipReceiverLaunch) launchReceivers()
        logInfo("ReceiverTracker started")
        trackerState = Started
      }
    }

    /**
     * Get the receivers from the ReceiverInputDStreams, distributes them to the
     * worker nodes as a parallel collection, and runs them.
     */
    private def launchReceivers(): Unit = {
      val receivers = receiverInputStreams.map(nis => {
        // without the WAL this is a KafkaReceiver; with the WAL enabled it is a ReliableKafkaReceiver
        val rcvr = nis.getReceiver()
        rcvr.setReceiverId(nis.id)
        rcvr
      })
      // run a dummy job to make sure all slave nodes are up, so that all receivers
      // do not end up on the same local node
      runDummySparkJob()

      logInfo("Starting " + receivers.length + " receivers")
      endpoint.send(StartAllReceivers(receivers)) // ask the driver endpoint to dispatch the start-receiver commands
    }
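
    The comment above notes that getReceiver returns a KafkaReceiver without the WAL and a ReliableKafkaReceiver with it. Roughly, the decision looks like the sketch below (a paraphrase from memory of how KafkaUtils.createStream and KafkaInputDStream cooperate, not a verbatim copy of the Spark source):

    // Paraphrased sketch (not verbatim Spark source): the WAL flag read from the
    // SparkConf decides which receiver implementation getReceiver instantiates.
    val walEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)

    def getReceiver(): Receiver[(K, V)] = {
      if (walEnabled) {
        new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
      } else {
        new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
      }
    }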

    On the driver side, StartAllReceivers is handled as follows:

    override def receive: PartialFunction[Any, Unit] = {
      // Local messages
      case StartAllReceivers(receivers) =>
        // schedule the receivers onto executors
        val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
        for (receiver <- receivers) {
          val executors = scheduledLocations(receiver.streamId)
          updateReceiverScheduledExecutors(receiver.streamId, executors)
          receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
          startReceiver(receiver, executors)
        }
      ……
    }

    The source of getExecutors is as follows:

    /**
     * Get the list of executors excluding driver
     */
    // in local mode, return the local "executor" (the driver's block manager);
    // in cluster mode (e.g. YARN), return the executors excluding the driver
    private def getExecutors: Seq[ExecutorCacheTaskLocation] = {
      if (ssc.sc.isLocal) { // running in local mode
        val blockManagerId = ssc.sparkContext.env.blockManager.blockManagerId
        Seq(ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId))
      } else { // in cluster mode, filter out the driver's block manager
        ssc.sparkContext.env.blockManager.master.getMemoryStatus.filter { case (blockManagerId, _) =>
          blockManagerId.executorId != SparkContext.DRIVER_IDENTIFIER // Ignore the driver location
        }.map { case (blockManagerId, _) =>
          ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId)
        }.toSeq
      }
    }

    org.apache.spark.streaming.scheduler.ReceiverSchedulingPolicy#scheduleReceivers is documented as follows:

    Try our best to schedule receivers with evenly distributed. However, if the preferredLocations of receivers are not even, we may not be able to schedule them evenly because we have to respect them. Here is the approach to schedule executors:
    First, schedule all the receivers with preferred locations (hosts), evenly among the executors running on those host.
    Then, schedule all other receivers evenly among all the executors such that overall distribution over all the receivers is even.
    This method is called when we start to launch receivers at the first time.
    In short, this method spreads the receivers evenly across the worker nodes, following two rules:
    1. First assign the receivers that have preferred locations to executors on those hosts.
    2. Then spread the remaining receivers evenly over all executors (a toy sketch of this second rule follows below).
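
    As a toy illustration of the second rule only (this is a simplified sketch, not the actual ReceiverSchedulingPolicy, which also weights executors by how many receivers they already host and honours preferred hosts), receivers without a preferred location can simply be spread round-robin over the executors:

    // Hypothetical helper, not part of Spark: receiver i goes to executor (i mod numExecutors).
    def spreadEvenly(receiverIds: Seq[Int], executors: Seq[String]): Map[Int, String] = {
      require(executors.nonEmpty, "no executors to schedule on")
      receiverIds.zipWithIndex.map { case (receiverId, i) =>
        receiverId -> executors(i % executors.size)
      }.toMap
    }

    // spreadEvenly(Seq(0, 1, 2, 3), Seq("exec-1", "exec-2"))
    // -> Map(0 -> "exec-1", 1 -> "exec-2", 2 -> "exec-1", 3 -> "exec-2")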

    org.apache.spark.streaming.scheduler.ReceiverTracker#updateReceiverScheduledExecutors maintains the mapping from receiver id to receiver tracking info; its source is:

    private def updateReceiverScheduledExecutors(
        receiverId: Int, scheduledLocations: Seq[TaskLocation]): Unit = {
      val newReceiverTrackingInfo = receiverTrackingInfos.get(receiverId) match {
        case Some(oldInfo) =>
          oldInfo.copy(state = ReceiverState.SCHEDULED,
            scheduledLocations = Some(scheduledLocations))
        case None =>
          ReceiverTrackingInfo(
            receiverId,
            ReceiverState.SCHEDULED,
            Some(scheduledLocations),
            runningExecutor = None)
      }
      receiverTrackingInfos.put(receiverId, newReceiverTrackingInfo)
    }

    The driver submits a distributed job to launch the receivers

    startReceiver is responsible for launching a receiver; its source is:

    /**
     * Start a receiver along with its scheduled executors
     */
    private def startReceiver(
        receiver: Receiver[_],
        scheduledLocations: Seq[TaskLocation]): Unit = {
      def shouldStartReceiver: Boolean = {
        // It's okay to start when trackerState is Initialized or Started
        !(isTrackerStopping || isTrackerStopped)
      }

      val receiverId = receiver.streamId
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
        return
      }

      val checkpointDirOption = Option(ssc.checkpointDir)
      val serializableHadoopConf =
        new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

      // the function that starts the receiver on a worker node
      val startReceiverFunc: Iterator[Receiver[_]] => Unit =
        (iterator: Iterator[Receiver[_]]) => {
          if (!iterator.hasNext) {
            throw new SparkException(
              "Could not start receiver as object not found.")
          }
          if (TaskContext.get().attemptNumber() == 0) {
            val receiver = iterator.next()
            assert(iterator.hasNext == false)
            val supervisor = new ReceiverSupervisorImpl(
              receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
            supervisor.start()
            supervisor.awaitTermination()
          } else {
            // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
          }
        }

      // Create the RDD using the scheduledLocations to run the receiver in a Spark job
      val receiverRDD: RDD[Receiver[_]] =
        if (scheduledLocations.isEmpty) {
          ssc.sc.makeRDD(Seq(receiver), 1)
        } else {
          val preferredLocations = scheduledLocations.map(_.toString).distinct
          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
        }
      receiverRDD.setName(s"Receiver $receiverId")
      ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
      ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
      // submit the distributed job that starts the receiver
      val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
        receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
      // We will keep restarting the receiver job until ReceiverTracker is stopped
      future.onComplete {
        case Success(_) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
        case Failure(e) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logError("Receiver has been stopped. Try to restart it.", e)
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
      }(submitJobThreadPool)
      logInfo(s"Receiver ${receiver.streamId} started")
    }

    The worker node starts the receiver supervisor

    The start method of org.apache.spark.streaming.receiver.ReceiverSupervisorImpl is as follows:

    /** Start the supervisor */
    def start() {
      onStart()
      startReceiver()
    }
    override protected def onStart() { // start the registered BlockGenerators
      registeredBlockGenerators.foreach { _.start() }
    }
    // the startReceiver method:
    /** Start receiver */
    def startReceiver(): Unit = synchronized {
      try {
        if (onReceiverStart()) { // the receiver was registered with the driver successfully
          logInfo("Starting receiver")
          receiverState = Started
          receiver.onStart() // start the receiver
          logInfo("Called receiver onStart")
        } else {
          // The driver refused us
          stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
        }
      } catch {
        case NonFatal(t) =>
          stop("Error starting receiver " + streamId, Some(t))
      }
    }

    Registering the receiver with the driver

    override protected def onReceiverStart(): Boolean = {
      val msg = RegisterReceiver(
        streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
      trackerEndpoint.askWithRetry[Boolean](msg)
    }

    On the driver side, this simply brings the receiver under the management of org.apache.spark.streaming.scheduler.ReceiverTracker: the mapping from stream id to ReceiverTrackingInfo is kept in receiverTrackingInfos.

    The key part of org.apache.spark.streaming.scheduler.ReceiverTracker#registerReceiver is:

    val name = s"${typ}-${streamId}"
    val receiverTrackingInfo = ReceiverTrackingInfo(
      streamId,
      ReceiverState.ACTIVE,
      scheduledLocations = None,
      runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
      name = Some(name),
      endpoint = Some(receiverEndpoint))
    receiverTrackingInfos.put(streamId, receiverTrackingInfo)
    listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))

    Starting the receiver thread

    Since the WAL is enabled, the receiver here is an instance of ReliableKafkaReceiver.
    receiver.onStart is therefore org.apache.spark.streaming.kafka.ReliableKafkaReceiver#onStart, whose source is:

    override def onStart(): Unit = {
      logInfo(s"Starting Kafka Consumer Stream with group: $groupId")

      // Initialize the topic-partition / offset hash map.
      // 1. Maintains the mapping from consumed topic-partition to offset
      topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]

      // Initialize the stream block id / offset snapshot hash map.
      // 2. Maintains the mapping from stream block id to the partition-offset snapshot
      blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()

      // Initialize the block generator for storing Kafka message.
      // 3. The block generator that stores the Kafka messages. Its argument is a GeneratedBlockHandler
      //    instance, a listener for block-generator events. Per its doc: "Generates batches of objects
      //    received by a org.apache.spark.streaming.receiver.Receiver and puts them into appropriately
      //    named blocks at regular intervals. This class starts two threads, one to periodically start a
      //    new batch and prepare the previous batch of as a block, the other to push the blocks into the
      //    block manager."
      blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)
      // 4. Turn off the consumer's automatic offset commit
      //    (auto.commit.enable must end up as false)
      if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
        logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
          "otherwise we will manually set it to false to turn off auto offset commit in Kafka")
      }

      val props = new Properties()
      kafkaParams.foreach(param => props.put(param._1, param._2))
      // Manually set "auto.commit.enable" to "false" no matter user explicitly set it to true,
      // we have to make sure this property is set to false to turn off auto commit mechanism in Kafka.
      props.setProperty(AUTO_OFFSET_COMMIT, "false")

      val consumerConfig = new ConsumerConfig(props)

      assert(!consumerConfig.autoCommitEnable)

      logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
      // 5. Initialize the consumer; consumerConnector is a ZookeeperConsumerConnector instance
      consumerConnector = Consumer.create(consumerConfig)
      logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")
      // 6. Initialize the ZooKeeper client
      zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
        consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
      // 7. Create a fixed-size daemon thread pool to process the message streams. Its size is the sum of the
      //    per-topic thread counts, and each thread name gets the given prefix; internally this is a
      //    ThreadPoolExecutor whose thread factory comes from Guava.
      messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool(
        topics.values.sum, "KafkaMessageHandler")
      // 8. Start the two threads inside the BlockGenerator
      blockGenerator.start()

      // 9. Create the message streams
      val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
        .newInstance(consumerConfig.props)
        .asInstanceOf[Decoder[K]]

      val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
        .newInstance(consumerConfig.props)
        .asInstanceOf[Decoder[V]]

      val topicMessageStreams = consumerConnector.createMessageStreams(
        topics, keyDecoder, valueDecoder)
      // 10. Submit one MessageHandler per stream to the thread pool for execution
      topicMessageStreams.values.foreach { streams =>
        streams.foreach { stream =>
          messageHandlerThreadPool.submit(new MessageHandler(stream))
        }
      }
    }

    Step 9 creates the message streams;
    kafka.consumer.ZookeeperConsumerConnector#createMessageStreams is as follows:

    def createMessageStreams[K,V](topicCountMap: Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
        : Map[String, List[KafkaStream[K,V]]] = {
      if (messageStreamCreated.getAndSet(true))
        throw new MessageStreamsExistException(this.getClass.getSimpleName +
                                     " can create message streams at most once",null)
      consume(topicCountMap, keyDecoder, valueDecoder)
    }

    It delegates to the consume method, whose source is:

    def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
        : Map[String,List[KafkaStream[K,V]]] = {
      debug("entering consume ")
      if (topicCountMap == null)
        throw new RuntimeException("topicCountMap is null")
     // 1. Build the TopicCount
      val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)
     // 2. Get the mapping from each topic to its set of consumer thread ids
      val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic
    
      // make a list of (queue,stream) pairs, one pair for each threadId
    // 3. Build one (queue, stream) pair per consumer thread id
      val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
        threadIdSet.map(_ => {
          val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
          val stream = new KafkaStream[K,V](
            queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
          (queue, stream)
        })
      ).flatten.toList
     // 4. Get the ZooKeeper paths for this group id
      val dirs = new ZKGroupDirs(config.groupId)
    // 5. Register this consumer under the group id in ZooKeeper
      registerConsumerInZK(dirs, consumerIdString, topicCount)
    // 6. Reinitialize the consumer
      reinitializeConsumer(topicCount, queuesAndStreams)
      // 7. Return the streams
      loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
    }

    The consumer consumes Kafka data

    In the kafka.consumer.ZookeeperConsumerConnector#consume method, the following happens:

    // build one (queue, stream) pair per consumer thread id
    val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
      threadIdSet.map(_ => {
        val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
        val stream = new KafkaStream[K,V](
          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
        (queue, stream)
      })
    ).flatten.toList
    // get the ZooKeeper paths for this group id
    val dirs = new ZKGroupDirs(config.groupId)
    // register this consumer under the group id in ZooKeeper
    registerConsumerInZK(dirs, consumerIdString, topicCount)
    // reinitialize the consumer
    reinitializeConsumer(topicCount, queuesAndStreams)

    In the code above, each queue (a LinkedBlockingQueue instance) is passed into the KafkaStream constructor, where an iterator later takes data out of it, and it is also re-grouped with its stream into a list of (LinkedBlockingQueue[FetchedDataChunk], KafkaStream) tuples that is then passed to reinitializeConsumer.
    The source of kafka.consumer.ZookeeperConsumerConnector#reinitializeConsumer is:

    private def reinitializeConsumer[K,V](
        topicCount: TopicCount,
        queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) {
      // 1. Get the ZooKeeper paths for this group id
      val dirs = new ZKGroupDirs(config.groupId)

      // listener to consumer and partition changes
      // 2. Initialize loadBalancerListener, a rebalance listener that watches consumer and partition changes
      if (loadBalancerListener == null) {
        val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]
        loadBalancerListener = new ZKRebalancerListener(
          config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
      }

      // create listener for session expired event if not exist yet
      // 3. Listener for ZooKeeper session expiration; when a new session is established it notifies the load balancer
      if (sessionExpirationListener == null)
        sessionExpirationListener = new ZKSessionExpireListener(
          dirs, consumerIdString, topicCount, loadBalancerListener)

      // create listener for topic partition change event if not exist yet
      // 4. Initialize ZKTopicPartitionChangeListener; when topic partitions change it notifies the load balancer
      if (topicPartitionChangeListener == null)
        topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)
      // 5. Transform queuesAndStreams and add the results to loadBalancerListener.kafkaMessageAndMetadataStreams
      val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams

      // map of {topic -> Set(thread-1, thread-2, ...)}
      val consumerThreadIdsPerTopic: Map[String, Set[ConsumerThreadId]] =
        topicCount.getConsumerThreadIdsPerTopic

      val allQueuesAndStreams = topicCount match {
        case wildTopicCount: WildcardTopicCount => // the author notes this walkthrough takes the wildcard branch
          /*
           * Wild-card consumption streams share the same queues, so we need to
           * duplicate the list for the subsequent zip operation.
           */
          (1 to consumerThreadIdsPerTopic.keySet.size).flatMap(_ => queuesAndStreams).toList
        case statTopicCount: StaticTopicCount =>
          queuesAndStreams
      }

      val topicThreadIds = consumerThreadIdsPerTopic.map {
        case(topic, threadIds) =>
          threadIds.map((topic, _))
      }.flatten

      require(topicThreadIds.size == allQueuesAndStreams.size,
        "Mismatch between thread ID count (%d) and queue count (%d)"
        .format(topicThreadIds.size, allQueuesAndStreams.size))
      val threadQueueStreamPairs = topicThreadIds.zip(allQueuesAndStreams)

      threadQueueStreamPairs.foreach(e => {
        val topicThreadId = e._1
        val q = e._2._1
        topicThreadIdAndQueues.put(topicThreadId, q)
        debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString))
        newGauge(
          "FetchQueueSize",
          new Gauge[Int] {
            def value = q.size
          },
          Map("clientId" -> config.clientId,
            "topic" -> topicThreadId._1,
            "threadId" -> topicThreadId._2.threadId.toString)
        )
      })

      val groupedByTopic = threadQueueStreamPairs.groupBy(_._1._1)
      groupedByTopic.foreach(e => {
        val topic = e._1
        val streams = e._2.map(_._2._2).toList
        topicStreamsMap += (topic -> streams)
        debug("adding topic %s and %d streams to map.".format(topic, streams.size))
      })

      // listener to consumer and partition changes
      // 6. Register the sessionExpirationListener via zkClient
      zkClient.subscribeStateChanges(sessionExpirationListener)
      // 7. Register the loadBalancerListener via zkClient
      zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
      // For every topic, register the topicPartitionChangeListener via zkClient
      topicStreamsMap.foreach { topicAndStreams =>
        // register on broker partition path changes
        val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1
        zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)
      }

      // explicitly trigger load balancing for this consumer
      // 8. Trigger a synchronous rebalance via loadBalancerListener
      loadBalancerListener.syncedRebalance()
    }

    Focus on step 8, the synchronous rebalance through loadBalancerListener.
    The source of kafka.consumer.ZookeeperConsumerConnector.ZKRebalancerListener#syncedRebalance is:

    def syncedRebalance() {
      rebalanceLock synchronized {
        rebalanceTimer.time {
          if(isShuttingDown.get())  { // if the ZookeeperConsumerConnector has already shut down, return immediately
            return
          } else {
            for (i <- 0 until config.rebalanceMaxRetries) { // 4 retries by default
              info("begin rebalancing consumer " + consumerIdString + " try #" + i)
              var done = false
              var cluster: Cluster = null
              try {
                // 1. Build a Cluster object from zkClient; it holds the list of brokers (broker id and its path in ZK)
                cluster = getCluster(zkClient)
                // 2. Perform the rebalance against this cluster
                done = rebalance(cluster)
              } catch {
                case e: Throwable =>
                  /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
                    * For example, a ZK node can disappear between the time we get all children and the time we try to get
                    * the value of a child. Just let this go since another rebalance will be triggered.
                    **/
                  info("exception during rebalance ", e)
              }
              info("end rebalancing consumer " + consumerIdString + " try #" + i)
              if (done) {
                return
              } else {
                /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
                 * clear the cache */
                info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
              }
              // stop all fetchers and clear all the queues to avoid data duplication
              closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
              Thread.sleep(config.rebalanceBackoffMs)
            }
          }
        }
      }

      throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.rebalanceMaxRetries +" retries")
    }

    Focus on step 2, the rebalance against the cluster; the source of kafka.consumer.ZookeeperConsumerConnector.ZKRebalancerListener#rebalance is:

    private def rebalance(cluster: Cluster): Boolean = {
      // 1. Get the mapping from topic to consumer thread ids for this consumer
      val myTopicThreadIdsMap = TopicCount.constructTopicCount(
        group, consumerIdString, zkClient, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
      // 2. Get all available brokers in the Kafka cluster
      val brokers = getAllBrokersInCluster(zkClient)
      if (brokers.size == 0) { // if there are no brokers yet, subscribe to broker changes and return true
        // This can happen in a rare case when there are no brokers available in the cluster when the consumer is started.
        // We log an warning and register for child changes on brokers/id so that rebalance can be triggered when the brokers
        // are up.
        warn("no brokers found when trying to rebalance.")
        zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, loadBalancerListener)
        true
      }
      else {
        /**
         * fetchers must be stopped to avoid data duplication, since if the current
         * rebalancing attempt fails, the partitions that are released could be owned by another consumer.
         * But if we don't stop the fetchers first, this consumer would continue returning data for released
         * partitions in parallel. So, not stopping the fetchers leads to duplicate data.
         */
        // 3. Preparation before the rebalance
        // 3.1 Close the existing fetcher connections
        closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
        // 3.2 Release partition ownership (delete the owner nodes in ZK and drop the in-memory topic -> fetcher association)
        releasePartitionOwnership(topicRegistry)
        // 3.3 Re-assign partitions to fetchers
        val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient)
        val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext)
        val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]](
          valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo]))

        // fetch current offsets for all topic-partitions
        // 3.4 Fetch the current offsets of the owned topic-partitions; here "offset" means the next offset the consumer will consume
        val topicPartitions = partitionOwnershipDecision.keySet.toSeq

        val offsetFetchResponseOpt = fetchOffsets(topicPartitions)

        if (isShuttingDown.get || !offsetFetchResponseOpt.isDefined)
          false
        else {
          // 3.5 Update the partition -> fetcher relationships
          val offsetFetchResponse = offsetFetchResponseOpt.get
          topicPartitions.foreach(topicAndPartition => {
            val (topic, partition) = topicAndPartition.asTuple
            // requestInfo is a member of OffsetFetchResponse: a Map[TopicAndPartition, OffsetMetadataAndError]
            val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset
            val threadId = partitionOwnershipDecision(topicAndPartition)
            addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId)
          })

          /**
           * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
           * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
           */
          if(reflectPartitionOwnershipDecision(partitionOwnershipDecision)) {
            allTopicsOwnedPartitionsCount = partitionOwnershipDecision.size

            partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic }
                                      .foreach { case (topic, partitionThreadPairs) =>
              newGauge("OwnedPartitionsCount",
                new Gauge[Int] {
                  def value() = partitionThreadPairs.size
                },
                ownedPartitionsCountMetricTags(topic))
            }
            // 3.6 Replace the old topic registry with the new one
            topicRegistry = currentTopicRegistry
            // 4. Update the fetchers
            updateFetcher(cluster)
            true
          } else {
            false
          }
        }
      }
    }

    The source of addPartitionTopicInfo is:

    private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
                                        partition: Int, topic: String,
                                        offset: Long, consumerThreadId: ConsumerThreadId) {
      // if the pool has no entry for this key, valueFactory creates one; the value is then returned
      val partTopicInfoMap = currentTopicRegistry.getAndMaybePut(topic)

      val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
      val consumedOffset = new AtomicLong(offset)
      val fetchedOffset = new AtomicLong(offset)
      val partTopicInfo = new PartitionTopicInfo(topic,
                                                 partition,
                                                 queue,
                                                 consumedOffset,
                                                 fetchedOffset,
                                                 new AtomicInteger(config.fetchMessageMaxBytes),
                                                 config.clientId)
      // 1. Register it in the new topic registry, i.e. record the partition -> fetcher relationship
      partTopicInfoMap.put(partition, partTopicInfo)
      debug(partTopicInfo + " selected new offset " + offset)
      // 2. Record the consumer's checkpointed offset for this partition
      checkpointedZkOffsets.put(TopicAndPartition(topic, partition), offset)
    }

    Step 4, updating the fetchers, looks like this:

    private def updateFetcher(cluster: Cluster) {
      // update partitions for fetcher
      var allPartitionInfos : List[PartitionTopicInfo] = Nil
      for (partitionInfos <- topicRegistry.values)
        for (partition <- partitionInfos.values)
          allPartitionInfos ::= partition
      info("Consumer " + consumerIdString + " selected partitions : " +
        allPartitionInfos.sortWith((s,t) => s.partitionId < t.partitionId).map(_.toString).mkString(","))

      fetcher match {
        case Some(f) =>
          f.startConnections(allPartitionInfos, cluster)
        case None =>
      }
    }

    f.startConnections is what actually performs the update. This brings in a new class, the fetcher manager kafka.consumer.ConsumerFetcherManager.

    The source of kafka.consumer.ConsumerFetcherManager#startConnections is:

    def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) {
      // the LeaderFinderThread adds fetchers to a partition's leader broker once that leader becomes available
      leaderFinderThread = new LeaderFinderThread(consumerIdString + "-leader-finder-thread")
      leaderFinderThread.start()

      inLock(lock) {
        // update the ConsumerFetcherManager's member variables
        partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap
        this.cluster = cluster
        noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId))
        cond.signalAll()
      }
    }

    ConsumerFetcherManager holds a LeaderFinderThread instance; its parent class kafka.utils.ShutdownableThread has the following run method:

    override def run(): Unit = {
      info("Starting ")
      try{
        while(isRunning.get()){
          doWork()
        }
      } catch{
        case e: Throwable =>
          if(isRunning.get())
            error("Error due to ", e)
      }
      shutdownLatch.countDown()
      info("Stopped ")
    }

    doWork is an abstract method; the LeaderFinderThread subclass implements it as follows:

    // thread responsible for adding the fetcher to the right broker when leader is available
    override def doWork() {
      // 1. Collect the mapping from partition to leader broker
      val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
      lock.lock()
      try {
        while (noLeaderPartitionSet.isEmpty) { // this set was populated in startConnections
          trace("No partition for leader election.")
          cond.await()
        }

        trace("Partitions without leader %s".format(noLeaderPartitionSet))
        val brokers = getAllBrokersInCluster(zkClient) // get all available brokers
        // fetch the kafka.api.TopicMetadata for these topics; it holds the topic plus partitionId, isr, leader and replicas
        val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
                                                            brokers,
                                                            config.clientId,
                                                            config.socketTimeoutMs,
                                                            correlationId.getAndIncrement).topicsMetadata
        if(logger.isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString()))
        // 2. With the fetched metadata, update the two collections: noLeaderPartitionSet holds the partitions whose
        //    leader is still unknown, leaderForPartitionsMap holds the partitions whose leader has been determined.
        topicsMetadata.foreach { tmd =>
          val topic = tmd.topic
          tmd.partitionsMetadata.foreach { pmd =>
            val topicAndPartition = TopicAndPartition(topic, pmd.partitionId)
            if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) {
              val leaderBroker = pmd.leader.get
              leaderForPartitionsMap.put(topicAndPartition, leaderBroker)
              noLeaderPartitionSet -= topicAndPartition
            }
          }
        }
      } catch {
        case t: Throwable => {
            if (!isRunning.get())
              throw t /* If this thread is stopped, propagate this exception to kill the thread. */
            else
              warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
          }
      } finally {
        lock.unlock()
      }

      try {
        // 3. Assign fetchers to these partitions
        addFetcherForPartitions(leaderForPartitionsMap.map{
          case (topicAndPartition, broker) =>
            topicAndPartition -> BrokerAndInitialOffset(broker, partitionMap(topicAndPartition).getFetchOffset())}
        )
      } catch {
        case t: Throwable => {
          if (!isRunning.get())
            throw t /* If this thread is stopped, propagate this exception to kill the thread. */
          else {
            warn("Failed to add leader for partitions %s; will retry".format(leaderForPartitionsMap.keySet.mkString(",")), t)
            lock.lock()
            noLeaderPartitionSet ++= leaderForPartitionsMap.keySet
            lock.unlock()
          }
        }
      }
      // 4. Shut down idle fetcher threads
      shutdownIdleFetcherThreads()
      Thread.sleep(config.refreshLeaderBackoffMs)
    }

    Focus on step 3, assigning fetchers to partitions; the source of addFetcherForPartitions is:

    def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
      mapLock synchronized {
        // group the partitions by the fetcher that will own them
        val partitionsPerFetcher = partitionAndOffsets.groupBy{ case(topicAndPartition, brokerAndInitialOffset) =>
          BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))}
        for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {

          var fetcherThread: AbstractFetcherThread = null
          fetcherThreadMap.get(brokerAndFetcherId) match {
            case Some(f) => fetcherThread = f
            case None =>
              // create and start a fetcher thread for this brokerAndFetcherId
              fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
              fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
              fetcherThread.start
          }

          fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitOffset) =>
            topicAndPartition -> brokerAndInitOffset.initOffset
          })
        }
      }

      info("Added fetcher for partitions %s".format(partitionAndOffsets.map{ case (topicAndPartition, brokerAndInitialOffset) =>
        "[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "}))
    }

    The source of kafka.consumer.ConsumerFetcherManager#createFetcherThread is:

    override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
      new ConsumerFetcherThread(
        "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id),
        config, sourceBroker, partitionMap, this)
    }

    First, the constructor declaration of ConsumerFetcherThread:

    class ConsumerFetcherThread(name: String,
                                val config: ConsumerConfig,
                                sourceBroker: Broker,
                                partitionMap: Map[TopicAndPartition, PartitionTopicInfo],
                                val consumerFetcherManager: ConsumerFetcherManager)
            extends AbstractFetcherThread(name = name,
                                          clientId = config.clientId,
                                          sourceBroker = sourceBroker,
                                          socketTimeout = config.socketTimeoutMs,
                                          socketBufferSize = config.socketReceiveBufferBytes,
                                          fetchSize = config.fetchMessageMaxBytes,
                                          fetcherBrokerId = Request.OrdinaryConsumerId,
                                          maxWait = config.fetchWaitMaxMs,
                                          minBytes = config.fetchMinBytes,
                                          isInterruptible = true)

    Note that the values in partitionMap are PartitionTopicInfo objects, each of which wraps the BlockingQueue[FetchedDataChunk] that fetch results are placed into.
    The run method is kafka.utils.ShutdownableThread#run, already shown above; what matters is how this subclass overrides doWork:

    override def doWork() {
      inLock(partitionMapLock) { // acquire the lock, do the work, release the lock
        if (partitionMap.isEmpty) // if there is nothing to fetch, wait up to 200 ms
          partitionMapCond.await(200L, TimeUnit.MILLISECONDS)
        partitionMap.foreach { // add every partition's fetch info to the fetchRequestBuilder
          case((topicAndPartition, offset)) =>
            fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
                             offset, fetchSize)
        }
      }
      // build the batched fetch request
      val fetchRequest = fetchRequestBuilder.build()
      // process the FetchRequest
      if (!fetchRequest.requestInfo.isEmpty)
        processFetchRequest(fetchRequest)
    }

    The source of kafka.server.AbstractFetcherThread#processFetchRequest is:

    private def processFetchRequest(fetchRequest: FetchRequest) {
      val partitionsWithError = new mutable.HashSet[TopicAndPartition]
      var response: FetchResponse = null
      try {
        trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
        // send the request and collect the response;
        // simpleConsumer is a SimpleConsumer instance, described earlier
        response = simpleConsumer.fetch(fetchRequest)
      } catch {
        case t: Throwable =>
          if (isRunning.get) {
            warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString))
            partitionMapLock synchronized {
              partitionsWithError ++= partitionMap.keys
            }
          }
      }
      fetcherStats.requestRate.mark()

      if (response != null) {
        // process fetched data
        inLock(partitionMapLock) { // take the lock, process the response, release the lock
          response.data.foreach {
            case(topicAndPartition, partitionData) =>
              val (topic, partitionId) = topicAndPartition.asTuple
              val currentOffset = partitionMap.get(topicAndPartition)
              // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
              if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {
                partitionData.error match { // dispatch on the returned error code
                  case ErrorMapping.NoError => // successful response, no error
                    try {
                      val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
                      val validBytes = messages.validBytes
                      val newOffset = messages.shallowIterator.toSeq.lastOption match {
                        case Some(m: MessageAndOffset) => m.nextOffset
                        case None => currentOffset.get
                      }
                      partitionMap.put(topicAndPartition, newOffset)
                      fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
                      fetcherStats.byteRate.mark(validBytes)
                      // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                      processPartitionData(topicAndPartition, currentOffset.get, partitionData)
                    } catch {
                      case ime: InvalidMessageException => // a corrupt / incomplete message
                        // we log the error and continue. This ensures two things
                        // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
                        // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                        //    should get fixed in the subsequent fetches
                        logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
                      case e: Throwable =>
                        throw new KafkaException("error processing data for partition [%s,%d] offset %d"
                                                 .format(topic, partitionId, currentOffset.get), e)
                    }
                  case ErrorMapping.OffsetOutOfRangeCode => // offset out of range error
                    try {
                      val newOffset = handleOffsetOutOfRange(topicAndPartition)
                      partitionMap.put(topicAndPartition, newOffset)
                      error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
                        .format(currentOffset.get, topic, partitionId, newOffset))
                    } catch {
                      case e: Throwable =>
                        error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
                        partitionsWithError += topicAndPartition
                    }
                  case _ =>
                    if (isRunning.get) {
                      error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
                        ErrorMapping.exceptionFor(partitionData.error).getClass))
                      partitionsWithError += topicAndPartition
                    }
                }
              }
          }
        }
      }

      if(partitionsWithError.size > 0) {
        debug("handling partitions with error for %s".format(partitionsWithError))
        handlePartitionsWithErrors(partitionsWithError)
      }
    }

    processPartitionData, which handles the returned messages, is as follows:

    // process fetched data
    def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
      // partitionMap is a member variable that was passed in through the constructor
      val pti = partitionMap(topicAndPartition)
      if (pti.getFetchOffset != fetchOffset)
        throw new RuntimeException("Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d"
                                  .format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset))
      // enqueue the data
      pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
    }

    So it is here, finally, that the messages fetched from the leader are put into the BlockingQueue[FetchedDataChunk] buffer.
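
    The fetcher-to-stream handoff is the classic producer/consumer pattern on a blocking queue. A stripped-down sketch of just that idea (generic Scala with made-up names, not Kafka's own classes) looks like this:

    import java.util.concurrent.LinkedBlockingQueue

    // Minimal producer/consumer handoff, mirroring how the fetcher thread enqueues
    // FetchedDataChunk objects that the ConsumerIterator later takes out.
    object QueueHandoffSketch {
      final case class Chunk(payload: String) // stand-in for FetchedDataChunk

      def main(args: Array[String]): Unit = {
        val queue = new LinkedBlockingQueue[Chunk](100) // bounded, like queuedMaxMessages

        val fetcher = new Thread(new Runnable {
          override def run(): Unit =
            (1 to 5).foreach(i => queue.put(Chunk("chunk-" + i))) // put blocks when the queue is full
        })
        val consumer = new Thread(new Runnable {
          override def run(): Unit =
            (1 to 5).foreach(_ => println(queue.take().payload)) // take blocks when the queue is empty
        })

        fetcher.start(); consumer.start()
        fetcher.join(); consumer.join()
      }
    }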

    KafkaStream takes data from the queue, blocking as needed

    KafkaStream is backed by that LinkedBlockingQueue; accordingly, KafkaStream exposes an iterator, kafka.consumer.ConsumerIterator, for iterating over the data in the stream.
    The main parts of kafka.consumer.ConsumerIterator are:

    // does the iterator have a next element?
    def hasNext(): Boolean = {
      if(state == FAILED)
        throw new IllegalStateException("Iterator is in failed state")
      state match {
        case DONE => false
        case READY => true
        case _ => maybeComputeNext()
      }
    }
    // get the next element, parent-class implementation
    def next(): T = {
      if(!hasNext())
        throw new NoSuchElementException()
      state = NOT_READY
      if(nextItem == null)
        throw new IllegalStateException("Expected item but none found.")
      nextItem
    }
    // get the next element, ConsumerIterator's own override
    override def next(): MessageAndMetadata[K, V] = {
      val item = super.next() // call the parent implementation
      if(consumedOffset < 0)
        throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset))
      currentTopicInfo.resetConsumeOffset(consumedOffset)
      val topic = currentTopicInfo.topic
      trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
      consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark()
      consumerTopicStats.getConsumerAllTopicStats().messageRate.mark()
      item
    }
    // there may be a next element: try to compute it
    def maybeComputeNext(): Boolean = {
      state = FAILED
      nextItem = makeNext()
      if(state == DONE) {
        false
      } else {
        state = READY
        true
      }
    }
    // produce the next element; implemented in the ConsumerIterator subclass
    protected def makeNext(): MessageAndMetadata[K, V] = {
      // channel is the LinkedBlockingQueue, i.e. the queue member of the KafkaStream
      var currentDataChunk: FetchedDataChunk = null
      // if we don't have an iterator, get one
      var localCurrent = current.get()
      // if there is no current iterator, or it is exhausted, take a new chunk from the channel
      if(localCurrent == null || !localCurrent.hasNext) {
        // remove and return the head of the queue
        if (consumerTimeoutMs < 0)
          currentDataChunk = channel.take // blocking call: waits until an element is available
        else {
          currentDataChunk = channel.poll(consumerTimeoutMs,  TimeUnit.MILLISECONDS) // blocking call with a timeout: returns null on timeout
          if (currentDataChunk == null) { // no data: reset the state to NOT_READY so the iterator stays re-iterable
            // reset state to make the iterator re-iterable
            resetState()
            throw new ConsumerTimeoutException
          }
        }
        // shutdown command
        if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) {
          debug("Received the shutdown command")
          return allDone // allDone sets the state to DONE and returns null
        } else {
          currentTopicInfo = currentDataChunk.topicInfo
          val cdcFetchOffset = currentDataChunk.fetchOffset
          val ctiConsumeOffset = currentTopicInfo.getConsumeOffset
          if (ctiConsumeOffset < cdcFetchOffset) {
            error("consumed offset: %d doesn't match fetch offset: %d for %s; Consumer may lose data"
              .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
            currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
          }
          localCurrent = currentDataChunk.messages.iterator

          current.set(localCurrent)
        }
        // if we just updated the current chunk and it is empty that means the fetch size is too small!
        if(currentDataChunk.messages.validBytes == 0)
          throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
                                                 "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
                                                 .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
      }
      var item = localCurrent.next()
      // reject the messages that have already been consumed
      while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) {
        item = localCurrent.next()
      }
      consumedOffset = item.nextOffset

      item.message.ensureValid() // validate checksum of message to ensure it is valid
      // return the Kafka record wrapped together with its metadata
      new MessageAndMetadata(currentTopicInfo.topic, currentTopicInfo.partitionId, item.message, item.offset, keyDecoder, valueDecoder)
    }

    Caching the consumed data into the WAL

    Now back to step 10 of org.apache.spark.streaming.kafka.ReliableKafkaReceiver#onStart:

    // 10. Submit one MessageHandler per stream to the thread pool for execution
    topicMessageStreams.values.foreach { streams =>
      streams.foreach { stream =>
        messageHandlerThreadPool.submit(new MessageHandler(stream))
      }
    }

    MessageHandler is a Runnable; its run method is:

    override def run(): Unit = {
      while (!isStopped) {
        try {
          // 1. Get the ConsumerIterator for this stream
          val streamIterator = stream.iterator()
          // 2. Iterate over every record and store the message together with its metadata
          while (streamIterator.hasNext) {
            storeMessageAndMetadata(streamIterator.next)
          }
        } catch {
          case e: Exception =>
            reportError("Error handling message", e)
        }
      }
    }

    The key method in step 2, org.apache.spark.streaming.kafka.ReliableKafkaReceiver#storeMessageAndMetadata, is:

    /** Store a Kafka message and the associated metadata as a tuple. */
    private def storeMessageAndMetadata(
        msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
      val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
      val data = (msgAndMetadata.key, msgAndMetadata.message)
      val metadata = (topicAndPartition, msgAndMetadata.offset)
      // add the data to the current block, with a callback carrying the offset metadata
      blockGenerator.addDataWithCallback(data, metadata)
    }

    The source of addDataWithCallback is:

    /**
     * Push a single data item into the buffer. After buffering the data, the
     * `BlockGeneratorListener.onAddData` callback will be called.
     */
    def addDataWithCallback(data: Any, metadata: Any): Unit = {
      if (state == Active) {
        waitToPush()
        synchronized {
          if (state == Active) {
            // 1. Put the data into the buffer, from which the block-generating thread picks it up
            currentBuffer += data
            // 2. As seen when the receiver was started, the listener is the GeneratedBlockHandler instance
            listener.onAddData(data, metadata)
          } else {
            throw new SparkException(
              "Cannot add data as BlockGenerator has not been started or has been stopped")
          }
        }
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }

    Step 2 is the simpler one, so look at it first.
    The source of org.apache.spark.streaming.kafka.ReliableKafkaReceiver.GeneratedBlockHandler#onAddData is:

    def onAddData(data: Any, metadata: Any): Unit = {
      // Update the offset of the data that was added to the generator
      if (metadata != null) {
        val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
        updateOffset(topicAndPartition, offset)
      }
    }
    // updateOffset here is org.apache.spark.streaming.kafka.ReliableKafkaReceiver#updateOffset:
    /** Update stored offset */
    private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
      topicPartitionOffsetMap.put(topicAndPartition, offset)
    }

    Step 1 works as follows:
    the BlockGenerator has a timer that fires every block interval (200 ms by default) and checks whether currentBuffer is empty; if it is not, the timer turns the buffer into a block and puts it into a queue of blocks waiting to be pushed. A second thread keeps watching that queue and calls pushBlock whenever a block arrives.
    The timer thread looks like this:

     1 private val blockIntervalTimer =
     2   new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")
     3 
     4 // 其中,updateCurrentBuffer 方法如下
     5 /** Change the buffer to which single records are added to. */
     6 private def updateCurrentBuffer(time: Long): Unit = {
     7   try {
     8     var newBlock: Block = null
     9     synchronized {
    10       if (currentBuffer.nonEmpty) {
    11         val newBlockBuffer = currentBuffer
    12         currentBuffer = new ArrayBuffer[Any]
    13         val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
    14         listener.onGenerateBlock(blockId)
    15         newBlock = new Block(blockId, newBlockBuffer)
    16       }
    17     }
    18 
    19     if (newBlock != null) {
    20       blocksForPushing.put(newBlock)  // put is blocking when queue is full
    21     }
    22   } catch {
    23     case ie: InterruptedException =>
    24       logInfo("Block updating timer thread was interrupted")
    25     case e: Exception =>
    26       reportError("Error in block updating thread", e)
    27   }
    28 }
    29 
    30 // listener.onGenerateBlock(blockId) is implemented as:
    31 def onGenerateBlock(blockId: StreamBlockId): Unit = {
    32   // Remember the offsets of topics/partitions when a block has been generated
    33   rememberBlockOffsets(blockId)
    34 }
    35 // rememberBlockOffsets is implemented as:
    36 /**
    37  * Remember the current offsets for each topic and partition. This is called when a block is
    38  * generated.
    39  */
    40 private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
    41   // Get a snapshot of current offset map and store with related block id.
    42   val offsetSnapshot = topicPartitionOffsetMap.toMap
    43   blockOffsetMap.put(blockId, offsetSnapshot)
    44   topicPartitionOffsetMap.clear()
    45 }
    46 // As shown above, this takes a snapshot of the topic-partition -> offset map, associates it with the block id,
    47 // and then clears the per-partition offset map for the next block
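
    The two cooperating threads described above can be illustrated with a short, self-contained sketch (the names, the queue size and the 200 ms default interval are illustrative; this is not Spark's BlockGenerator): a scheduled task periodically swaps the current buffer out and enqueues it as a block, blocking if the queue is full.

    import java.util.concurrent.{ArrayBlockingQueue, Executors, TimeUnit}
    import scala.collection.mutable.ArrayBuffer

    object MiniBlockGenerator {
      final case class Block(id: Long, data: Seq[Any])

      private var currentBuffer = new ArrayBuffer[Any]()
      private val blocksForPushing = new ArrayBlockingQueue[Block](10)
      private val timer = Executors.newSingleThreadScheduledExecutor()

      // producer side: records are appended to the current buffer
      def addData(record: Any): Unit = synchronized { currentBuffer += record }

      // timer side: every blockIntervalMs, swap the buffer out and enqueue it as a block
      def start(blockIntervalMs: Long = 200L): Unit = {
        timer.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = {
            val newBlock = MiniBlockGenerator.synchronized {
              if (currentBuffer.nonEmpty) {
                val buf = currentBuffer
                currentBuffer = new ArrayBuffer[Any]()
                Some(Block(System.currentTimeMillis() - blockIntervalMs, buf))
              } else None
            }
            newBlock.foreach(b => blocksForPushing.put(b)) // blocks while the queue is full
          }
        }, blockIntervalMs, blockIntervalMs, TimeUnit.MILLISECONDS)
      }
    }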

    Here blocksForPushing is a bounded blocking queue, and a separate thread keeps polling it.

     1 private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
     2 private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
     3 
     4 /** Keep pushing blocks to the BlockManager. */
     5 // This method keeps polling the blocksForPushing queue and handles each push-block event.
     6 private def keepPushingBlocks() {
     7   logInfo("Started block pushing thread")
     8 
     9   def areBlocksBeingGenerated: Boolean = synchronized {
    10     state != StoppedGeneratingBlocks
    11   }
    12 
    13   try {
    14     // While blocks are being generated, keep polling for to-be-pushed blocks and push them.
    15     while (areBlocksBeingGenerated) { // keep looping while the generator has not been stopped
    16 // poll with a 10 ms timeout: remove and return the head of the queue, or null on timeout
    17       Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
    18         case Some(block) => pushBlock(block) // process the block if there is one
    19         case None =>
    20       }
    21     }
    22 
    23     // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks.
    24     logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
    25     while (!blocksForPushing.isEmpty) { // keep processing whatever is still in the queue
    26       val block = blocksForPushing.take() // blocking call, but it returns immediately here because the queue is non-empty
    27       logDebug(s"Pushing block $block")
    28       pushBlock(block) // process the block
    29       logInfo("Blocks left to push " + blocksForPushing.size())
    30     }
    31     logInfo("Stopped block pushing thread")
    32   } catch {
    33     case ie: InterruptedException =>
    34       logInfo("Block pushing thread was interrupted")
    35     case e: Exception =>
    36       reportError("Error in block pushing thread", e)
    37   }
    38 }
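
    The consumer side of the queue follows the same shape as keepPushingBlocks above. A hedged sketch of the poll-then-drain loop, with `stopped` and `pushBlock` as placeholder hooks rather than Spark's actual members:

    import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

    def keepPushing[B](queue: ArrayBlockingQueue[B],
                       stopped: () => Boolean)(pushBlock: B => Unit): Unit = {
      // while blocks are still being generated, poll with a short timeout
      while (!stopped()) {
        Option(queue.poll(10, TimeUnit.MILLISECONDS)).foreach(pushBlock)
      }
      // once generation has stopped, drain whatever is left; take() returns
      // immediately here because the queue is non-empty on each iteration
      while (!queue.isEmpty) {
        pushBlock(queue.take())
      }
    }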

    The pushBlock it calls is:

    1 private def pushBlock(block: Block) {
    2   listener.onPushBlock(block.id, block.buffer)
    3   logInfo("Pushed block " + block.id)
    4 }

    The listener it invokes (org.apache.spark.streaming.kafka.ReliableKafkaReceiver.GeneratedBlockHandler) implements onPushBlock as follows:

    1 def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
    2   // Store block and commit the blocks offset
    3   storeBlockAndCommitOffset(blockId, arrayBuffer)
    4 }

    storeBlockAndCommitOffset is implemented as follows:

     1 /**
     2  * Store the ready-to-be-stored block and commit the related offsets to zookeeper. This method
     3  * will try a fixed number of times to push the block. If the push fails, the receiver is stopped.
     4  */
     5 private def storeBlockAndCommitOffset(
     6     blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
     7   var count = 0
     8   var pushed = false
     9   var exception: Exception = null
    10   while (!pushed && count <= 3) { // at most four attempts in total: the initial try plus three retries
    11     try {
    12       store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
    13       pushed = true
    14     } catch {
    15       case ex: Exception =>
    16         count += 1
    17         exception = ex
    18     }
    19   }
    20   if (pushed) { // the block has been pushed
    21 // commit the offsets recorded for this block
    22     Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
    23 // once the block is in the BlockManager, the block -> (topic-partition -> offset) mapping is no longer needed
    24     blockOffsetMap.remove(blockId)
    25   } else {
    26     stop("Error while storing block into Spark", exception)
    27   }
    28 }
    29 // commitOffset is implemented as:
    30 /**
    31  * Commit the offset of Kafka's topic/partition, the commit mechanism follow Kafka 0.8.x's
    32  * metadata schema in Zookeeper.
    33  */
    34 private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
    35   if (zkClient == null) {
    36     val thrown = new IllegalStateException("Zookeeper client is unexpectedly null")
    37     stop("Zookeeper client is not initialized before commit offsets to ZK", thrown)
    38     return
    39   }
    40 
    41   for ((topicAndPart, offset) <- offsetMap) {
    42     try {
    43 // get the consumer's partition directory in ZooKeeper
    44       val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic)
    45       val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}"
    46       // update the consumer's committed offset for this topic-partition
    47       ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
    48     } catch {
    49       case e: Exception =>
    50         logWarning(s"Exception during commit offset $offset for topic" +
    51           s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e)
    52     }
    53 
    54     logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
    55       s"partition ${topicAndPart.partition}")
    56   }
    57 }
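
    The store-with-retries logic above boils down to a small pattern that can be sketched on its own (the helper name and the Either result type are my choices, not Spark's):

    // Try an action up to maxAttempts times; mirrors `count <= 3` above
    // (one initial try plus three retries when maxAttempts = 4).
    def withRetries[T](maxAttempts: Int)(action: => T): Either[Throwable, T] = {
      var result: Either[Throwable, T] = Left(new IllegalStateException("never attempted"))
      var attempt = 0
      var done = false
      while (!done && attempt < maxAttempts) {
        try { result = Right(action); done = true }
        catch { case e: Exception => result = Left(e); attempt += 1 }
      }
      result
    }

    // usage, roughly what storeBlockAndCommitOffset does:
    // withRetries(4) { store(arrayBuffer) } match {
    //   case Right(_) => // commit the offsets, drop the block -> offsets entry
    //   case Left(e)  => // stop the receiver with the error
    // }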

    The key method store is:

    1 /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
    2 def store(dataBuffer: ArrayBuffer[T]) {
    3   supervisor.pushArrayBuffer(dataBuffer, None, None)
    4 }

    It calls pushArrayBuffer on the supervisor (an org.apache.spark.streaming.receiver.ReceiverSupervisorImpl instance), which does the following:

    1 /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
    2 def pushArrayBuffer(
    3     arrayBuffer: ArrayBuffer[_],
    4     metadataOption: Option[Any],
    5     blockIdOption: Option[StreamBlockId]
    6   ) {
    7   pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
    8 }

    org.apache.spark.streaming.receiver.ReceiverSupervisorImpl#pushAndReportBlock is implemented as follows:

     1 /** Store block and report it to driver */
     2 def pushAndReportBlock(
     3     receivedBlock: ReceivedBlock,
     4     metadataOption: Option[Any],
     5     blockIdOption: Option[StreamBlockId]
     6   ) {
     7 // 1. Prepare the blockId, timestamp, etc.
     8   val blockId = blockIdOption.getOrElse(nextBlockId)
     9   val time = System.currentTimeMillis
    10 // 2. Store the block
    11   val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
    12   logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
    13 // 3. Get the number of records that were stored
    14   val numRecords = blockStoreResult.numRecords
    15 // 4. Notify trackerEndpoint that a block has been added, which also updates the driver-side WAL
    16   val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
    17   trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
    18   logDebug(s"Reported block $blockId")
    19 }

    receivedBlockHandler is assigned as follows:

     1 private val receivedBlockHandler: ReceivedBlockHandler = {
     2   if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
     3     if (checkpointDirOption.isEmpty) {
     4       throw new SparkException(
     5         "Cannot enable receiver write-ahead log without checkpoint directory set. " +
     6           "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
     7           "See documentation for more details.")
     8     }
     9 // WAL is enabled and the checkpoint dir is set, so a WriteAheadLogBasedBlockHandler is returned here; it holds the BlockManager, streamId, storage level, conf, Hadoop conf and checkpoint dir
    10     new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
    11       receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
    12   } else {
    13     new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
    14   }
    15 }
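
    As the exception above makes clear, the receiver WAL only works when a checkpoint directory has been set. A minimal sketch (the app name and HDFS path are hypothetical; the receiver WAL flag discussed earlier is assumed to be set on the conf):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("wal-demo") // receiver WAL flag assumed to be enabled on this conf
    val ssc = new StreamingContext(conf, Seconds(5))
    // Both the receiver-side and driver-side WAL files end up under this directory,
    // so it should live on a fault-tolerant file system such as HDFS.
    ssc.checkpoint("hdfs:///tmp/spark-checkpoints/wal-demo")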

    The storeBlock method (here the WriteAheadLogBasedBlockHandler implementation) is:

     1 /**
     2  * This implementation stores the block into the block manager as well as a write ahead log.
     3  * It does this in parallel, using Scala Futures, and returns only after the block has
     4  * been stored in both places.
     5  */
     6 // Store the block into the BlockManager and the write-ahead log in parallel, using Scala Futures; return only after both writes have completed.
     7 def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
     8 
     9   var numRecords = None: Option[Long]
    10   // Serialize the block so that it can be inserted into both
    11 // 1. Serialize the ReceivedBlock (no compression) into bytes
    12   val serializedBlock = block match { // serializedBlock is the serialized result
    13     case ArrayBufferBlock(arrayBuffer) => // go this branch
    14       numRecords = Some(arrayBuffer.size.toLong)
    15       blockManager.dataSerialize(blockId, arrayBuffer.iterator)
    16     case IteratorBlock(iterator) =>
    17       val countIterator = new CountingIterator(iterator)
    18       val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
    19       numRecords = countIterator.count
    20       serializedBlock
    21     case ByteBufferBlock(byteBuffer) =>
    22       byteBuffer
    23     case _ =>
    24       throw new Exception(s"Could not push $blockId to block manager, unexpected block type")
    25   }
    26 
    27   // 2. Store the block in block manager
    28   val storeInBlockManagerFuture = Future {
    29     val putResult =
    30       blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
    31     if (!putResult.map { _._1 }.contains(blockId)) {
    32       throw new SparkException(
    33         s"Could not store $blockId to block manager with storage level $storageLevel")
    34     }
    35   }
    36 
    37   // 3. Store the block in write ahead log
    38   val storeInWriteAheadLogFuture = Future {
    39     writeAheadLog.write(serializedBlock, clock.getTimeMillis())
    40   }
    41 
    42   // 4. Combine the futures, wait for both to complete, and return the write ahead log record handle
    43   val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
    44 // wait for both futures to finish; the default timeout is 30 s and can be changed via spark.streaming.receiver.blockStoreTimeout
    45   val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
    46   // return the info describing the stored block
    47   WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle)
    48 }
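
    The "write to two places in parallel, wait for both" step can be reduced to a small Scala sketch (storeBoth, writeToBlockManager and writeToWal are placeholders; only the zip/Await pattern mirrors the code above):

    import java.nio.ByteBuffer
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration._

    def storeBoth(bytes: ByteBuffer)
                 (writeToBlockManager: ByteBuffer => Unit,
                  writeToWal: ByteBuffer => String) // returns a record handle
                 (implicit ec: ExecutionContext): String = {
      val inBlockManager = Future { writeToBlockManager(bytes) }
      val inWal          = Future { writeToWal(bytes) }
      // zip waits for both futures; map(_._2) keeps only the WAL record handle
      Await.result(inBlockManager.zip(inWal).map(_._2), 30.seconds)
    }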

    Sending the WAL block info to the driver

    Note the WriteAheadLogBasedStoreResult instance returned here: the RDD will use it later when it needs to read the data back.
    The part of org.apache.spark.streaming.receiver.ReceiverSupervisorImpl#pushAndReportBlock that notifies the driver to add the block is:

    1 // 4. Notify trackerEndpoint that a block has been added, which also updates the driver-side WAL
    2   val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
    3   trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
    4   logDebug(s"Reported block $blockId")

    The driver writes the WAL block info into its own WAL

    Skipping the intermediate RPC plumbing, we land in the driver-side org.apache.spark.streaming.scheduler.ReceiverTracker.ReceiverTrackerEndpoint#receiveAndReply:

     1 case AddBlock(receivedBlockInfo) =>
     2   if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
     3     walBatchingThreadPool.execute(new Runnable {
     4       override def run(): Unit = Utils.tryLogNonFatalError {
     5         if (active) {
     6           context.reply(addBlock(receivedBlockInfo))
     7         } else {
     8           throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
     9         }
    10       }
    11     })
    12   } else {
    13     context.reply(addBlock(receivedBlockInfo))
    14   }

    The addBlock method is:

    1 /** Add new blocks for the given stream */
    2 private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    3   receivedBlockTracker.addBlock(receivedBlockInfo)
    4 }

    org.apache.spark.streaming.scheduler.ReceivedBlockTracker#addBlock is implemented as follows:

     1 /** Add received block. This event will get written to the write ahead log (if enabled). */
     2 def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
     3   try {
     4     val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
     5     if (writeResult) {
     6       synchronized {
     7         getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
     8       }
     9       logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
    10         s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    11     } else {
    12       logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
    13         s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    14     }
    15     writeResult
    16   } catch {
    17     case NonFatal(e) =>
    18       logError(s"Error adding block $receivedBlockInfo", e)
    19       false
    20   }
    21 }
    22 /** Write an update to the tracker to the write ahead log */
    23 private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
    24   if (isWriteAheadLogEnabled) {
    25     logTrace(s"Writing record: $record")
    26     try {
    27       writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)),
    28         clock.getTimeMillis())
    29       true
    30     } catch {
    31       case NonFatal(e) =>
    32         logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e)
    33         false
    34     }
    35   } else {
    36     true
    37   }
    38 }
    39 /** Get the queue of received blocks belonging to a particular stream */
    40 private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
    41   streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
    42 }

    The code above mainly does two things: it writes a BlockAdditionEvent to the WAL and updates the queue map (a mutable.HashMap[Int, ReceivedBlockQueue]) that records the streamId -> unallocated-blocks mapping.
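
    A small sketch of that bookkeeping (BlockInfo stands in for ReceivedBlockInfo, and the WAL write that must succeed first is omitted):

    import scala.collection.mutable

    case class BlockInfo(streamId: Int, numRecords: Long)

    // one lazily created queue of unallocated block infos per streamId
    val streamIdToUnallocated = new mutable.HashMap[Int, mutable.Queue[BlockInfo]]

    def queueFor(streamId: Int): mutable.Queue[BlockInfo] =
      streamIdToUnallocated.getOrElseUpdate(streamId, new mutable.Queue[BlockInfo])

    // addBlock appends to the stream's queue (only after the WAL write succeeded)
    def addBlock(info: BlockInfo): Unit = queueFor(info.streamId) += info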

    Reading data from the WAL-backed RDD

    The createStream source is:

     1 /**
     2  * Create an input stream that pulls messages from Kafka Brokers.
     3  * @param ssc         StreamingContext object
     4  * @param kafkaParams Map of kafka configuration parameters,
     5  *                    see http://kafka.apache.org/08/configuration.html
     6  * @param topics      Map of (topic_name -> numPartitions) to consume. Each partition is consumed
     7  *                    in its own thread.
     8  * @param storageLevel Storage level to use for storing the received objects
     9  * @tparam K type of Kafka message key
    10  * @tparam V type of Kafka message value
    11  * @tparam U type of Kafka message key decoder
    12  * @tparam T type of Kafka message value decoder
    13  * @return DStream of (Kafka message key, Kafka message value)
    14  */
    15 def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag](
    16     ssc: StreamingContext,
    17     kafkaParams: Map[String, String],
    18     topics: Map[String, Int],
    19     storageLevel: StorageLevel
    20   ): ReceiverInputDStream[(K, V)] = {
    21 // WAL can be turned on by setting spark.streaming.receiver.writeAheadLog.enable to true
    22   val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf)
    23   new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
    24 }

    This creates a KafkaInputDStream:

     1 /**
     2  * Input stream that pulls messages from a Kafka Broker.
     3  *
     4  * @param kafkaParams Map of kafka configuration parameters.
     5  *                    See: http://kafka.apache.org/configuration.html
     6  * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
     7  * in its own thread.
     8  * @param storageLevel RDD storage level.
     9  */
    10 private[streaming]
    11 class KafkaInputDStream[
    12   K: ClassTag,
    13   V: ClassTag,
    14   U <: Decoder[_]: ClassTag,
    15   T <: Decoder[_]: ClassTag](
    16     ssc_ : StreamingContext,
    17     kafkaParams: Map[String, String],
    18     topics: Map[String, Int],
    19     useReliableReceiver: Boolean,
    20     storageLevel: StorageLevel
    21   ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
    22 
    23   def getReceiver(): Receiver[(K, V)] = {
    24     if (!useReliableReceiver) { // WAL disabled: use a KafkaReceiver
    25       new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    26     } else { // WAL enabled: use a ReliableKafkaReceiver
    27       new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
    28     }
    29   }
    30 }

    org.apache.spark.streaming.kafka.KafkaInputDStream inherits the compute method from its parent class:

     1 /**
     2  * Generates RDDs with blocks received by the receiver of this stream. */
     3 override def compute(validTime: Time): Option[RDD[T]] = {
     4   val blockRDD = {
     5 
     6     if (validTime < graph.startTime) {
     7       // If this is called for any time before the start time of the context,
     8       // then this returns an empty RDD. This may happen when recovering from a
     9       // driver failure without any write ahead log to recover pre-failure data.
    10       new BlockRDD[T](ssc.sc, Array.empty)
    11     } else {
    12       // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
    13       // for this batch
    14       val receiverTracker = ssc.scheduler.receiverTracker
    15       val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
    16 
    17       // Register the input blocks information into InputInfoTracker
    18       val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
    19       ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
    20 
    21       // Create the BlockRDD
    22       createBlockRDD(validTime, blockInfos)
    23     }
    24   }
    25   Some(blockRDD)
    26 }

    getBlocksOfBatch is:

    1 /** Get the blocks for the given batch and all input streams. */
    2 def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = {
    3   receivedBlockTracker.getBlocksOfBatch(batchTime)
    4 }
    5 // which in turn calls:
    6 /** Get the blocks allocated to the given batch. */
    7 def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = synchronized {
    8   timeToAllocatedBlocks.get(batchTime).map { _.streamIdToAllocatedBlocks }.getOrElse(Map.empty)
    9 }

    JobGenerator assigns the WAL blocks to a batch and generates jobs

    Retrieving the WAL block info

    org.apache.spark.streaming.scheduler.JobGenerator declares a timer:

    1 // The timer posts a GenerateJobs event into the eventLoop's blocking queue once every batch interval
    2 private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    3   longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

    The EventLoop is instantiated as follows:

    1 eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    2   override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
    3 
    4   override protected def onError(e: Throwable): Unit = {
    5     jobScheduler.reportError("Error in job generator", e)
    6   }
    7 }
    8 eventLoop.start()

    EventLoop contains a LinkedBlockingDeque (a double-ended blocking queue) and a daemon worker thread. The daemon thread keeps taking events from the deque, blocking while it is empty; as soon as it gets one, it calls onReceive, i.e. the processEvent method:

     1 /** Processes all events */
     2 private def processEvent(event: JobGeneratorEvent) {
     3   logDebug("Got event " + event)
     4   event match {
     5     case GenerateJobs(time) => generateJobs(time)
     6     case ClearMetadata(time) => clearMetadata(time)
     7     case DoCheckpoint(time, clearCheckpointDataLater) =>
     8       doCheckpoint(time, clearCheckpointDataLater)
     9     case ClearCheckpointData(time) => clearCheckpointData(time)
    10   }
    11 }
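
    A stripped-down sketch of that event-loop mechanism (error reporting and graceful draining are omitted; this is not Spark's EventLoop, just the same LinkedBlockingDeque-plus-daemon-thread shape):

    import java.util.concurrent.LinkedBlockingDeque

    abstract class MiniEventLoop[E](name: String) {
      private val queue = new LinkedBlockingDeque[E]()
      @volatile private var stopped = false

      private val thread = new Thread(name) {
        setDaemon(true)
        override def run(): Unit = {
          try {
            while (!stopped) onReceive(queue.take()) // take() blocks until an event arrives
          } catch {
            case _: InterruptedException => // stop() interrupts the blocking take()
          }
        }
      }

      def start(): Unit = thread.start()
      def stop(): Unit = { stopped = true; thread.interrupt() }
      def post(event: E): Unit = queue.put(event)

      protected def onReceive(event: E): Unit
    }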

    Since the event here is GenerateJobs, generateJobs is invoked next:

     1 /** Generate jobs and perform checkpoint for the given `time`.  */
     2 private def generateJobs(time: Time) {
     3   // Set the SparkEnv in this thread, so that job generation code can access the environment
     4   // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
     5   // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
     6   SparkEnv.set(ssc.env)
     7   Try {
     8 // 1. Allocate the WAL block info to this batch (the workers sent this info to the driver after caching the data and writing it to the WAL)
     9     jobScheduler.receiverTracker.allocateBlocksToBatch(time)
    10 // 2. Generate jobs using the allocated blocks
    11     graph.generateJobs(time) // generate jobs using allocated block
    12   } match {
    13     case Success(jobs) =>
    14       val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
    15       jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    16     case Failure(e) =>
    17       jobScheduler.reportError("Error generating jobs for time " + time, e)
    18   }
    19 // Post a DoCheckpoint event: write the new checkpoint data to HDFS and delete the old checkpoint data
    20   eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
    21 }

    The org.apache.spark.streaming.scheduler.ReceiverTracker#allocateBlocksToBatch method called in step 1 is:

    1 /** Allocate all unallocated blocks to the given batch. */
    2 def allocateBlocksToBatch(batchTime: Time): Unit = {
    3   if (receiverInputStreams.nonEmpty) {
    4     receivedBlockTracker.allocateBlocksToBatch(batchTime)
    5   }
    6 }

    It delegates to org.apache.spark.streaming.scheduler.ReceivedBlockTracker#allocateBlocksToBatch:

     1 def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
     2   if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
     3 // For each input stream, dequeue all unallocated blocks for its streamId and build a streamId -> Seq[ReceivedBlockInfo] map; by this point the block info has already been received from the workers.
     4     // build the streamId -> WAL blocks mapping
     5     val streamIdToBlocks = streamIds.map { streamId =>
     6         (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
     7     }.toMap
     8     val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
     9     if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
    10       timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
    11       lastAllocatedBatchTime = batchTime
    12     } else {
    13       logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    14     }
    15   } else {
    16     // This situation occurs when:
    17     // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    18     // possibly processed batch job or half-processed batch job need to be processed again,
    19     // so the batchTime will be equal to lastAllocatedBatchTime.
    20     // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    21     // lastAllocatedBatchTime.
    22     // This situation will only occurs in recovery time.
    23     logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    24   }
    25 }

    getReceivedBlockQueue is:

    1 /** Get the queue of received blocks belonging to a particular stream */
    2 private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
    3   streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
    4 }

    As can be seen, the block info that the worker nodes sent over has now been taken out of the queues.
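
    Continuing the earlier bookkeeping sketch, the allocation step is essentially a drain of every per-stream queue into an immutable snapshot (again, BlockInfo is an illustrative stand-in for ReceivedBlockInfo):

    import scala.collection.mutable

    case class BlockInfo(streamId: Int, numRecords: Long)

    def allocateAll(unallocated: mutable.HashMap[Int, mutable.Queue[BlockInfo]]): Map[Int, Seq[BlockInfo]] =
      unallocated.keys.toSeq.map { streamId =>
        // dequeueAll(_ => true) removes and returns everything, emptying the queue
        streamId -> unallocated(streamId).dequeueAll(_ => true)
      }.toMap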

    Creating an RDD from the WAL blocks

    org.apache.spark.streaming.dstream.ReceiverInputDStream#createBlockRDD is implemented as follows:

     1 private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
     2 
     3   if (blockInfos.nonEmpty) {
     4     val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
     5     // If every block already has a WriteAheadLogRecordHandle, a WriteAheadLogBackedBlockRDD is created; otherwise a plain BlockRDD is used.
     6 // A WriteAheadLogRecordHandle is the entry info associated with a WAL record; the implementation FileBasedWriteAheadLogSegment carries the WAL segment's path, offset and length. When the RDD actually needs the data, it reads it back from the WAL using these handles.
     7     // Are WAL record handles present with all the blocks
     8     val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
     9 
    10     if (areWALRecordHandlesPresent) {
    11       // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
    12       val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
    13       val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
    14       new WriteAheadLogBackedBlockRDD[T](
    15         ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
    16     } else {
    17       // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
    18       // others then that is unexpected and log a warning accordingly.
    19       if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
    20         if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
    21           logError("Some blocks do not have Write Ahead Log information; " +
    22             "this is unexpected and data may not be recoverable after driver failures")
    23         } else {
    24           logWarning("Some blocks have Write Ahead Log information; this is unexpected")
    25         }
    26       }
    27       val validBlockIds = blockIds.filter { id =>
    28         ssc.sparkContext.env.blockManager.master.contains(id)
    29       }
    30       if (validBlockIds.size != blockIds.size) {
    31         logWarning("Some blocks could not be recovered as they were not found in memory. " +
    32           "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
    33           "for more details.")
    34       }
    35       new BlockRDD[T](ssc.sc, validBlockIds)
    36     }
    37   } else {
    38     // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
    39     // according to the configuration
    40     if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
    41       new WriteAheadLogBackedBlockRDD[T](
    42         ssc.sparkContext, Array.empty, Array.empty, Array.empty)
    43     } else {
    44       new BlockRDD[T](ssc.sc, Array.empty)
    45     }
    46   }
    47 }

    org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD#compute is implemented as follows:

     1 /**
     2  * Gets the partition data by getting the corresponding block from the block manager.
     3  * If the block does not exist, then the data is read from the corresponding record
     4  * in write ahead log files.
     5  */
     6 override def compute(split: Partition, context: TaskContext): Iterator[T] = {
     7   assertValid()
     8   val hadoopConf = broadcastedHadoopConf.value
     9   val blockManager = SparkEnv.get.blockManager
    10   val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
    11   val blockId = partition.blockId
    12 
    13   def getBlockFromBlockManager(): Option[Iterator[T]] = {
    14     blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]])
    15   }
    16 
    17   def getBlockFromWriteAheadLog(): Iterator[T] = {
    18     var dataRead: ByteBuffer = null
    19     var writeAheadLog: WriteAheadLog = null
    20     try {
    21       // The WriteAheadLogUtils.createLog*** method needs a directory to create a
    22       // WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for
    23       // writing log data. However, the directory is not needed if data needs to be read, hence
    24       // a dummy path is provided to satisfy the method parameter requirements.
    25       // FileBasedWriteAheadLog will not create any file or directory at that path.
    26       // FileBasedWriteAheadLog will not create any file or directory at that path. Also,
    27       // this dummy directory should not already exist otherwise the WAL will try to recover
    28       // past events from the directory and throw errors.
    29       val nonExistentDirectory = new File(
    30         System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
    31       writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
    32         SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
    33       dataRead = writeAheadLog.read(partition.walRecordHandle)
    34     } catch {
    35       case NonFatal(e) =>
    36         throw new SparkException(
    37           s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
    38     } finally {
    39       if (writeAheadLog != null) {
    40         writeAheadLog.close()
    41         writeAheadLog = null
    42       }
    43     }
    44     if (dataRead == null) {
    45       throw new SparkException(
    46         s"Could not read data from write ahead log record ${partition.walRecordHandle}, " +
    47           s"read returned null")
    48     }
    49     logInfo(s"Read partition data of $this from write ahead log, record handle " +
    50       partition.walRecordHandle)
    51     if (storeInBlockManager) {
    52       blockManager.putBytes(blockId, dataRead, storageLevel)
    53       logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
    54       dataRead.rewind()
    55     }
    56     blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
    57   }
    58   // if partition.isBlockIdValid is true, the block data should still be present on the executors
    59   if (partition.isBlockIdValid) {
    60 // first try to read the block through the BlockManager; if it is gone, fall back to the WAL
    61 // Does the BlockManager read from memory or from disk? It fetches the block either locally or remotely:
    62 // a local read may come from memory or from disk, and a remote fetch is synchronous, i.e. it blocks until the block has been fetched.
    63     getBlockFromBlockManager().getOrElse { getBlockFromWriteAheadLog() }
    64   } else {
    65     getBlockFromWriteAheadLog()
    66   }
    67 }
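
    The read path boils down to "try the BlockManager first, fall back to the WAL", which can be sketched generically (the two reader functions are placeholders, not Spark APIs):

    def readPartition[T](blockIdValid: Boolean)
                        (readFromBlockManager: () => Option[Iterator[T]],
                         readFromWal: () => Iterator[T]): Iterator[T] = {
      if (blockIdValid) readFromBlockManager().getOrElse(readFromWal())
      else readFromWal() // block id no longer valid: go straight to the write-ahead log
    }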

    At this point the whole path has been covered: starting the receivers, receiving data and saving it as WAL-backed blocks, the driver recording the WAL block info, and finally Spark Streaming reading the data back through the WAL-backed RDD.
