zoukankan      html  css  js  c++  java
  • Kafka 之 async producer (2) kafka.producer.async.DefaultEventHandler

    上次留下来的问题

    1. 如果消息是发给很多不同的topic的, async producer如何在按batch发送的同时区分topic的
    2. 它是如何用key来做partition的?
    3. 是如何实现对消息成批量的压缩的?
    • async producer如何在按batch发送的同时区分topic的

      这个问题的答案是: DefaultEventHandler会把发给它的一个batch的消息(实际上是Seq[KeyedMessage[K,V]]类型)拆开,确定每条消息该发送给哪个broker。对发给每个broker的消息,会按topic和partition来组合。即:拆包=>根据metaData组装

    这个功能是通过partitionAndCollate方法实现的

    def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]]
    

      它返回一个Option对象,这个Option的元素是一个Map,Key是brokerId,value是发给这个broker的消息。对每一条消息,先确定它要被发给哪一个topic的哪个parition。然后确定这个parition的leader broker,然后去Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]这个Map里找到对应的broker,然后把这条消息填充给对应的topic+partition对应的Seq[KeyedMessage[K,Message]]。这样就得到了最后的结果。这个结果表示了哪些消息要以怎样的结构发给一个broker。真正发送的时候,会按照brokerId的不同,把打包好的消息发给不同的broker。

    首先,看一下kafka protocol里对于Producer Request结构的说明:

    ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
      RequiredAcks => int16
      Timeout => int32
      Partition => int32
      MessageSetSize => int32

    发给一个broker的消息就是这样的结构。

    同时,在kafka wiki里对于Produce API 有如下说明:

    The produce API is used to send message sets to the server. For efficiency it allows sending message sets intended for many topic partitions in a single request.

    即在一个produce request里,可以同时发消息给多个topic+partition的组合。当然一个produce request是发给一个broker的。

    使用

    send(brokerid, messageSetPerBroker)
    

      把消息set发给对应的brokerid。

    • 它是如何用key来做partition的?

    首先看下KeyedMessage类的定义:

    case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
      if(topic == null)
        throw new IllegalArgumentException("Topic cannot be null.") 
      def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)
      def this(topic: String, key: K, message: V) = this(topic, key, key, message)
      def partitionKey = {
        if(partKey != null)
          partKey
        else if(hasKey)
          key
        else
          null  
      }
      def hasKey = key != null
    }
    

      当使用三个参数的构造函数时, partKey会等于key。partKey是用来做partition的,但它不会最当成消息的一部分被存储。

    前边提到了,在确定一个消息应该发给哪个broker之前,要先确定它发给哪个partition,这样才能根据paritionId去找到对应的leader所在的broker。

    val topicPartitionsList = getPartitionListForTopic(message) //获取这个消息发送给的topic的partition信息
    val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList)//确定这个消息发给哪个partition

      注意传给getPartition方法中时使用的是partKey。getPartition方法为:

      private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
        val numPartitions = topicPartitionList.size
        if(numPartitions <= 0)
          throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
        val partition =
          if(key == null) {
            // If the key is null, we don't really need a partitioner
            // So we look up in the send partition cache for the topic to decide the target partition
            val id = sendPartitionPerTopicCache.get(topic)
            id match {
              case Some(partitionId) =>
                // directly return the partitionId without checking availability of the leader,
                // since we want to postpone the failure until the send operation anyways
                partitionId
              case None =>
                val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
                if (availablePartitions.isEmpty)
                  throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
                val index = Utils.abs(Random.nextInt) % availablePartitions.size
                val partitionId = availablePartitions(index).partitionId
                sendPartitionPerTopicCache.put(topic, partitionId)
                partitionId
            }
          } else
            partitioner.partition(key, numPartitions)
    

      当partKey为null时,首先它从sendParitionPerTopicCache里取这个topic缓存的partitionId,这个cache是一个Map.如果之前己经使用sendPartitionPerTopicCache.put(topic, partitionId)缓存了一个,就直接取出它。否则就随机从可用的partitionId里取出一个,把它缓存到sendParitionPerTopicCache。这就使得当sendParitionPerTopicCache里有一个可用的partitionId时,很多消息都会被发送给这同一个partition。因此若所有消息的partKey都为空,在一段时间内只会有一个partition能收到消息。之所以会说“一段”时间,而不是永久,是因为handler隔一段时间会重新获取它发送过的消息对应的topic的metadata,这个参数通过topic.metadata.refresh.interval.ms来设置。当它重新获取metadata之后,会消空一些缓存,就包括这个sendParitionPerTopicCache。因此,接下来就会生成另一个随机的被缓存的partitionId。

      if (topicMetadataRefreshInterval >= 0 && 
              SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {  //若该refresh topic metadata 了,do the refresh
            Utils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
            sendPartitionPerTopicCache.clear()
            topicMetadataToRefresh.clear
            lastTopicMetadataRefreshTime = SystemTime.milliseconds
          }
    

      当partKey不为null时,就用传给handler的partitioner的partition方法,根据partKey和numPartitions来确定这个消息被发给哪个partition。注意这里的numPartition是topicPartitionList.size获取的,有可能会有parition不存在可用的leader。这样的问题将留给send时解决。实际上发生这种情况时,partitionAndCollate会将这个消息分派给brokerId为-1的broker。而send方法会在发送前判断brokerId

        if(brokerId < 0) {
          warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.map(_._1).mkString(",")))
          messagesPerTopic.keys.toSeq
    

      当brokerId<0时,就返回一个非空的Seq,包括了所有没有leader的topic+partition的组合,如果重试了指定次数还不能发送,将最终导致handle方法抛出一个 FailedToSendMessageException异常。

    • 是如何实现对消息成批量的压缩的?

    这个是在

    private def groupMessagesToSet(messagesPerTopicAndPartition: collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]])
    

    中处理。

    说明为:

    /** enforce the compressed.topics config here.
    * If the compression codec is anything other than NoCompressionCodec,
    * Enable compression only for specified topics if any
    * If the list of compressed topics is empty, then enable the specified compression codec for all topics
    * If the compression codec is NoCompressionCodec, compression is disabled for all topics
    */

    即,如果没有设置压缩,就所有topic对应的消息集都不压缩。如果设置了压缩,并且没有设置对个别topic启用压缩,就对所有topic都使用压缩;否则就只对设置了压缩的topic压缩。

    在这个gruopMessageToSet中,并不有具体的压缩逻辑。而是返回一个ByteBufferMessageSet对象。它的注释为:

    /**
    * A sequence of messages stored in a byte buffer
    *
    * There are two ways to create a ByteBufferMessageSet
    *
    * Option 1: From a ByteBuffer which already contains the serialized message set. Consumers will use this method.
    *
    * Option 2: Give it a list of messages along with instructions relating to serialization format. Producers will use this method.

     看来它是对于消息集进行序列化和反序列化的工具。

    在它的实现里用到了CompressionFactory对象。从它的实现里可以看到Kafka只支持GZIP和Snappy两种压缩方式。

    compressionCodec match {
          case DefaultCompressionCodec => new GZIPOutputStream(stream)
          case GZIPCompressionCodec => new GZIPOutputStream(stream)
          case SnappyCompressionCodec => 
            import org.xerial.snappy.SnappyOutputStream
            new SnappyOutputStream(stream)
          case _ =>
            throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
    

      

  • 相关阅读:
    函数柯里化
    常用正则
    校验table行内的form编辑
    前端代码 读取excel表格数据
    cocos2d-x 帧动画学习
    Linux 下vim配置
    Android开发笔记 二
    cocos2d-x CCDictionary类学习
    Android开发笔记
    Cococs2d-x移植到Window下的问题
  • 原文地址:https://www.cnblogs.com/devos/p/3632115.html
Copyright © 2011-2022 走看看