消费者消费消息过程(一)
消费者组
消费者是以消费者组consumer group的方式进行消息消费的,一个消费者组是由一个或者多个消费者组成的,共同消费一个topic,在每个分区同一时间只能由消费者组中的一个消费者读取,一个消费者组中的消费者可以同时读取同一个topic中的多个partition。不同group之间可以消费相同的partition。
kafka采用消费者组保证了:一个Patition只允许被一个消费者组的一个消费者消费。
由此可得出:一个消费者可以消费多个partition,不同的消费者消费的partition一定不会重复;所有消费者组共同消费所有的Partition;也就是同一个消费者组内的消费者是互斥的,不同的消费者组之间是共享所有的Partition的。
假如两个消费者消费同一个topic,如果连个消费者在不同的消费者组中,则每个消费者都会获取到这个topic的所有消息;如果这两个消费者在同一个消费者组中,则它们只能各自获取一半的记录。
发布订阅与点对点消息通信
同一条消息会被多个订阅了这个topic的不同消费者组消费;如果有多个消费者组,每个组内只有一个消费者,那么就实现了发布订阅模式
只有一个消费者组,这个消费者组有多个消费者,一条消息只能被这个组内的一个消费者消费,实现点对点模式
但实际应用中一般都是由多个消费者组,每个消费者组中由多个消费者。
负载均衡Rebalance
将topic的所有的partition平均负载给消费者组中的所有消费者。比如一个topic有四个partition,一个消费者组有两个消费者,那么每个消费者都会分配到两个partition。
一个消费者组有多个消费者,如果其中一个消费者宕机,分配给这个消费者的partition需要被重新分配给相同组的其他消费者;如果一个消费者加入了同一个组,之前分配给其他消费者的partition需要分配给新加入的消费者。实际上一旦有消费者加入或者退出消费者组,导致消费列表变化,即使partition没有变化,消费组中的所有消费者也要触发重新rebalance的工作。如果集群中的partition发生变化,即使消费组成员没有变化,所有的消费者也要重新rebalance。
比如之前有C1,C2连个消费者,它们消费的消息分别是P0,P3和P1,P2
新添加一个消费者C3,则C1,C2,C3分别消费的消息是P0,P2,P1,P3
rebalance的触发时机
consumer的添加或者减少,partition的变化(比如broker的增加或者删除)都会触发rebalance。
rebalance算法
1、将topic下的所有partition进行排序,并存放在PT中。
2、将consumer group下的所有consumer进行排序,存放在CG中,第i个编号为Ci;
3、N=size(PT)/size(CG)向上取整
4、接触Ci对原来分配的partition的消费权,从0开始
5、将第iN到(i+1)N-1个partition分配给Ci
rebalance前后的动作
rebalance之前需要把消费者对partition的消费进度保存起来,这样在rebalance之后新分配的消费者可以保存的进度继续读取partition。这样确保了消息不会被重新消费。
消费进度的保存
每个Partition的消费进度应该是面向消费者组的。如果面向消费者,那么rebalance之后的消费者并不知道rebalance之前消费partition的进度。但是如果是消费者组级别的,因为rebalance前后的两个消费者同属于同一个消费者组,所以在rebalance之后的消费者可以从消费者组中读取消费进度。通常是借助外部的存储系统来保存消费进度的比如zookeeper或者Kafka内部的topic。所以消费者消费消息时需要实时的将最新的消费进度保存到zookeeper中。
Partition数量与消费者线程数量问题
一个Partition只能属于一个消费者线程,所以可能会有一下情况:
1、线程数量多于Partition数量-----部分线程无法消费该topic的消息
2、线程数量等于Partition数量-----一个线程对应一个Partition
3、线程数量少于Partition-----一些线程会消费多个Partition
通常情况下,第三中情况是最好的,这要会充分压榨线程的劳动能力。
一个消费者消费多个Partition消息的顺序问题
同一个Partition的消息消费是有序的(在生产者不继续添加消息的情况下),但是多个Partition之间的消息消费并不能保证是有序的。
offset与消息消费进度
生产者提交的日志采用递增的offset连同消息内容一起写入到本地日志文件,生产者客户端本身不需要保存offset的状态,但是消费者进程需要保存消费消息的offset,这样消息才能将消息的消费进度保存到zookeeper中。
消费方式消费者采用主动向服务端pull拉取数据,而不是采用服务端push推送数据给消费者。
主动向服务端pull数据与服务端push数据给消费者:如果服务端push数据给消费者,那么消费者就只需被动接收就行了,也不用记录offset的消费进度。这样就增加了服务端的压力,服务端也不知道消费者的接收速度,如果消费者来不及处理,就会造成消息积压处理不及时。并且消息的消费进度是和消费者有关的,所以使用统一的外部存储,每个消费者将自己消费的消息offset写到存储系统中,这样消费者也可以自己掌控获取消息的速度。
offset的读取时机:rebalance之前,rebalance之后。