应用程序使用KafkaConsumer向Kafka订阅主题,并从订阅的主题上接受消息。
消费者和消费者群组
消费者对象用于订阅主题并接收消息,然后验证消息并保存结果。
Kafka消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。
如果一个消费者群组里的消费者数量大于主题分区数量,那么有一部分消费者就会被闲置,不会接收到任何消息。
一个新的消费者加入群组时,它读取的是原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它就离开群组,原本由它选取的分区将由群组里的其他消费者来读取。
往群组里增加消费者是横向伸缩消费能力的主要方式。
我们有必要为主题创建大量的分区,在负载增长时可以加入更多的消费者。
不同于传统的消息系统,横向伸缩Kafka消费者和消费者群组并不会对性能造成负面影响。
分区再均衡
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。
再均衡为消费者群组带来了高可用性和伸缩性。
正常情况下,我们并不希望发生这样的行为。
在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。另外,当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。
消费者通过向被指派群组协调器的broker(不同群组可以有不同的协调器)发送心跳来维持他们的群组的从属关系以及他们对分区的所有权关系。
只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。
消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳。
如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。
如果一个消费者发生崩溃,并停止读取消息,群组协调器会等待几秒钟,确认它死亡了才会触发再均衡。
在清理消费者时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理停顿。
如果需要处理耗费较长时间的消息,可以加大max.poll.interval.ms的值来增加轮询间隔的市场。
分区分配
当消费者要加入群组时,它会像群组协调器发送一个JoinGroup请求。第一个加入群组的消费者将成为群主。
群主从协调器那里获得群组的成员列表,并负责给每一个消费者分配分区。它使用一个实现了PartitionAssignor接口的类来决定哪些分区应该被分配给哪个消费者。
Kafka内置了两种分配策略。分配完毕之后,群主把分配情况列表发送给群组协调器,协调器再把这些消息发送给所有消费者。
每个消费者只能看到自己的分配信息,只有群主知道群组里所有消费者的分配信息。
这个过程会在每次再均衡时重复发生。
轮询
消息轮询时消费者API的核心,通过一个简单的轮询向服务器请求数据。
一旦消费者订阅了主题,轮询就会处理所有的细节,包括群组协调,分区再均衡,发送心跳和获取数据。
消费者必须持续对Kafka进行轮询(poll),否则会被认为已经死亡。
poll()方法会返回一个记录列表。每条记录都包含了记录所属主题的信息,记录所在分区的信息,记录在分区里的偏移量,以及记录的键值对。
在推出应用程序之前使用close()方法关闭消费者。网络连接和socker也会随之关闭,并立即触发一次再均衡,而不是等待群组协调器发现它不再发送心跳并认定它已死亡,因为那样需要更长时间,导致整个群组在一段时间内无法读取消息。
轮询不只是获取数据那么简单,在第一次调用新消费者的poll()方法时,它会负责查找GroupCoordinator,然后加入群组,接受分配的分区。
如果发生了再均衡,整个过程也是在轮询期间进行的。当然,心跳也是从轮询里发送出去的。
在同一个群组里,我们无法让一个线程运行多个消费者,也无法让多个线程安全地共享一个消费者。
按照规则,一个消费者使用使用一个县城。
如果要在同一个消费者群组里运行多个消费者,需要让每个消费者运行在自己的线程里。
最好是把消费者的逻辑封装在自己的对象里,然后使用Java的线程池启动多个线程,使每个消费者运行在自己的线程上。
消费者配置
bootstrap.servers
指定定了Kafka集群的连接字符串。
该参数为必选参数。
group.id
指定KafkaConsumer的消费者群组。
创建不属于任何一个群组的消费者也是可以的,但是这样做不太常见。
该参数非必选。
key.deserializer
使用指定类将字节数组转成Java对象。
该参数为必选参数。
value.deserializer
使用指定类将字节数组转成Java对象。
该参数为必选参数。
fetch.min.bytes
该属性指定了消费者从服务器获取记录的最小字节数。
broker在收到消费者的数据请求时,如果可用的数据量下雨fetch.min.bytes指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者。
这样可以降低消费者和broker的工作负载,因为他们在主题不是很活跃的时候就不需要来来回回地处理消息。
如果没有很多可用数据,但消费者的CPU使用率很高,那么久需要把该属性的值设的比默认值大。
如果消费者的数量比较多,把该属性的值设置大一点可以降低broker的工作负载。
fetch.max.wait.ms
用于指定broker的等待时间,默认是500ms。
如果没有足够的数据流入Kafka(fetch.min.bytes),消费者获取最小数据量的要求就得不到满足,最终导致500ms的延迟。
如果要降低潜在的延迟(为了满足SLA),可以把该参数值设置的小一些。
max.partition.fetch.bytes
该属性指定了从每个分区里返回给消费者的最大字节数。默认值是1MB。
也就是说,KafkaComsumer.poll()方法从每个分区里返回的记录最多不超过max.partition.fetch.bytes指定的字节。
在为消费者分配内存时,可以给它们多分配一些,因为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。max.partition.fetch.bytes的值必须必broker能够接收的最大消息的字节数(max.message.size)大。
否则消费者可能无法读取这些消息,导致消费者一直挂起重试。
另一个需要考虑的因素是消费者处理数据的时间。消费者需要频繁调用poll()方法来避免回话过期和发生分区再均衡,如果单次调用poll()返回的数据太多,消费者需要更多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。
session.timeout.ms
该属性制定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是3s。
如果消费者没有在session.timeout.ms指定的时间内发送心跳给群组协调器,就被认为已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者。
该属性与heartbeat.interval.ms紧密相关。heartbeat.interval.ms指定了poll()方法向协调器发送心跳的频率,session.timeout.ms指定了消费者可以多久不发送心跳。
所以,一般需要同时修改这两个属性,heartbeat.interval.ms必须比session.timeout.ms小,一般是session.timeout.ms的1/3。
把session.timeout.ms值设的比默认值小,可以更快地检测和恢复崩溃的结点,不过长时间的轮询或垃圾手机可能导致非预期的再均衡。
把该属性的值设置得大一些,可以减少意外的再均衡,不过检测结点崩溃需要更长的时间。
auto.offset.reset
该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。
默认值是latest。(在偏移量无效的情况下,消费者将从最新的记录开始读取数据)
另一个值是earliest。(在偏移量无效的情况下,消费者从起始位置读取分区的记录)
enable.auto.commit
该属性指定了消费者是否自动提交偏移量,默认值是true。
为了尽量避免出现重复和数据丢失,可以把它设为false,由自己控制何时提交偏移量。
如果把它设为设为true,还可以通过皮配置auto.commit.interval.ms属性来控制提交的频率(默认5s)。
partition.assignmeng.strategy
指定分配策略。
我们知道,分区会被分配给群组里的消费者。PartitionAssignor根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。
Kafka提供两个默认的分配策略。
Range(默认值)
该策略会把主题的若干个连续的分区分配给消费者。(分配是在主题内独立完成的,所以在有多个主题的情况下容易造成分配不均衡。)
RoundRobin
该策略把主题的所有分区逐个分配给消费者。
可以自定义策略。
client.id
该属性可以是任意字符串,broker用它来标识从客户端发送过来的消息,通常被用在日志,度量指定和配额里。
max.poll.records
该属性用于控制单次调用call()方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量。
receive.buffer.bytes和send.buffer.bytes
socket在读写数据时用到的TCP缓冲区也可以设置大小。如果它们被设为-1,就使用操作系统的默认值。
如果生产者或消费者与broker
提交和偏移量
我们把更新分区当前位置的操作叫做提交。
消费者往一个叫做_consumer_offset的特殊主体发送消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么用于。
不过,如果消费者发生崩溃或者有新的消费者加入群组,就会触发再均衡。
完成再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个。
为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。
如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。
如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。
自动提交
如果enable.auto.commit被设为true,那么每过5s,消费者会自动把从poll()方法接收到的最大偏移量提交上去。提交时间间隔有auto.commit.interval.ms控制,默认值是5s.
自动提交也是在轮询里进行的。
消费者每次再进行轮询时会检查是否该提交偏移量了,如果是,那么就会提交从上一次轮询返回的偏移量。
可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,不过还是无法完全避免消息的重复处理。
在使用自动提交时,每次调用轮询方法都会把上一次调用返回的偏移量提交上去,它并不知道具体哪些消息已经被处理了,所以在再次调用之前最好确保所有当前调用返回的消息都已经处理完毕(在调用close()方法之前也会进行自动提交)。
自动提交虽然方便,不过并没有为开发者留有余地来避免重复处理消息。
提交偏移量
大部分开发者通过控制偏移量提交时间来消除丢失消息的可能性,并在发生再均衡时减少重复消息的数量。消费者API提供了另一种提交偏移量的方式,开发者可以在必要的时候提交当前偏移量,而不是基于时间间隔。
把auto.commit.offset设为false,让应用程序决定何时提交偏移量。
同步提交
使用commitSync()提交偏移量最简单也最可靠。这个API会提交有poll()方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。
只要没有发生不可恢复的错误,commitSync()方法会一直尝试直至成功。
异步提交
手动提交有一个不足之处,在broker对提交请求作出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。
我们可以通过降低提交频率来提升吞吐量,但如果发生了再均衡,会增加重复消息的数量。
这个时候我们可以使用异步提交commitAsync(),只管发送提交请求,无需等待broker的响应。
commitAsync()发生异常不会重试。
commitAsync()支持回调。回调经常被用于记录提交错误或生成度量指标,不过如果你要用它来进行重试,一定要注意提交的顺序。
同步和异步组合提交
一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。
因此,在消费者关闭前一般会组合使用commitAsync()(轮询中使用)和commitSync()(finally中使用).
提交特定偏移量
commitSync()或commitAsync()只会提交最后一个偏移量,如果需要提交非最后一个偏移量,可以使用重载方法commitSync(Map<TopicPartition, OffsetAndMetadata>),commitAsync(Map<TopicPartition, OffsetAndMetadata>)。
不过,因为消费者可能不止读取一个分区,你需要跟踪所有分区的偏移量,所以在这个层面上控制偏移量的提交会让代码变复杂。