zoukankan      html  css  js  c++  java
  • kafka-consumer端的设计细节

    在记录中,ConsumerA,B,C代表一个消费者,Group代表一个Consumer Group

    Consumer,Group,Topic,Partition的关系

    • Topic逻辑的订阅者是Group,每个Consumer 进程都会划归到一个Group中
    • 一条消息可以被多个Group订阅,就像广播到每个Group,但是只会被这个Group下的一个Consumer实例消费到

    1、Consumer和Group

    Group与Consumer的关系是动态维护的:

    当一个Consumer 进程挂掉 或者是卡住时,该consumer所订阅的partition会被重新分配到该group内的其它的consumer上

    当一个consumer 加入一个group时,会从其它consumer中分配出一个或多个partition 给这个新加入的consumer

    当启动一个Consumer时,会根据配置的group.id指定它要加入的group

    为了维持Consumer 与 Group的联系,需要Consumer周期性的发送heartbeat到coordinator(协调者)

    当Consumer由于某种原因不能发Heartbeat失联时,会认为该consumer已退出,它订阅的partition会分配到其它consumer(rebalance)

    2、Consumer与Partition

    具体的Consumer实例实际订阅的其实是topic下的一个或多个Partition

    partition分配的工作在consumer leader中完成

    3、Coordinator

    group在server端由GroupCoordinator(组协调器)来管理部分组和该组下的每个消费者的消费偏移量

    每个consumer都有一个ConsumerCoordinator,负责与GroupCoordinator保持通信(包括但不限于)

    consumer在poll()和joinGroup()之前必须保证Coordinator状态连接正常

    4、rebalance过程

    一个Consumer要join到一个group中,或者一个consumer退出时,都会触发rebalance。大体经过这几步:

    1)变动的consumer会带上自己的一些元数据信息,向对应的GroupCoordinator发起Join请求

    2)Coordinator 可能会收到不止一个join请求,从组里选出一个leader(就是选队长),并通知给各个consumer(组员)

    3)leader 根据其他consumer的metadata,为每个member重新分配partition。分配完毕通过coordinator把最新分配情况同步给每个consumer

    4)Consumer拿到最新的分配后,继续工作

    注意:所有的consumer要先向coordinator注册,由coordinator选出leader, 然后由leader来分配state。 由leader来执行协调任务, 这样把负载分

    配到client端,可以减轻broker的压力,支持更多数量的消费组,leader分配完后将结果发回组协调器,组协调器同步结果给各member

    Consumer消费过程

     借图,这个是讲清了从消费者实例启动到抓取数据的整个过程,涉及到KafkaConsumer和Broker,GroupCoordinator进行确认,入组,心跳,抓取数据的过程

    poll()

    调用poll()时,consumer发起fetch请求,像partition拉取数据,拉取多少取决于配置的max.partition.fetch.bytes和max.poll.records来配置决定

    可能出现的问题:consumer 进程一直在周期性的发送heartbeat,但是一直不消费消息(不调用poll()拉取消息),这种状态称为livelock

    我取个名字叫"占着茅坑不拉屎"(不管是什么原因导致),这时候占着的茅坑(partition)没法被正常消费,但是也没法让出来给别的consumer

    kafka为你想到了,使用max.poll.interval.ms这个配置来检测,如果蹲大厕不拉屎的时间超过了这个设定时间,会向GroupCoordinator发送一个

    leaveGroup的通知,接着会触发rebalance,然后下一次poll()的时候重新发送joinGroup的请求,这启示我们如果批次消息处理耗时较长,这个检测

    时间(max.poll.interval.ms)应该调大一点点,至少要大于你的处理消息的时间,换一种角度,解决方案是降低处理消息时间,要么优化业务逻辑,

    要么通过max.poll.records设置减小拉取的条数

    commit offset

    前面辨析了,同一个group下,message不会被组员重复消费,不会漏消费,比如刚刚有一个批次的消息,consumerA消费完之后挂了,或者还没消费完就挂了,

    rebalance后其他的consumer怎么知道从哪里开始接着消费呢?

    跟了一遍源码,每个Consumer都有一个ConsumerCoordinator,在这个协调器中保存了一个变量叫SubscriptionState,当调用commit offset的请求时,会

    将这个变量保存的信息一起提交,保存的信息,正是这个组对应的topic下,每个partition消息的位移

    注意:决定消息什么时候被消费,控制权在消费者端

    重要的配置

    bootstrap.servers

    consumer端配置的kafka集群的地址,是一个ip:port的list,逗号隔开,可以只填一个,kafka会自动发现集群里

    其他的broker,但是如果配置的这个broker正好挂了,那就不行了,多多益善

    group.id

    重要属性,消费者必须处于一个消费者组里面,如果消费过程中,更改了groupId,会导致重新消费

    heartbeat.interval.ms

    心跳用于确保消费者ConsumerCoordinator 和协调者GroupCoordinator会话保持活动状态,当消费者加入或离开组时,方便broker端进行rebalance,

    该值必须比session.timeout.ms小,通常不高于1/3。源码中位于AbstracCoordinator这个类下,定义了一个内部类:

    private class HeartbeatThread extends Thread {}
    HeartbeatThread() {
    super("kafka-coordinator-heartbeat-thread" + (AbstractCoordinator.this.groupId.isEmpty() ? "" : " | " + AbstractCoordinator.this.groupId));
    this.setDaemon(true);
    }

    构造方法中设置该线程为一个守护线程,因此对消费者来说,心跳是无感的,消费者实例已启动,心跳线程就开始工作

    在定义的run方法中,检查了各项参数OK之后,会调用AbstractCoordinator.this.heartbeat.sentHeartbeat(now)来定频率的发送心跳

    Heartbeat的结构如下,构造方法中确定heartbeatInterval必须小于sessionTimeout

    public final class Heartbeat {
        private final long sessionTimeout;
        private final long heartbeatInterval;
        private final long maxPollInterval;
        private final long retryBackoffMs;
        private volatile long lastHeartbeatSend;
        private long lastHeartbeatReceive;
        private long lastSessionReset;
        private long lastPoll;
        private boolean heartbeatFailed;
    
        public Heartbeat(long sessionTimeout, long heartbeatInterval, long maxPollInterval, long retryBackoffMs) {
            if (heartbeatInterval >= sessionTimeout) {
                throw new IllegalArgumentException("Heartbeat must be set lower than the session timeout");
            } else {
                this.sessionTimeout = sessionTimeout;
                this.heartbeatInterval = heartbeatInterval;
                this.maxPollInterval = maxPollInterval;
                this.retryBackoffMs = retryBackoffMs;
            }
        }
    }

    这个地方贴一个重点,Heartbeat中会判断两个超时来决定消费者是否"脱离了组织"

    public boolean sessionTimeoutExpired(long now) {
        return now - Math.max(this.lastSessionReset, this.lastHeartbeatReceive) > this.sessionTimeout;
    }
    public boolean pollTimeoutExpired(long now) { return now - this.lastPoll > this.maxPollInterval; }

    每次心跳会保存上一次发送心跳请求的最后时刻点,然后比较心跳有没有断开,因为sessionTimeout是单独的一个设计点,

    可能和心跳的过程重合,所以取的是两者最近的时刻作为最后一次确认是连接状态的时间点

    run()中对超时处理是这么定义的:session超时会判定消费者挂了,将会被踢下线,然后组进行rebalance;poll超时会离开group,之所以是maybeLeave,

    是因为下次poll的时候可能再重新入组,因为这个poll是KafkaConsumer主动调用的,所以如果上一次poll没拉回消息,也不要"休息"太久,

    这样会平凡源码如下,方法名很容易读:

    else if (AbstractCoordinator.this.heartbeat.sessionTimeoutExpired(now)) {
        AbstractCoordinator.this.coordinatorDead();
    } else if (AbstractCoordinator.this.heartbeat.pollTimeoutExpired(now)) {
        AbstractCoordinator.this.maybeLeaveGroup();
    } 

     enable.auto.commit

    是否自动提交offset,默认为true,只能保证at least once,通常大部分业务都要设置为false,业务手动提交

  • 相关阅读:
    C#学习笔记10
    C#学习笔记9
    C#学习笔记8
    C#学习笔记7
    C#学习笔记6
    C#学习笔记5
    C#学习笔记4
    distinct() 去重复
    row_number over ()排序函数
    当没有用 EXISTS 引入子查询时,在选择列表中只能指定一个表达式。
  • 原文地址:https://www.cnblogs.com/yb38156/p/14590350.html
Copyright © 2011-2022 走看看