zoukankan      html  css  js  c++  java
  • Kafka Consumer 原理与实践

    1、Kafka 版本说明

    从上面截图已经很清楚的看到,针对版本 kafka_2.12-2.6.0.tgz ,2.12是编译Kafka Server端的Scala版本,2.6.0是Kafka Server的版本!

    另外,目前Kafka客户端从某个版本开始已经用Java重写了,服务端依然还是Scala版本!

    2、消费者组(Consumer Group)

    • 一个consumer group包含多个consumer(进程或线程),每个消费者组用一个group id唯一标识;
    • consumer group订阅的某个topic(可以订阅多个topic)下的每个partition只能被组里的一个consumer所消费;更深层次的讲,consumer group订阅的topic下的消息是以partition为单位在组里的多个consumer上进行负载均衡的,这点也保证了partition层面消息被消费的顺序性;如果consumer group订阅的所有topic下总的partition数量大于组内consumer数量,则会出现一个消费者消费多个partition的情况,如果partition数量小于组内consumer数量,则会出现有的consumer消费不到partition的情况;
    • consumer group是一个逻辑订阅者,多个consumer group可以消费相同的topic中的消息,相当于同一个topic下的消息被广播给多个consumer group;

    举个简单的例子,如果某位名人正在给粉丝进行签名活动,但是限制过来签名的人必须以家庭为单位,且给每个家庭的签名数量都是固定的4次,如果家庭成员数量多于4个,就会存在有的家庭成员得不到签名,如果家庭成员少于3个,就会出现有的家庭成员得到一个以上的签名(签名数总是要用完的嘛,毕竟除了收藏还可以卖),在这里,这个名人就类似于一个topic,每个家庭就是一个consumer group,家庭成员就是consumer,签名数就是topic下的partition数目,再扩展一种更宽泛的场景,如果这次活动来的不是一个名人,而是多个,那么一个家庭就可以得到多个名人的签名,这就是consumer group消费多个topic的场景。

    3、位移管理(offset management)

    3.1、自动VS手动

    Kafka默认是定期帮你自动提交位移的(enable.auto.commit = true),你当然可以选择手动提交位移实现自己控制。另外kafka会定期把group消费情况保存起来,做成一个offset map,如下图所示:

     上图中表明了test-group这个组当前的消费情况。

    3.2、位移提交

    老版本的位移是提交到zookeeper中的,目录结构是:/consumers/<group.id>/offsets/<topic>/<partitionId>,但是zookeeper其实并不适合进行大批量的读写操作,尤其是写操作。因此kafka提供了另一种解决方案:增加__consumer_offsets topic,将offset信息写入这个topic,摆脱对zookeeper的依赖(指保存offset这件事情)。__consumer_offsets中的消息保存了每个consumer group某一时刻提交的offset信息。依然以上图中的consumer group为例,格式大概如下:

     

     __consumers_offsets topic配置了compact策略,使得它总是能够保存最新的位移信息,既控制了该topic总体的日志容量,也能实现保存最新offset的目的。compact的具体原理请参见:Log Compaction

     至于每个group保存到__consumers_offsets的哪个分区,如何查看的问题请参见这篇文章:Kafka 如何读取offset topic内容 (__consumer_offsets)

    4、Rebalance

    4.1 什么是rebalance?

    rebalance本质上是一种协议,规定了一个consumer group下的所有consumer如何达成一致来分配订阅topic的每个分区。比如某个group下有20个consumer,它订阅了一个具有100个分区的topic。正常情况下,Kafka平均会为每个consumer分配5个分区。这个分配的过程就叫rebalance。

    4.2 什么时候rebalance?

    这也是经常被提及的一个问题。rebalance的触发条件有三种:

    • 组成员发生变更(新consumer加入组、已有consumer主动离开组或已有consumer崩溃了——这两者的区别后面会谈到)
    • 订阅主题数发生变更——这当然是可能的,如果你使用了正则表达式的方式进行订阅,那么新建匹配正则表达式的topic就会触发rebalance
    • 订阅主题的分区数发生变更

    4.3 如何进行组内分区分配?

    之前提到了group下的所有consumer都会协调在一起共同参与分配,这是如何完成的?Kafka新版本consumer默认提供了两种分配策略:range和round-robin。当然Kafka采用了可插拔式的分配策略,你可以创建自己的分配器以实现不同的分配策略。实际上,由于目前range和round-robin两种分配器都有一些弊端,Kafka社区已经提出第三种分配器来实现更加公平的分配策略,只是目前还在开发中。我们这里只需要知道consumer group默认已经帮我们把订阅topic的分区分配工作做好了就行了。

    简单举个例子,假设目前某个consumer group下有两个consumer: A和B,当第三个成员加入时,kafka会触发rebalance并根据默认的分配策略重新为A、B和C分配分区,如下图所示:

     

    4.4 谁来执行rebalance和consumer group管理?

    Kafka提供了一个角色:coordinator来执行对于consumer group的管理。坦率说kafka对于coordinator的设计与修改是一个很长的故事。最新版本的coordinator也与最初的设计有了很大的不同。这里我只想提及两次比较大的改变。

    首先是0.8版本的coordinator,那时候的coordinator是依赖zookeeper来实现对于consumer group的管理的。Coordinator监听zookeeper的/consumers/<group>/ids的子节点变化以及/brokers/topics/<topic>数据变化来判断是否需要进行rebalance。group下的每个consumer都自己决定要消费哪些分区,并根据自己的决定抢先在zookeeper中的/consumers/<group>/owners/<topic>/<partition>下注册。很明显,这种方案要依赖于zookeeper的帮助,而且每个consumer是单独做决定的,没有那种“大家属于一个组,要协商做事情”的精神。

    基于这些潜在的弊端,0.9版本的kafka改进了coordinator的设计,提出了group coordinator——每个consumer group都会被分配一个这样的coordinator用于组管理和位移管理。这个group coordinator比原来承担了更多的责任,比如组成员管理、位移提交保护机制等。当新版本consumer group的第一个consumer启动的时候,它会去和kafka server确定谁是它们组的coordinator。之后该group内的所有成员都会和该coordinator进行协调通信。显而易见,这种coordinator设计不再需要zookeeper了,性能上可以得到很大的提升。后面的所有部分我们都将讨论最新版本的coordinator设计。

    4.5 如何确定coordinator?

    上面简单讨论了新版coordinator的设计,那么consumer group如何确定自己的coordinator是谁呢? 简单来说分为两步:

    • 确定consumer group位移信息写入__consumers_offsets的哪个分区。具体计算公式:
      •   __consumers_offsets partition# = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)   注意:groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。
    • 该分区leader所在的broker就是被选定的coordinator

    4.6 Rebalance Generation

    JVM GC的分代收集就是这个词,我这里把它翻译成“届”好了,它表示了rebalance之后的一届成员,主要是用于保护consumer group,隔离无效offset提交的。比如上一届的consumer成员是无法提交位移到新一届的consumer group中。我们有时候可以看到ILLEGAL_GENERATION的错误,就是kafka在抱怨这件事情。每次group进行rebalance之后,generation号都会加1,表示group进入到了一个新的版本,如下图所示: Generation 1时group有3个成员,随后成员2退出组,coordinator触发rebalance,consumer group进入Generation 2,之后成员4加入,再次触发rebalance,group进入Generation 3.

    4.7 协议(protocol)

    前面说过了, rebalance本质上是一组协议。group与coordinator共同使用它来完成group的rebalance。目前kafka提供了5个协议来处理与consumer group coordination相关的问题:

    • Heartbeat请求:consumer需要定期给coordinator发送心跳来表明自己还活着
    • LeaveGroup请求:主动告诉coordinator我要离开consumer group
    • SyncGroup请求:group leader把分配方案告诉组内所有成员
    • JoinGroup请求:成员请求加入组
    • DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等。通常该请求是给管理员使用

    Coordinator在rebalance的时候主要用到了前面4种请求。

    4.8 liveness

    consumer如何向coordinator证明自己还活着? 通过定时向coordinator发送Heartbeat请求。如果超过了设定的超时时间,那么coordinator就认为这个consumer已经挂了。一旦coordinator认为某个consumer挂了,那么它就会开启新一轮rebalance,并且在当前其他consumer的心跳response中添加“REBALANCE_IN_PROGRESS”,告诉其他consumer:不好意思各位,你们重新申请加入组吧!

    4.9 Rebalance过程

    终于说到consumer group执行rebalance的具体流程了。很多用户估计对consumer内部的工作机制也很感兴趣。下面就跟大家一起讨论一下。当然我必须要明确表示,rebalance的前提是coordinator已经确定了

    总体而言,rebalance分为2步:Join和Sync

    1 Join, 顾名思义就是加入组。这一步中,所有成员都向coordinator发送JoinGroup请求,请求入组。一旦所有成员都发送了JoinGroup请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader——注意leader和coordinator不是一个概念。leader负责消费分配方案的制定

    2 Sync,这一步leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给coordinator,非leader也会发SyncGroup请求,只是内容为空。coordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。

    还是拿几张图来说明吧,首先是加入组的过程:

    值得注意的是, 在coordinator收集到所有成员请求前,它会把已收到请求放入一个叫purgatory(炼狱)的地方。记得国内有篇文章以此来证明kafka开发人员都是很有文艺范的,写得也是比较有趣,有兴趣可以去搜搜。
    然后是分发分配方案的过程,即SyncGroup请求:

    注意!! consumer group的分区分配方案是在客户端执行的!Kafka将这个权利下放给客户端主要是因为这样做可以有更好的灵活性。比如这种机制下我可以实现类似于Hadoop那样的机架感知(rack-aware)分配方案,即为consumer挑选同一个机架下的分区数据,减少网络传输的开销。Kafka默认为你提供了两种分配策略:range和round-robin。由于这不是本文的重点,这里就不再详细展开了,你只需要记住你可以覆盖consumer的参数:partition.assignment.strategy来实现自己分配策略就好了。

    4.10 consumer group状态机

    和很多kafka组件一样,group也做了个状态机来表明组状态的流转。coordinator根据这个状态机会对consumer group做不同的处理,如下图所示(完全是根据代码注释手动画的,多见谅吧)

    简单说明下图中的各个状态:

    • Dead:组内已经没有任何成员的最终状态,组的元数据也已经被coordinator移除了。这种状态响应各种请求都是一个response: UNKNOWN_MEMBER_ID
    • Empty:组内无成员,但是位移信息还没有过期。这种状态只能响应JoinGroup请求
    • PreparingRebalance:组准备开启新的rebalance,等待成员加入
    • AwaitingSync:正在等待leader consumer将分配方案传给各个成员
    • Stable:rebalance完成!可以开始消费了~

    至于各个状态之间的流程条件以及action,这里就不具体展开了。

    5、rebalance场景剖析

    上面详细阐述了consumer group是如何执行rebalance的,可能依然有些云里雾里。这部分对其中的三个重要的场景做详尽的时序展开,进一步加深对于consumer group内部原理的理解。由于图比较直观,所有的描述都将以图的方式给出,不做过多的文字化描述了。

    1 新成员加入组(member join) 

    2 组成员崩溃(member failure)

    前面说过了,组成员崩溃和组成员主动离开是两个不同的场景。因为在崩溃时成员并不会主动地告知coordinator此事,coordinator有可能需要一个完整的session.timeout周期才能检测到这种崩溃,这必然会造成consumer的滞后。可以说离开组是主动地发起rebalance;而崩溃则是被动地发起rebalance。直接上图: 

    3 组成员主动离组(member leave group)

     4 提交位移(member commit offset)

     总结一下,本文着重讨论了一下新版本的consumer group的内部设计原理,特别是consumer group与coordinator之间的交互过程,希望对各位有所帮助。

    6 Consumer Fetch Message

    上图中,Consumer A、B分属于不用的Consumer Group。Consumer B读取到offset =11,Consumer A读取到offset=9 。这个值表示Consumer Group中的某个Consumer 在下次读取该partition时会从哪个offset的 message开始读取,即 Consumer Group A 中的Consumer下次会从offset = 9 的message 读取, Consumer Group B 中的Consumer下次会从offset = 11 的message 读取。

    这里并没有说是Consumer A 下次会从offset = 9 的message读取,原因是Consumer A可能会退出Group ,然后Group A 进行rebalance,即重新分配分区

    6.1 poll 方法

      poll方法内部通过调用fetch方法来间接地从partition拉取消息,大致流程就是,fetch方法通过网络请求从broker中拉取一定量的消息(通过配置项max.partition.fetch.bytes来限制)放到consumer端的缓存中。poll方法时再从缓存中读取一定量的消息用来消费处理(通过配置项max.poll.records来限制一次最多poll多少个record)。

      举个例子: 在满足max.partition.fetch.bytes限制的情况下,假如fetch到了100个record,放到本地缓存后,由于max.poll.records限制每次只能poll出15个record。那么KafkaConsumer就需要执行7次才能将这一次通过网络发起的fetch请求所fetch到的这100个record消费完毕。其中前6次是每次pool中15个record,最后一次是poll出10个record.。所以poll方法只是可能会发起fetch请求,即在本地缓存中没有消息的情况下再调用fetch方法去拉取消息。

      在consumer中,还有另外一个配置项:max.poll.interval.ms ,它表示最大的poll数据间隔,如果超过这个间隔没有发起pool请求,但heartbeat仍旧在发,就认为该consumer处于 livelock状态。就会将该consumer退出consumer group。所以为了不使Consumer 自己被退出,Consumer 应该不停的发起poll(timeout)操作。而这个动作 KafkaConsumer Client是不会帮我们做的,这就需要自己在程序中不停的调用poll方法了。

    6.2 commit offset

        当一个consumer因某种原因退出Group时,进行重新分配partition后,同一group中的另一个consumer在读取该partition时,怎么能够知道上一个consumer该从哪个offset的message读取呢?也就是如何保证同一个group内的consumer不重复消费消息呢?上面说了一次走网络的fetch请求会拉取到一定量的数据,但是这些数据还没有被消息完毕,Consumer就挂掉了,下一次进行数据fetch时,是否会从上次读到的数据开始读取,而导致Consumer消费的数据丢失吗?

        为了做到这一点,当使用poll从本地缓存拉取到数据之后,需要client调用commitSync方法(或者commitAsync方法)去commit 下一次开始读取的offset 。

        而这个commit方法会通过走网络的commit请求将offset在coordinator中保留,这样就能够保证下一次读取(不论进行了rebalance)时,既不会重复消费消息,也不会遗漏消息。

        对于offset的commit,Kafka Consumer Java Client支持两种模式:由KafkaConsumer自动提交,或者是用户通过调用commitSync、commitAsync方法的方式完成offset的提交。

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("foo", "bar"));
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
    for (ConsumerRecord<String, String> record : records) {
    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
    }

    手动提交的例子: 

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("foo", "bar"));
    final int minBatchSize = 200;
    List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
    for (ConsumerRecord<String, String> record : records) {
    buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
    insertIntoDb(buffer);
    consumer.commitSync();
    buffer.clear();
    }
    }

    在手动提交时,需要注意的一点是:要提交的是下一次要读取的offset,例如: 

    try {
        while (true) {
            // 取得消息
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
            // 根据分区来遍历数据:
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                // 数据处理
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println(record.offset() + ": " + record.value());
                }
                // 取得当前读取到的最后一条记录的offset
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                // 提交offset,记得要 + 1
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    } finally {
        consumer.close();
    }

    7、Consumer的线程安全性

    KafkaProducer是线程安全的,上一节已经了解到。但Consumer却没有设计成线程安全的。当用户想要在在多线程环境下使用kafkaConsumer时,需要自己来保证synchronized。如果没有这样的保证,就会抛出ConcurrentModificatinException的。

    当你想要关闭Consumer或者为也其它的目的想要中断Consumer的处理时,可以调用consumer的wakeup方法。这个方法会抛出WakeupException。

    class KafkaConsumerRunner implements Runnable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer consumer;
    
        public KafkaConsumerRunner(KafkaConsumer consumer) {
            this.consumer = consumer;
        }
    
        @Override
        public void run() {
            try {
                consumer.subscribe(Arrays.asList("topic"));
                while (!closed.get()) {
                    ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
                    // Handle new records
                }
            } catch (WakeupException e) {
                // Ignore exception if closing
                if (!closed.get()) {
                    throw e;
                }
            } finally {
                consumer.close();
            }
        }
    
        // Shutdown hook which can be called from a separate thread
        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }

    8、Consumer Configuration

    在kafka 0.9+使用Java Consumer替代了老版本的scala Consumer。新版的配置如下:

    ·bootstrap.servers

    在启动consumer时配置的broker地址的。不需要将cluster中所有的broker都配置上,因为启动后会自动的发现cluster所有的broker。它配置的格式是:host1:port1;host2:port2…

     

    ·key.descrializervalue.descrializer

    Message record 的key, value的反序列化类。

     

    ·group.id

    用于表示该consumer想要加入到哪个group中。默认值是 “”。

     

    ·heartbeat.interval.ms

    心跳间隔。心跳是在consumer与coordinator之间进行的。心跳是确定consumer存活,加入或者退出group的有效手段。

    这个值必须设置的小于session.timeout.ms,因为:

    当Consumer由于某种原因不能发Heartbeat到coordinator时,并且时间超过session.timeout.ms时,就会认为该consumer已退出,它所订阅的partition会分配到同一group 内的其它的consumer上。

    通常设置的值要低于session.timeout.ms的1/3。默认值是:3000 (3s)

     

    ·session.timeout.ms

    Consumer session 过期时间。这个值必须设置在broker configuration中的group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。

    其默认值是:10000 (10 s)

     

    ·enable.auto.commit

    Consumer 在commit offset时有两种模式:自动提交,手动提交。手动提交在前面已经说过。自动提交:是Kafka Consumer会在后台周期性的去commit。

    默认值是true。

     

    ·auto.commit.interval.ms

    自动提交间隔。范围:[0,Integer.MAX],默认值是 5000 (5 s)

     

    ·auto.offset.reset

    这个配置项,是告诉Kafka Broker在发现kafka在没有初始offset,或者当前的offset是一个不存在的值(如果一个record被删除,就肯定不存在了)时,该如何处理。它有4种处理方式:

    1) earliest:自动重置到最早的offset。

    2) latest:自动重置到最晚的offset。

    3) none:如果连更早的offset也没有的话,就抛出异常给consumer,告诉consumer在整个consumer group中都没有发现有这样的offset。

    4) 如果不是上述3种,只抛出异常给consumer。

    默认值是latest。

     

    ·connections.max.idle.ms

    连接空闲超时时间。因为consumer只与broker有连接(coordinator也是一个broker),所以这个配置的是consumer到broker之间的。

    默认值是:540000 (9 min)

     

    ·fetch.max.wait.ms

    Fetch请求发给broker后,在broker中可能会被阻塞的(当topic中records的总size小于fetch.min.bytes时),此时这个fetch请求耗时就会比较长。这个配置就是来配置consumer最多等待response多久。

     

    ·fetch.min.bytes

    当consumer向一个broker发起fetch请求时,broker返回的records的大小最小值。如果broker中数据量不够的话会wait,直到数据大小满足这个条件。

    取值范围是:[0, Integer.Max],默认值是1。

    默认值设置为1的目的是:使得consumer的请求能够尽快的返回。

     

    ·fetch.max.bytes

    一次fetch请求,从一个broker中取得的records最大大小。如果在从topic中第一个非空的partition取消息时,如果取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这种情况下,只会返回这一条record。

    broker、topic都会对producer发给它的message size做限制。所以在配置这值时,可以参考broker的message.max.bytes 和 topic的max.message.bytes的配置。

    取值范围是:[0, Integer.Max],默认值是:52428800 (5 MB)

     

    ·max.partition.fetch.bytes

    一次fetch请求,从一个partition中取得的records最大大小。如果在从topic中第一个非空的partition取消息时,如果取到的第一个record的大小就超过这个配置时,仍然会读取这个record,也就是说在这片情况下,只会返回这一条record。

    broker、topic都会对producer发给它的message size做限制。所以在配置这值时,可以参考broker的message.max.bytes 和 topic的max.message.bytes的配置。

     

    ·max.poll.interval.ms

    前面说过要求程序中不间断的调用poll()。如果长时间没有调用poll,且间隔超过这个值时,就会认为这个consumer失败了。

     

    ·max.poll.records

    Consumer每次调用poll()时取到的records的最大数。

     

    ·receive.buffer.byte

    Consumer receiver buffer (SO_RCVBUF)的大小。这个值在创建Socket连接时会用到。

    取值范围是:[-1, Integer.MAX]。默认值是:65536 (64 KB)

    如果值设置为-1,则会使用操作系统默认的值。

     

    ·request.timeout.ms

    请求发起后,并不一定会很快接收到响应信息。这个配置就是来配置请求超时时间的。默认值是:305000 (305 s)

     

    ·client.id

    Consumer进程的标识。如果设置一个人为可读的值,跟踪问题会比较方便。

     

    ·interceptor.classes

    用户自定义interceptor。

     

    ·metadata.max.age.ms

    Metadata数据的刷新间隔。即便没有任何的partition订阅关系变更也行执行。

    范围是:[0, Integer.MAX],默认值是:300000 (5 min)

     

    参考:

    https://www.cnblogs.com/huxi2b/p/6223228.html

    https://www.cnblogs.com/f1194361820/p/6054148.html

  • 相关阅读:
    Lucene 全文检索
    Redis 集群
    Redis 初步接触
    Mybatis
    FastJson 介绍
    JAVA微信企业付款到零钱(十分钟搞定),附完整DEMO下载
    持续集成与Devops关系
    GIT命令行统计代码提交行数
    一种简单的REST API接口加密实现,只允许自己的产品调用后台,防止接口被刷
    Beyond Compare 4.X 破解方法(亲测有效)
  • 原文地址:https://www.cnblogs.com/codestarer/p/13616810.html
Copyright © 2011-2022 走看看