zoukankan      html  css  js  c++  java
  • Kafka 之 async producer (1)

    问题 

    1. 很多条消息是怎么打包在一起的?
    2. 如果消息是发给很多不同的topic的, async producer如何在按batch发送的同时区分topic的
    3. 它是如何用key来做partition的?
    4. 是如何实现对消息成批量的压缩的?

    async producer是将producer.type设为async时启用的producer

    此时,调用send方法的线程和实际完成消息发送的线程是分开的。

    当调用java API中producer的send方法时,最终会调用kafka.producer.Producer的send方法。在kafka.producer.Producer类中,会根据producer.type配置使用不同的方法发送消息。

    def send(messages: KeyedMessage[K,V]*) {
        lock synchronized {
          if (hasShutdown.get)
            throw new ProducerClosedException
          recordStats(messages)
          sync match {
            case true => eventHandler.handle(messages)
            case false => asyncSend(messages)
          }
        }
      }
    

      当async时,会使用asyncSend。asyncSend方法会根据“queue.enqueue.timeout.ms”配置选项采用BlockingQueue的put或offer方法把消息放入kafka.producer.Producer持有的一个LinkedBlockingQueue。一个ProducerSendThread线程从queue里取消息,成批量的用eventHandler来处理。

      当使用sync时,对每条消息会直接使用eventHandler来处理。这就是为什么前一种方式会被称为"asynchornization",而这一种会称为”synchronization"

      private val queue = new LinkedBlockingQueue[KeyedMessage[K,V]](config.queueBufferingMaxMessages)
    

      在kafka.producer.Producer构造时,会检查"producer.type“,如果是asnyc,就会开启一个送发线程。

      config.producerType match {
        case "sync" =>
        case "async" =>
          sync = false
          producerSendThread = new ProducerSendThread[K,V]("ProducerSendThread-" + config.clientId,
                                                           queue,
                                                           eventHandler,
                                                           config.queueBufferingMaxMs,
                                                           config.batchNumMessages,
                                                           config.clientId)
          producerSendThread.start()
    

      现在有了一个队列,一个发送线程 。看来这个ProducerSendThread是来完成大部分发送的工作,而"async"的特性都主要都是由它来实现。

       这个线程的run方法实现为:

      override def run {
        try {
          processEvents
        }catch {
          case e: Throwable => error("Error in sending events: ", e)
        }finally {
          shutdownLatch.countDown
        }
      }
    

      看来实际工作由processEvents方法来实现喽

      private def processEvents() {
        var lastSend = SystemTime.milliseconds //上一次发送的时间,每发送一次会更新
        var events = new ArrayBuffer[KeyedMessage[K,V]] //一起发送的消息的集合,发送完后也会更新
        var full: Boolean = false  //是否消息的数量已大于指定的batch大小(batch大小指多少消息在一起发送,由"batch.num.messages"确定)
    
        // drain the queue until you get a shutdown command
        //构造一个流,它的每个元素为queue.poll(timeout)取出来的值。
        //timeout的值是这么计算的:lastSend+queueTime表示下次发送的时间,再减去当前时间,就是最多还能等多长时间,也就是poll阻塞的最长时间
        //takeWhile接受的函数参数决定了当item是shutdownCommand时,流就结束了。这个shutdownCommand是shutdown()方法执行时,往队列里发的一个特殊消息
        Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
                          .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
          currentQueueItem => 										//对每一条处理的消息
            val elapsed = (SystemTime.milliseconds - lastSend)  //距上次发送已逝去的时间,只记录在debug里,并不会以它作为是否发送的条件
            // check if the queue time is reached. This happens when the poll method above returns after a timeout and
            // returns a null object
            val expired = currentQueueItem == null //当poll方法超时,就返回一个null,说明一定已经是时候发送这批消息了。当时间到了,poll(timeout)中timeout为负值时,poll一定返回null
            if(currentQueueItem != null) {
              trace("Dequeued item for topic %s, partition key: %s, data: %s"
                  .format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))
              events += currentQueueItem //如果当前消息不为空,就附加在发送集合里
            }
    
            // check if the batch size is reached
            full = events.size >= batchSize //是否当前发送集合的大小已经大于batch size
    
            if(full || expired) {  //如果发送集合有了足够多的消息或者按时间计可以发送了,就发送
              if(expired)
                debug(elapsed + " ms elapsed. Queue time reached. Sending..")
              if(full)
                debug("Batch full. Sending..")
              // if either queue time has reached or batch size has reached, dispatch to event handler
              tryToHandle(events)
              lastSend = SystemTime.milliseconds //更新lastSend,将一个新的ArrayBuffer的引用赋给events
              events = new ArrayBuffer[KeyedMessage[K,V]]
            }
        }
        // send the last batch of events
        tryToHandle(events) //当shutdownCommand遇到时,流会终结。此时之前的消息只要不是恰好发送完,就还会有一些在events里,做为最后一批发送。
        if(queue.size > 0) //些时producerSendThread已经不再发消息了,但是queue里若还有没发完的,就是一种异常情况
          throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
            .format(queue.size))
      }
    

      看来Scala的Stream帮了不少忙。shutdown方法将一个特殊的shutdownCommand发给queue,也正好使得这个Stream可以用takeWhile方法正确结束。

      好吧,搞了这么多,这个ProducerSendThread只有打包的逻辑 ,并没有处理topic、partition、压缩的逻辑,这些逻辑都在另一个类中。明天再来看看这个handler

  • 相关阅读:
    Linux内核参数信息(Oracle相关)
    Android和java平台 DES加密解密互通程序及其不能互通的原因
    [置顶] ※数据结构※→☆线性表结构(queue)☆============循环队列 顺序存储结构(queue circular sequence)(十)
    理解 Thread.Sleep 函数
    引导加载程序:GRUB
    xvfb 初步探究
    [置顶] 中国象棋程序的设计与实现(原始版)(包含源码)
    Ext图表的精彩
    JSTL解析——004——core标签库03
    Servlet API中文版
  • 原文地址:https://www.cnblogs.com/devos/p/3629190.html
Copyright © 2011-2022 走看看