在记录中,ConsumerA,B,C代表一个消费者,Group代表一个Consumer Group
Consumer,Group,Topic,Partition的关系
- Topic逻辑的订阅者是Group,每个Consumer 进程都会划归到一个Group中
- 一条消息可以被多个Group订阅,就像广播到每个Group,但是只会被这个Group下的一个Consumer实例消费到
1、Consumer和Group
Group与Consumer的关系是动态维护的:
当一个Consumer 进程挂掉 或者是卡住时,该consumer所订阅的partition会被重新分配到该group内的其它的consumer上
当一个consumer 加入一个group时,会从其它consumer中分配出一个或多个partition 给这个新加入的consumer
当启动一个Consumer时,会根据配置的group.id指定它要加入的group
为了维持Consumer 与 Group的联系,需要Consumer周期性的发送heartbeat到coordinator(协调者)
当Consumer由于某种原因不能发Heartbeat失联时,会认为该consumer已退出,它订阅的partition会分配到其它consumer(rebalance)
2、Consumer与Partition
具体的Consumer实例实际订阅的其实是topic下的一个或多个Partition
partition分配的工作在consumer leader中完成
3、Coordinator
group在server端由GroupCoordinator(组协调器)来管理部分组和该组下的每个消费者的消费偏移量
每个consumer都有一个ConsumerCoordinator,负责与GroupCoordinator保持通信(包括但不限于)
consumer在poll()和joinGroup()之前必须保证Coordinator状态连接正常
4、rebalance过程
一个Consumer要join到一个group中,或者一个consumer退出时,都会触发rebalance。大体经过这几步:
1)变动的consumer会带上自己的一些元数据信息,向对应的GroupCoordinator发起Join请求
2)Coordinator 可能会收到不止一个join请求,从组里选出一个leader(就是选队长),并通知给各个consumer(组员)
3)leader 根据其他consumer的metadata,为每个member重新分配partition。分配完毕通过coordinator把最新分配情况同步给每个consumer
4)Consumer拿到最新的分配后,继续工作
注意:所有的consumer要先向coordinator注册,由coordinator选出leader, 然后由leader来分配state。 由leader来执行协调任务, 这样把负载分
配到client端,可以减轻broker的压力,支持更多数量的消费组,leader分配完后将结果发回组协调器,组协调器同步结果给各member
Consumer消费过程
借图,这个是讲清了从消费者实例启动到抓取数据的整个过程,涉及到KafkaConsumer和Broker,GroupCoordinator进行确认,入组,心跳,抓取数据的过程
poll()
调用poll()时,consumer发起fetch请求,像partition拉取数据,拉取多少取决于配置的max.partition.fetch.bytes和max.poll.records来配置决定
可能出现的问题:consumer 进程一直在周期性的发送heartbeat,但是一直不消费消息(不调用poll()拉取消息),这种状态称为livelock
我取个名字叫"占着茅坑不拉屎"(不管是什么原因导致),这时候占着的茅坑(partition)没法被正常消费,但是也没法让出来给别的consumer
kafka为你想到了,使用max.poll.interval.ms这个配置来检测,如果蹲大厕不拉屎的时间超过了这个设定时间,会向GroupCoordinator发送一个
leaveGroup的通知,接着会触发rebalance,然后下一次poll()的时候重新发送joinGroup的请求,这启示我们如果批次消息处理耗时较长,这个检测
时间(max.poll.interval.ms)应该调大一点点,至少要大于你的处理消息的时间,换一种角度,解决方案是降低处理消息时间,要么优化业务逻辑,
要么通过max.poll.records设置减小拉取的条数
commit offset
前面辨析了,同一个group下,message不会被组员重复消费,不会漏消费,比如刚刚有一个批次的消息,consumerA消费完之后挂了,或者还没消费完就挂了,
rebalance后其他的consumer怎么知道从哪里开始接着消费呢?
跟了一遍源码,每个Consumer都有一个ConsumerCoordinator,在这个协调器中保存了一个变量叫SubscriptionState,当调用commit offset的请求时,会
将这个变量保存的信息一起提交,保存的信息,正是这个组对应的topic下,每个partition消息的位移
注意:决定消息什么时候被消费,控制权在消费者端
重要的配置
bootstrap.servers
consumer端配置的kafka集群的地址,是一个ip:port的list,逗号隔开,可以只填一个,kafka会自动发现集群里
其他的broker,但是如果配置的这个broker正好挂了,那就不行了,多多益善
group.id
重要属性,消费者必须处于一个消费者组里面,如果消费过程中,更改了groupId,会导致重新消费
heartbeat.interval.ms
心跳用于确保消费者ConsumerCoordinator 和协调者GroupCoordinator会话保持活动状态,当消费者加入或离开组时,方便broker端进行rebalance,
该值必须比session.timeout.ms小,通常不高于1/3。源码中位于AbstracCoordinator这个类下,定义了一个内部类:
private class HeartbeatThread extends Thread {}
HeartbeatThread() {
super("kafka-coordinator-heartbeat-thread" + (AbstractCoordinator.this.groupId.isEmpty() ? "" : " | " + AbstractCoordinator.this.groupId));
this.setDaemon(true);
}
构造方法中设置该线程为一个守护线程,因此对消费者来说,心跳是无感的,消费者实例已启动,心跳线程就开始工作
在定义的run方法中,检查了各项参数OK之后,会调用AbstractCoordinator.this.heartbeat.sentHeartbeat(now)来定频率的发送心跳
Heartbeat的结构如下,构造方法中确定heartbeatInterval必须小于sessionTimeout
public final class Heartbeat { private final long sessionTimeout; private final long heartbeatInterval; private final long maxPollInterval; private final long retryBackoffMs; private volatile long lastHeartbeatSend; private long lastHeartbeatReceive; private long lastSessionReset; private long lastPoll; private boolean heartbeatFailed; public Heartbeat(long sessionTimeout, long heartbeatInterval, long maxPollInterval, long retryBackoffMs) { if (heartbeatInterval >= sessionTimeout) { throw new IllegalArgumentException("Heartbeat must be set lower than the session timeout"); } else { this.sessionTimeout = sessionTimeout; this.heartbeatInterval = heartbeatInterval; this.maxPollInterval = maxPollInterval; this.retryBackoffMs = retryBackoffMs; } } }
这个地方贴一个重点,Heartbeat中会判断两个超时来决定消费者是否"脱离了组织"
public boolean sessionTimeoutExpired(long now) { return now - Math.max(this.lastSessionReset, this.lastHeartbeatReceive) > this.sessionTimeout;
} public boolean pollTimeoutExpired(long now) { return now - this.lastPoll > this.maxPollInterval; }
每次心跳会保存上一次发送心跳请求的最后时刻点,然后比较心跳有没有断开,因为sessionTimeout是单独的一个设计点,
可能和心跳的过程重合,所以取的是两者最近的时刻作为最后一次确认是连接状态的时间点
run()中对超时处理是这么定义的:session超时会判定消费者挂了,将会被踢下线,然后组进行rebalance;poll超时会离开group,之所以是maybeLeave,
是因为下次poll的时候可能再重新入组,因为这个poll是KafkaConsumer主动调用的,所以如果上一次poll没拉回消息,也不要"休息"太久,
这样会平凡源码如下,方法名很容易读:
else if (AbstractCoordinator.this.heartbeat.sessionTimeoutExpired(now)) { AbstractCoordinator.this.coordinatorDead(); } else if (AbstractCoordinator.this.heartbeat.pollTimeoutExpired(now)) { AbstractCoordinator.this.maybeLeaveGroup(); }
enable.auto.commit
是否自动提交offset,默认为true,只能保证at least once,通常大部分业务都要设置为false,业务手动提交