Kafka的分区,相当于把一个Topic再细分成了多个通道(对应 多个线程)
部署的时候尽量做到一个消费者(线程)对应一个分区。
如何确定Kafka的分区数,key和consumer线程数,以及不消费问题解决
怎么确定分区数?
Kafka官网上标榜自己是"high-throughput distributed messaging system",即一个高吞吐量的分布式消息引擎。那么怎么达到高吞吐量呢?
Kafka在底层摒弃了Java堆缓存机制,采用了操作系统级别的页缓存,同时将随机写操作改为顺序写,再结合Zero-Copy的特性极大地改善了IO性能。但是,这只是一个方面,毕竟单机优化的能力是有上限的。
如何通过水平扩展甚至是线性扩展来进一步提升吞吐量呢? Kafka就是使用了分区(partition),通过将topic的消息打散到多个分区并分布保存在不同的broker上实现了消息处理(不管是producer还是consumer)的高吞吐量。
Kafka的生产者和消费者都可以多线程地并行操作,而每个 线程 处理的是(对应的是) 一个分区的数据。因此分区实际上是调优Kafka并行度的最小单元。
对于producer而言,它实际上是用多个线程并发地向不同分区所在的broker发起Socket连接同时给这些分区发送消息;
而consumer呢,同一个消费组内的所有consumer线程都被指定topic的某一个分区进行消费(具体如何确定consumer线程数目我们后面会详细说明)。所以说,如果一个topic分区越多,理论上整个集群所能达到的吞吐量就越大。
但分区是否越多越好呢?
显然也不是,因为每个分区都有自己的开销:
一、客户端/服务器端需要使用的内存就越多
二、文件句柄的开销
三、降低高可用性:
Kafka通过副本(replica)机制来保证高可用。具体做法就是为每个分区保存若干个副本(replica_factor指定副本数)。每个副本保存在不同的broker上。期中的一个副本充当leader 副本,负责处理producer和consumer请求。其他副本充当follower角色,由Kafka controller负责保证与leader的同步。如果leader所在的broker挂掉了,contorller会检测到然后在zookeeper的帮助下重选出新的leader——这中间会有短暂的不可用时间窗口,虽然大部分情况下可能只是几毫秒级别。但如果你有10000个分区,10个broker,也就是说平均每个broker上有1000个分区。此时这个broker挂掉了,那么zookeeper和controller需要立即对这1000个分区进行leader选举。比起很少的分区leader选举而言,这必然要花更长的时间,并且通常不是线性累加的。
默认情况下,Kafka根据传递消息的key来进行分区的分配,即hash(key) % numPartitions
如果你的分区数是N,那么最好线程数也保持为N。
KafkaStream——它是consumer的关键类,提供了遍历方法用于consumer程序调用实现数据的消费。
其底层维护了一个阻塞队列,所以在没有新消息到来时,consumer是处于阻塞状态的,即consumer程序一直在等待新消息的到来。
你也可以配置成带超时的consumer,参看参数consumer.timeout.ms。
下面说说 Kafka提供的两种分配策略: range和roundrobin,由参数partition.assignment.strategy指定,默认是range策略。
本文只讨论range策略。所谓的range其实就是按照阶段平均分配。举个例子就明白了,
假设你有10个分区,P0 ~ P9,consumer线程数是3, C0 ~ C2,那么每个线程都分配哪些分区呢?
C0 消费分区 0, 1, 2, 3
C1 消费分区 4, 5, 6
C2 消费分区 7, 8, 9
具体算法就是:
val nPartsPerConsumer = curPartitions.size / curConsumers.size // 每个consumer至少保证消费的分区数
val nConsumersWithExtraPart = curPartitions.size % curConsumers.size // 还剩下多少个分区需要单独分配给开头的线程们
...
for (consumerThreadId <- consumerThreadIdSet) { // 对于每一个consumer线程
val myConsumerPosition = curConsumers.indexOf(consumerThreadId) //算出该线程在所有线程中的位置,介于[0, n-1]
assert(myConsumerPosition >= 0)
// startPart 就是这个线程要消费的起始分区数
val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
// nParts 就是这个线程总共要消费多少个分区
val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)
...
}
针对于这个例子,nPartsPerConsumer就是10/3=3,nConsumersWithExtraPart为10%3=1,说明每个线程至少保证3个分区,还剩下1个分区需要单独分配给开头的若干个线程。
说到这里,经常有个需求就是我想让某个consumer线程消费指定的分区而不消费其他的分区。坦率来说,目前Kafka并没有提供自定义分配策略。
不消费问题
第一步:参看消费者的基本情况
查看mwbops系统,【Consumer监控】-->【对应的consumerId】
如果offset数字一直在动,说明一直在消费,说明不存在问题,return;
如果offset数字一直不动,看Owner是不是有值存在
如果Owner是空,说明消费端的程序已经跟Kafka断开连接,应该排查消费端是否正常,return;
如果Owner不为空,就是有上图上面的类似于 bennu_index_benuprdapp02-1444748505181-f558155a-0 的文字,继续看下面内容
第二步:查看消费端的程序代码
看看自己的消费代码里面,存不存在处理消息的时候出异常的情况
如果有,需要try-catch一下,其实不论有没有异常,都用try-catch包一下,return;
原因:如果在处理消息的时候有异常出现,又没有进行处理,那么while循环就会跳出,线程会结束,所以不会再去取消息,就是消费停止了。
第三步:查看消费端的配置
消费端有一个配置,叫 fetch.message.max.bytes,默认是1M,此时如果有消息大于1M,会发生停止消费的情况。
此时,在配置中增加 props.put("fetch.message.max.bytes", "10 * 1024 * 1024"); 即可
return;
原因:目前Kafka集群配置的运行最大的消息大小是10M,如果客户端配置的运行接收的消息是1M,跟Kafka服务端配置的不一致,则消息大于1M的情况下,消费端就无法消费,导致一直卡在这一条消息,现象就是消费停止。