zoukankan      html  css  js  c++  java
  • Kafka 0.8 Consumer处理逻辑

    0.前言

    客户端用法:

    kafka.javaapi.consumer.ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    
    // 决定一个topic启动几个线程去拉取数据,即生成几个KafkaStream;
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(threads));
    
    Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get(topic);
    
    // 本质是调用了 ZookeeperConsumerConnector
    val consumerConnect = new kafka.javaapi.consumer.ZookeeperConsumerConnector(config)
    
    • 一个Topic启动几个消费者线程,会生成几个KafkaStream。
    • 一个KafkaStream对应的是一个Queue(有界的LinkedBlockingQueue),有界的参数控制:queued.max.message.chunks。消费者线程数量决定阻塞队列的个数。
    • Fetcher线程是对应topic所在的broker的个数。

    因此,分析Consumer,主要是分析ZookeeperConsumerConnector。代码里面,有两个类,它们是什么关系呢?

    • kafka.consumer.ZookeeperConsumerConnector:核心类
    • kafka.javaapi.consumer.ZookeeperConsumerConnector:对上面那个类的scala数据结构封装,方便Java程序员使用。

    0.8.0 和 0.8.2.1 ZookeeperConsumerConnector的源码不一样,下面以0.8.2.1源码为主来分析,也就是从这个版本开始,可以将Offset存在Kafka的Broker中。(关注实现思想,忽略细节。)

    1.ZookeeperConsumerConnector 架构

    image

    一个Consumer会创建一个ZookeeperConsumerConnector,代表一个消费者进程.

    • fetcher: 消费者获取数据, 使用ConsumerFetcherManager fetcher线程抓取数据
    • zkClient: 消费者要和ZK通信, 除了注册自己,还有其他信息也会写到ZK中
    • topicThreadIdAndQueues: 消费者会指定自己消费哪些topic,并指定线程数, 所以topicThreadId都对应一个队列
    • messageStreamCreated: 消费者会创建消息流, 每个队列都对应一个消息流
    • offsetsChannel: offset可以存储在ZK或者kafka中,如果存在kafka里,像其他请求一样,需要和Broker通信。可以理解成OffsetManager的一部分。
    • scheduler: 后台调度autoCommit
    • 还有其他几个Listener监听器,分别用于topicPartition的更新,负载均衡,消费者重新负载等

    简述获取数据的流程

    1. 初始化上面的几个组件,包括与ZK的连接,创建ConsumerFetcherManager,确保连接上OffsetManager(为该ConsumerGroup建立一个OffsetChannel)。
    2. createMessageStreams创建消息流,反序列化message
    3. 通过Fetcher线程拉取数据,放入BlockingQueue来给客户端。
    4. 客户端启动ZKRebalancerListener,ZKRebalancerListener实例会在内部创建一个线程,这个线程定时检查监听的事件有没有执行(消费者发生变化),如果没有变化则wait 1秒钟,当发生了变化就调用 syncedRebalance 方法,去rebalance消费者。

    1.1 消费者线程(consumer thread),队列(LinkedBlockingQueue),拉取线程(fetch thread)三者之间关系

    以一段代码来说明,消费的topic 12 partition,分配在3台broker机器上。

    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("test-string-topic", new Integer(2)); //value表示consumer thread线程数量
    
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    
    • consumer thread数量与BlockingQueue一一对应。所以上述的代码只有2个BlockQueue。(它们连接的桥梁是KafkaStream)
    • fetcher线程数和topic所在多少台broker有关。因此,共有3个fetcher线程与broker建立一个连接。(3个fetch thread线程去拉取消息数据,最终放到2个BlockingQueue中,等待consumer thread来消费。

    下面是分配的情况:

    • 消费者线程,缓冲队列,partitions分布列表如下
    consumer线程 Blocking Queue partitions
    consumer thread1 blockingQueue1 0,1,2,3,4,5
    consumer thread2 blockingQueue2 6,7,8,9,10,11
    • fetch thread与partitions分布列表如下
    fetch线程 partitions
    fetch thread1 0,3,6,9
    fetch thread2 1,4,7,10
    fetch thread3 2,5,8,11

    用户的consumer thread就使用2个BlockingQueue的数据进行处理;所以一般会使用2个consumer thread去消费这2个BlockingQueue数据。

    1.2 rebalance的流程

    代码上调用:syncedRebalance方法在内部会调用def rebalance(cluster: Cluster): Boolean方法,去执行操作。

    1. // 关闭所有的数据获取者 closeFetchers
    2. // 解除分区的所有者 releasePartitionOwnership
    3. // 按规则得到当前消费者拥有的分区信息并保存到topicRegistry中 topicRegistry=getCurrentConsumerPartitionInfo
    4. // 修改并重启Fetchers updateFetchers

    最后,对每个broker创建一个FetcherRunnable线程,并启动它。这个fetcher线程负责从Broker上不断获取数据,对每个partition分别创建FetchRequest,最后把数据插入BlockingQueue的操作。

    KafkaStreamConsumerIterator做了进一步的封装,我们调用stream的next方法就可以取到数据了(内部通过调用ConsumerIteratornext方法实现)

    1.3 注意

    ConsumerIterator的实现可能会造成数据的重复发送(这要看生产者如何生产数据),FetchedDataChunk是一个数据集合,它内部会包含很多数据块,一个数据块可能包含多条消息,但同一个数据块中的消息只有一个offset,所以当一个消息块有多条数据,处理完部分数据发生异常时,消费者重新去取数据,就会再次取得这个数据块,然后消费过的数据就会被重新消费。

    • 没想到里面,里面是这个样子的,给一个数据块,导致了数据消费的重复。

    3.美团遇到的一个问题

    问题: Kafka中由Consumer维护消费状态,当Consumer消费消息时,支持2种模式commit消费状态,分别为立即commit和周期commit。前者会导致性能低下,做到消息投递恰好一次,但很少使用,后者性能高,通常用于实际应用,但极端条件下无法保证消息不丢失。

    解决方案(这个问题太极端情况,不推荐,长个知识)

    • 将本来的结果改成下面的处理流程:等待“执行业务逻辑”成功完成后更新缓存消费状态,就可以保证消息不会丢失。
      -

    变成下面的:
    image

  • 相关阅读:
    oracle 聚合函数 LISTAGG ,将多行结果合并成一行
    oracle 数据库对于多列求最大值
    Java 简单的rpc 一
    centos7 安装php7
    win10下VM 中centos 安装共享文件
    CentOS7 cannot find a valid baseurl for repo base
    分布式事务
    利用虚拟映射文件加密大文件
    动态代理
    c++ 11 lambda表达式
  • 原文地址:https://www.cnblogs.com/byrhuangqiang/p/6364082.html
Copyright © 2011-2022 走看看