zoukankan      html  css  js  c++  java
  • Kafka 0.8源码分析—ZookeeperConsumerConnector

    1.HighLevelApi

    High Level Api是多线程的应用程序,以Topic的Partition数量为中心。消费的规则如下:

    • 一个partition只能被同一个ConsumersGroup的一个线程所消费.
    • 线程数小于partition数,某些线程会消费多个partition.
    • 线程数等于partition数,一个线程正好消费一个线程.
    • 当添加消费者线程时,会触发rebalance,partition的分配发送变化.
    • 同一个partition的offset保证消费有序,不同的partition消费不保证顺序.

    image

    关于与ZK的几个参数意思解释

    • zookeeper.connect: ZK连接。
    • group.id: Consumer消费ID。
    • zookeeper.session.timeout.ms: kafka节点与ZK会话的超时时间。
    • zookeeper.sync.time.ms: zk的follower与leader的同步时间间隔。
    • auto.commit.interval.ms: Consumer offset自动提交给Zookeeper的时间。

    Note that since the commit frequency is time based instead of # of messages consumed, if an error occurs between updates to ZooKeeper on restart you will get replayed messages.(由于记录Offset是基于时间的,所以当Consumer发生错误的时候,有可能会收到重复的消息。)

    消费者的代码

    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(threads));
    
    Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumer.createMessageStreams(topicCountMap);
    
    List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get(topic);
    // now launch all the threads
    executor = Executors.newFixedThreadPool(threads);
    
    for (KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new KafkaConsumerWorker(stream));
    }
    

    2.ZookeeperConsumerConnector

    一个Consumer会创建一个ZookeeperConsumerConnector,代表一个消费者进程.

    • fetcher: 消费者获取数据, 使用ConsumerFetcherManager fetcher线程抓取数据
    • zkUtils: 消费者要和ZK通信, 除了注册自己,还有其他信息也会写到ZK中
    • topicThreadIdAndQueues: 消费者会指定自己消费哪些topic,并指定线程数, 所以topicThreadId都对应一个队列
    • messageStreamCreated: 消费者会创建消息流, 每个队列都对应一个消息流
    • offsetsChannel: offset可以存储在ZK或者kafka中,如果存在kafka里,像其他请求一样,需要和Broker通信
    • 还有其他几个Listener监听器,分别用于topicPartition的更新,负载均衡,消费者重新负载等

    当Broker挂掉的时候,在这个Broker上的所有Partition都丢失了,而Partition是给消费者服务的.
    所以Broker挂掉后在做迁移的时候,会将其上的Partition转移到其他Broker上,因此消费者要消费的Partition也跟着变化.

    2.1 init

    在创建ZookeeperConsumerConnector时,有几个初始化方法需要事先执行.

    • 消费者要和ZK通信,所以connectZk会确保连接上ZooKeeper
    • 消费者要消费数据,需要有抓取线程,所有的抓取线程交给ConsumerFetcherManager统一管理
    • 由消费者客户端自己保存offset,而消费者会消费多个topic的多个partition.
    • 多个partition的offset管理类OffsetManager是一个GroupCoordinator
    • 定时提交线程会使用OffsetManager建立的通道定时提交offset到zk或者kafka.
      image

    2.2 createMessageStreams

    ConsumerConnector创建消息流,需要指定解码器,因为要将日志反序列化(生产者写消息时对消息序列化到日志文件).

    在kafka的运行过程中,会有其他的线程将数据放入partition对应的queue中. 而queue是用于KafkaStream的.
    一旦数据添加到queue后,KafkaStream的阻塞队列就有数据了,消费者就可以从队列中消费消息.

    • createMessageStreams: 返回KafkaStream, 每个Topic都对应了多个KafkaStream. 数量和topicCount中的count一样.
    例子解释

    假设消费者C1声明了topic1:2, topic2:3. topicThreadIds=consumerThreadIdsPerTopicMap.
    topicThreadIds.values = [ (C1_1,C1_2), (C1_1,C1_2,C1_3)]一共有5个线程,queuesAndStreams也有5个元素.

    consumerThreadIdsPerTopicMap = {
        topic1: [C1_1, C1_2],
        topic2: [C1_1, C1_2, C1_3]
    }
    topicThreadIds.values = [
        [C1_1, C1_2],
        [C1_1, C1_2, C1_3]
    ]
    threadIdSet循环[C1_1, C1_2]时, 生成两个queue->stream pair. 
    threadIdSet循环[C1_1, C1_2, C1_3]时, 生成三个queue->stream pair. 
    queuesAndStreams = [
        (LinkedBlockingQueue_1,KafkaStream_1),      //topic1:C1_1
        (LinkedBlockingQueue_2,KafkaStream_2),      //topic1:C1_2
        (LinkedBlockingQueue_3,KafkaStream_3),      //topic2:C1_1
        (LinkedBlockingQueue_4,KafkaStream_4),      //topic2:C1_2
        (LinkedBlockingQueue_5,KafkaStream_5),      //topic2:C1_3
    ]
    
    • 客户端关注的是我的每个线程都对应了一个队列,每个队列都是一个消息流就可以了.
    • 客户端的每个线程实际上是针对Partition级别的,一个线程对应一个或多个partition。

    2.3 registerConsumerInZK

    消费者需要向ZK注册一个临时节点,路径为:/consumers/[group_id]/ids/[consumer_id],内容为订阅的topic.

    问题:什么时候这个节点会被删除掉呢? Consumer进程挂掉时,或者Session失效时删除临时节点. 重连时会重新创建.
    由于是临时节点,一旦创建节点的这个进程挂掉了,临时节点就会自动被删除掉. 这是由zk机制决定的,不是由消费者完成的.

    2.4 reinitializeConsumer listener

    当前Consumer在ZK注册之后,需要重新初始化Consumer.对于全新的消费者,注册多个监听器,在zk的对应节点的注册事件发生时,会回调监听器的方法.

    • 将topic对应的消费者线程id及对应的LinkedBlockingQueue放入topicThreadIdAndQueues中,LinkedBlockingQueue是真正存放数据的queue
    1. 注册sessionExpirationListener,监听状态变化事件.在session失效重新创建session时调用
    2. 向/consumers/[group_id]/ids注册Child变更事件的loadBalancerListener,当消费组下的消费者发生变化时调用
    3. 向/brokers/topics/[topic]注册Data变更事件的topicPartitionChangeListener,在topic数据发生变化时调用
    • 显式调用loadBalancerListener.syncedRebalance(), 会调用reblance方法进行consumer的初始化工作
    private def reinitializeConsumer[K,V](topicCount: TopicCount, 
      queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) {
      val dirs = new ZKGroupDirs(config.groupId)
      // ② listener to consumer and partition changes
      if (loadBalancerListener == null) {
        val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]
        loadBalancerListener = new ZKRebalancerListener(config.groupId, consumerIdString, 
          topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
      }
      // ① create listener for session expired event if not exist yet
      if (sessionExpirationListener == null) sessionExpirationListener = 
        new ZKSessionExpireListener(dirs, consumerIdString, topicCount, loadBalancerListener)
      // ③ create listener for topic partition change event if not exist yet
      if (topicPartitionChangeListener == null) 
        topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)
    
      // listener to consumer and partition changes
      zkUtils.zkClient.subscribeStateChanges(sessionExpirationListener)
      zkUtils.zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)
      // register on broker partition path changes.
      topicStreamsMap.foreach { topicAndStreams => 
        zkUtils.zkClient.subscribeDataChanges(BrokerTopicsPath+"/"+topicAndStreams._1, topicPartitionChangeListener)
      }
    
      // explicitly trigger load balancing for this consumer
      loadBalancerListener.syncedRebalance()
    }
    

    ZKRebalancerListener传入ZKSessionExpireListener和ZKTopicPartitionChangeListener.它们都会使用ZKRebalancerListener完成自己的工作.

    2.5 ZKSessionExpireListener

    当Session失效时,新的会话建立时,立即进行rebalance操作.

    2.6 ZKTopicPartitionChangeListener

    当topic的数据变化时,通过触发的方式启动rebalance操作.

    2.7 ZKRebalancerListener watcher

    image

    image

    class ZKRebalancerListener(val group: String, val consumerIdString: String,
                               val kafkaMessageAndMetadataStreams: mutable.Map[String,List[KafkaStream[_,_]]])
      extends IZkChildListener {
      private var isWatcherTriggered = false
      private val lock = new ReentrantLock
      private val cond = lock.newCondition()
    
      private val watcherExecutorThread = new Thread(consumerIdString + "_watcher_executor") {
        override def run() {
          var doRebalance = false
          while (!isShuttingDown.get) {
              lock.lock()
              try {
                // 如果isWatcherTriggered=false,则不会触发syncedRebalance. 等待1秒后,继续判断
                if (!isWatcherTriggered)
                  cond.await(1000, TimeUnit.MILLISECONDS) // wake up periodically so that it can check the shutdown flag
              } finally {
                // 不管isWatcherTriggered值是多少,在每次循环时,都会执行. 如果isWatcherTriggered=true,则会执行syncedRebalance
                doRebalance = isWatcherTriggered
                // 重新设置isWatcherTriggered=false, 因为其他线程触发一次后就失效了,想要再次触发,必须再次设置isWatcherTriggered=true
                isWatcherTriggered = false
                lock.unlock()
              }
              if (doRebalance) syncedRebalance        // 只有每次rebalanceEventTriggered时,才会调用一次syncedRebalance
          }
        }
      }
      watcherExecutorThread.start()
    
      // 触发rebalance开始进行, 修改isWatcherTriggered标志位,触发cond条件运行
      def rebalanceEventTriggered() {
        inLock(lock) {
          isWatcherTriggered = true
          cond.signalAll()
        }
      }
    

    3.ZKRebalancerListener rebalance

    因为消费者加入/退出时,消费组的成员会发生变化,而消费组中的所有存活消费者负责消费可用的partitions.
    可用的partitions或者消费组中的消费者成员一旦发生变化,都要重新分配partition给存活的消费者.下面是一个示例.

    当然分配partition的工作绝不仅仅是这么简单的,还要处理与之相关的线程,并重建必要的数据:

    1. 关闭数据抓取线程,获取之前为topic设置的存放数据的queue并清空该queue
    2. 释放partition的ownership,删除partition和consumer的对应关系
    3. 为各个partition重新分配threadid
      获取partition最新的offset并重新初始化新的PartitionTopicInfo(queue存放数据,两个offset为partition最新的offset)
    4. 重新将partition对应的新的consumer信息写入zookeeper
    5. 重新创建partition的fetcher线程

    image

    private def rebalance(cluster: Cluster): Boolean = {
      val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, 
        zkUtils, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
      val brokers = zkUtils.getAllBrokersInCluster()
      if (brokers.size == 0) {
        zkUtils.zkClient.subscribeChildChanges(BrokerIdsPath, loadBalancerListener)
        true
      } else {
        // ① 停止fetcher线程防止数据重复.如果当前调整失败了,被释放的partitions可能被其他消费者拥有.
        // 而没有先停止fetcher的话,原先的消费者仍然会和新的拥有者共同消费同一份数据.  
        closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)
        // ② 释放topicRegistry中topic-partition的owner
        releasePartitionOwnership(topicRegistry)
        // ③ 为partition重新分配消费者....
        // ④ 为partition添加consumer owner
        if(reflectPartitionOwnershipDecision(partitionAssignment)) {
            allTopicsOwnedPartitionsCount = partitionAssignment.size
            topicRegistry = currentTopicRegistry
            // ⑤ 创建拉取线程
            updateFetcher(cluster)
            true
        }
      }
    }
    

    rebalance操作涉及了以下内容:

    • PartitionOwnership: Partition的所有者(ownership)的删除和重建
    • AssignmentContext: 分配信息上下文
    • PartitionAssignor: 为Partition分配Consumer的算法
    • PartitionAssignment: Partition分配之后的上下文
    • PartitionTopicInfo: Partition的最终信息
    • Fetcher: 完成了rebalance,消费者就可以重新开始抓取数据

    3.1 核心:PartitionAssignor

    将可用的partitions以及消费者线程排序, 将partitions处于线程数,表示每个线程(不是消费者数量)平均可以分到几个partition.

    如果除不尽,剩余的会分给前面几个消费者线程. 比如有两个消费者,每个都是两个线程,一共有5个可用的partitions: (p0-p4).

    每个消费者线程(一共四个线程)可以获取到至少一共partition(5/4=1),剩余一个(5%4=1)partition分给第一个线程.
    最后的分配结果为: p0 -> C1-0, p1 -> C1-0, p2 -> C1-1, p3 -> C2-0, p4 -> C2-1

    image

    关闭Fetcher时要注意:

    • 先提交offset,然后才停止消费者. 因为在停止消费者的时候当前的数据块中还会有点残留数据.
    • 因为这时候还没有释放partiton的ownership(即partition还归当前consumer所有),强制提交offset,
      这样拥有这个partition的下一个消费者线程(rebalance后),就可以使用已经提交的offset了,确保不中断.
    • 因为fetcher线程已经关闭了(stopConnections),这是消费者能得到的最后一个数据块,以后不会有了,直到平衡结束,fetcher重新开始
    1. topicThreadIdAndQueues来自于topicThreadIds,所以它的topic应该都在relevantTopicThreadIdsMap的topics中.
      为什么还要过滤呢? 注释中说到在本次平衡之后,只需要清理可能不再属于这个消费者的队列(部分的topicPartition抓取队列).

    2. 问题:新创建的ZKRebalancerListener中kafkaMessageAndMetadataStreams(即这里的messageStreams)为空的Map.
      如何清空里面的数据? 实际上KafkaStream只是一个迭代器,在运行过程中会有数据放入到这个流中,这样流就有数据了.

    4.ConsumerFetcherManager

    Fetcher线程要抓取数据关心的是PartitionTopicInfo,首先要找出Partition Leader(因为只向Leader Partition发起抓取请求).
    初始时假设所有topicInfos(PartitionTopicInfo)都找不到Leader,即同时加入partitionMap和noLeaderPartitionSet.
    在LeaderFinderThread线程中如果找到Leader,则从noLeaderPartitionSet中移除.

    ConsumerFetcherManager管理了当前Consumer的所有Fetcher线程.

    image

    5.小结

    high level的Consumer Rebalance的控制策略是由每一个Consumer通过在Zookeeper上注册Watch完成的。
    每个Consumer被创建时会触发Consumer Group的Rebalance,具体的启动流程是:

    1. (High Level)Consumer启动时将其ID注册到其Consumer Group下 (registerConsumerInZK)
    2. 在/consumers/[group_id]/ids上和/brokers/ids上分别注册Watch (reinitializeConsumer->Listener)
    3. 强制自己在其Consumer Group内启动Rebalance流程 (ZKRebalancerListener.rebalance)

    在这种策略下,每一个Consumer或者Broker的增加或者减少都会触发Consumer Rebalance。
    因为每个Consumer只负责调整自己所消费的Partition,为了保证整个Consumer Group的一致性,
    当一个Consumer触发了Rebalance时,该Consumer Group内的其它所有其它Consumer也应该同时触发Rebalance。

    该方式有如下缺陷:

    • Herd effect(羊群效应): 任何Broker或者Consumer的增减都会触发所有的Consumer的Rebalance
    • Split Brain(脑裂): 每个Consumer分别单独通过Zookeeper判断哪些Broker和Consumer 宕机了,
      那么不同Consumer在同一时刻从Zookeeper“看”到的View就可能不一样,这是由Zookeeper的特性决定的,这就会造成不正确的Reblance尝试。
    • 调整结果不可控: 所有的Consumer都并不知道其它Consumer的Rebalance是否成功,这可能会导致Kafka工作在一个不正确的状态。

    根据Kafka官方文档,Kafka作者正在考虑在还未发布的0.9.x版本中使用中心协调器(coordinator)。大体思想是选举出一个broker作为coordinator,由它watch Zookeeper,从而判断是否有partition或者consumer的增减,然后生成rebalance命令,并检查是否这些rebalance在所有相关的consumer中被执行成功,如果不成功则重试,若成功则认为此次rebalance成功.

    参考这篇博文,将这个类好好总结一下。
    涉及到元数据信息的不一致问题,还有rebalance的问题。

  • 相关阅读:
    js字符串拼接 ·${}·
    [转]CRLF Injection
    域渗透-GPP(组策略)利用
    AS-REPRoasting
    域用户名枚举
    我理解的HTTP请求走私(HTTP Request Smuggling)
    NFS未授权访问
    Hessian反序列化RCE漏洞
    CVE-2020-15778 Openssh命令注入漏洞复现
    在非域内机器上运行harphound
  • 原文地址:https://www.cnblogs.com/byrhuangqiang/p/6372114.html
Copyright © 2011-2022 走看看