应用程序使用 KafkaConsumer 向 Kafka 订阅主题,并从订阅的主题上接收消息。
一、消费者和消费者群组
Kafka 消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。
假设主题T1有4个分区,消费者C1是群组G1里的唯一的消费者,消费者C1将收到主题T1全部4个分区中的消息。
在群组中新增一个消费者C2,那么每个消费者将分别从两个分区接受消息。
如果群组中有4个消费者,那么每个消费者可以分配到一个分区。
如果我们往群组里添加更多的消费者,超过主题的分区数量,那么有一部分消费者就会被闲置,不会接收到任何消息。
往群组里增加消费者是横向伸缩消费能力的主要方式。我们有必要为主题创建大量的分区,在负载增长时可以加入更多的消费者。不过要注意,不要让消费者的数量超过主题分区的数量,多余的消费者只会被闲置。
除了通过增加消费者来横向伸缩单个应用程序外,还经常出现多个应用程序从同一个主题读取数据的情况。在这些场景里,每个应用程序可以获取到所有的消息,而不只是其中的 一部分。只要保证每个应用程序有自己的消费者群组,就可以让它们获取到主题所有的消息。不同于传统的消息系统,横向伸缩Kafka消费者和消费者群组并不会对性能造成负面影响。
在上面的例子里,如果新增一个只包含一个消费者的群组 G2,那么这个消费者将从主题 T1 上接收所有的消息,与群组 G1 之间互不影响。群组 G2 可以增加更多的消费者,每个消费者可以消费若干个分区,就像群组 G1 那样。总的来说,群组 G2 还是会接收到所有消息,不管有没有其他群组存在。
二、消费者群组和分区再均衡
再均衡
群组里的消费者共同读取主题的分区。一个新的消费者加 入群组时,它读取的是原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取。在主题发生变化时, 比如管理员添加了新的分区,会发生分区重分配。
分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。再均衡非常重要,它为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者)。在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。
消费者通过向被指派为 群组协调器的 broker (不同的群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息 (为了获取消息)或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。
如果一个消费者发生崩溃,井停止读取消息,群组协调器会等待几秒钟,确认它死亡了才会触发再均衡。在这几秒钟时间里,死掉的消费者不会读取分区里的消息。在清理消费者时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理停顿。
心跳行为在最近版本中的变化
在 0.10.1 版本里,Kafka社区引入了一个独立的心跳线程,可以在轮询消息的空档发送心跳。这样一来,发送心跳的频率与消息轮询的频率之间就是相互独立的。在新版本的Kafka里,可以指定消费者在离开群组并触发再均衡之前可以有多长时间不进行消息轮询,这样可以避免出现活锁。
分配分区是怎样的一个过程
当消费者要加入群组时,它会向群组协调器发送一个 JoinGroup 请求。第一个加入群组的消费者将成为“群主”。群主从协调器那里获得群组的成员列表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的),并负责给每一个消费者分配分区。它使用一个实现了 PartitionAssignor接口的类来决定哪些分区应该被分配给哪个消费者。
Kafka 内置了两种分配策略。分配完毕之后,群主把分配情况列表发送给群组协调器,协调器再把这些信息发送给所有消费者。每个消费者只能看到自己的分配信息,只有群主知道群组里所有消费者的分配信息。这个过程会在每次再均衡时重复发生。
三、创建Kafka消费者
Properties props = new Properties(); props.put("bootstrap.servers", "broker1:9092,broker2:9092"); props.put("group.id", "CountryCounter"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
订阅主题
//订阅一个主题 consumer.subscribe(Collections.singletonList("customerCountries")); //通过正则表达式订阅多个主题 consumer.subscribe("test.*");
如果有人创建了新的主题,并且主题的名字与正则表达式匹配,那么会立即触发一次再均衡,消费者就可以读取新添加的主题。如果应用程序需要读取多个主题,并且可以处理不同类型的数据,那么这种订阅方式就很管用。在Kafka和其他系统之间复制数据时,使用正则表达式的方式订阅多个主题是很常见的做法。
四、轮询
try { while (true) { //1 ConsumerRecords<String, String> records = consumer.poll(100); //2 for (ConsumerRecord<String, String> record : records) { //3 log.debug("topic = %s, partition = %s, offset = %d, key = %s, value = %s ", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } } finally { consumer.close(); //4 }
//1 这是一个无限循环,消费者实际上是一个长期运行的应用程序,它通过持续轮询向Kafka请求数据。
//2 消费者必须持续对kafka进行轮询,否者会被认为已经死亡,它的分区会被移交给群组里的其他消费者。传给poll()方法的参数是一个超时时间,用于控制poll()方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)。如果该参数被设为0,poll()会立即返回,否则它会在指定的毫秒数内一直等待broker返回数据。
//3 poll()方法返回一个记录列表。每条记录都包含了记录所属主题的信息,记录所在分区的信息,记录在分区里的偏移量,以及记录的键值对。我们一般会遍历这个列表,逐条处理这些记录。
//4 在退出应用程序之前使用close()方法关闭消费者,网络连接和socket也会随之关闭,并立即触发一次再均衡,而不是等待群组协调器发现它不再发送心跳并认定它已死亡。
轮询不只是获取数据那么简单。在第一次调用新消费者的 poll() 方法时,它会负责查找 GroupCoordinator, 然后加入群组,接受分配的分区。 如果发生了再均衡,整个过程也是在轮询期间进行的。当然,心跳也是从轮询里发送出去的。所以,我们要确保在轮询期间所做的任何处理工作都应该尽快完成。
线程安全:在同一个群组中,我们无法让一个线程运行多个消费者,也无法让多个线程安全地共享一个消费者。按照规则,一个消费者使用一个线程。如果要在同一个消费者群组里运行多个消费者,需要让每个消费者运行在自己的线程里。最好是把消费者的逻辑封装在自己的对象里,然后使用Java的ExecutorService启动多个线程,使每个消费者运行在自己的线程上。
五、消费者的配置
1. fetch.min.bytes:该属性指定了消费者从服务器获取记录的最小字节数。broker在收到消费者的数据请求时,如果可用的数据量小于 fetch.min.bytes指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者,这样可以降低消费者和 broker 的工作负载。
2. fetch.max.wait.ms:我们通过 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才把它返回给消费者。而 fetch.max.wait.ms则用于指定broker的等待时间,默认是500ms。如果没有足够的数据流入Kafka,消费者获取最小数据量的要求就得不到满足,最终导致500ms的延迟。
3. max.parition.fetch.bytes:该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是1MB,也就是说,KafkaConsumer.poll() 方法从每个分区里返回的记录最多不超过 max.parition.fetch.bytes 指定的字节。如果一个主题有 20个分区和 5 个消费者,那么每个消费者需要至少 4MB 的可用内存来接收记录。max.parition.fetch.bytes 的值必须比broker能够接收的最大消息的字节数(max.message.size)大,否则消费者可能无法读取这些消息。
4. session.timeout.ms:该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。如果消费者没有在 session.timeout.ms 指定的时间内发送心跳给群组协调器,就被认为已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者。
5. auto.offset.reset:该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时井被删除)该作何处理。它的默认值是latest,表示在偏移量无效的情况下,消费者将从最新的记录开始读取数据。另一个值是 earliest,表示消费者将从起始位置读取分区的记录。
6. enable.auto.commit:该属性指定了消费者是否自动提交偏移量,默认值是 true。为了尽量避免出现重复数据和数据丢失,可以把它设为 false,由自己控制何时提交偏移量。如果把它设为 true,还可以通过配置 auto.commit.interval.mls 属性来控制提交的频率。
7. partition.assignment.strategy:分区会被分配给群组里的消费者。PartitionAssignor 根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。 Kafka 有两个默认的分配策略。默认使用的是Range策略。
Range:该策略会把主题的若干个连续的分区分配给消费者。假设悄费者 C1 和消费者 C2 同时订阅了主题 T1 和主题 T2,并且每个主题有 3 个分区。那么消费者 C1 有可能分配到这两个主题的分区0和分区1,而消费者 C2 分配到这两个主题的分区2。因为每个主题拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消费者更多的分区。只要使用了 Range策略,而且分区数量无法被消费者数量整除,就会出现这种情况。
RoundRobin:该策略把主题的所有分区逐个分配给消费者。如果使用 RoundRobin 策略来给消费者 C1 和消费者 C2分配分区,那么消费者 C1 将分到主题 T1 的分区0和分区2以及主题 T2 的分区1,消费者 C2 将分配到主题 T1 的分区l以及主题T2 的分区0和分区2。一般来说,如果所有消费者都订阅相同的主题(这种情况很常见),RoundRobin策略会给所有消费者分配相同数量的分区(或最多就差一个分区)。
8. client.id:可以是任意字符串,broker用它来标识从客户端发送过来的消息,通常被用在日志、度量指标和配额里。
9. max.poll.records:用于控制单次调用 call() 方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量。
10. receive.buffer.bytes 和 send.buffer.bytes:socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。如果它们被设为-1,就使用操作系统的默认值。
六、提交、偏移量
每次调用poll()方法,它总是返回由生产者写入Kafka但还没有被消费者读取过的记录,我们因此可以追踪到哪些记录是被群组里的哪个消费者读取的。Kafka不会像其他JMS队列那样需要得到消费者的确认,消费者可以使用Kafka来追踪消息在分区里的位置(偏移量)。
我们把更新分区当前位置的操作叫作提交。 消费者是如何提交偏移量的呢?
消费者往一个叫作_consumer_offset的特殊主题发送消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或者有新的消费者加入群组,就会触发再均衡,完成再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。
如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。
如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。
自动提交
如果enable.auto.commit被设为true,那么每过5s,消费者会自动把从poll()方法接收到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制,默认值是 5s。与消费者里的其他东西一样,自动提交也是在轮询里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那么就会提交从上一次轮询返回的偏移量。
不过,在使用这种简便的方式之前,需要知道它将会带来怎样的结果。
假设我们仍然使用默认的 5s提交时间间隔,在最近一次提交之后的 3s发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了3s,所以在这3s内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无法完全避免的 。自动提交虽然方便,不过并没有为开发者留有余地来避免重复处理消息。
提交当前偏移量
大部分开发者通过控制偏移量提交时间来消除丢失消息的可能性,并在发生再均衡时减少重复消息的数量。开发者可以在必要的时候提交当前偏移量,而不是基于时间间隔。
把 auto.commit.offset 设为false,让应用程序决定何时提交偏移量。使用 commitSync() 提交偏移量最简单也最可靠。这个API会提交由 poll() 方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。要记住, commitSync() 将会提交由 poll() 返回的最新偏移量,所以在处理完所有记录后要确保调用了 commitSync(),否则还是会有丢失消息的风险。如果发生了再均衡,从最近一批消息到发生再均衡之间的所有消息都将被重复处理。
while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records){ System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s ", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } try { consumer.commitSync(); //1 } catch (CommitFailedException e) { log.error("commit failed", e) //2 } }
//1 处理完当前批次的消息,在轮询更多的消息之前,调用commitSync()方法提交当前批次最新的偏移量。
//2 只要没有发生不可恢复的错误,commitSync()方法会一直尝试直至提交成功,如果提交失败,将异常记录到错误日志。
异步提交
手动提交有一个不足之处,在 broker对提交请求作出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。使用异步提交API,我们只管发送提交请求,无需等待 broker 的响应。
while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records){ System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s ", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } //异步提交 consumer.commitAsync(); //回调方式,在broker作出响应时执行回调 consumer.commitAsync(new OffsetCommitCallback() { public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) { if (e != null) log.error("Commit failed for offsets {}", offsets, e); } }); }
在成功提交或碰到无法恢复的错误之前,commitSync() 会一直重试,但是 commitAsync() 不会,这也是 commitAsync() 不好的一个地方。
可以在异步回调里进行重试,但要注意提交顺序,可以使用一个单调递增的序列号来维护异步提交顺序,在每次提交偏移量之后或在回调里提交偏移量时递增序列号,在进行重试前,先检查回调的序列号和即将提交的偏移量是否相等,相等说明没有新的提交,可以安全进行重试,如果序列号比较大,说明有一个新的提交已经发送出去了,应该停止重试。
同步和异步组合提交
在关闭消费者或再均衡前的最后一次提交,要确保能够提交成功。因此,在消费者关闭前一般会组合使用 commitAsync()和 commitSync()。
try { while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.println("topic = %s, partition = %s, offset = %d, customer = %s, country = %s ", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } consumer.commitAsync();//1 } } catch (Exception e) { log.error("Unexpected error", e); } finally { try { consumer.commitSync();//2 } finally { consumer.close(); } }
//1 如果一切正常,我们使用 commitAsync 方法来进行提交,这样速度更快,而且即使这次提交失败,下次提交很可能会成功。
//2 如果直接关闭消费者,就没有所谓的“下一次提交”了。使用 commitSync() 方法会一直重试,直到提交成功或发生无法恢复的错误。
提交特定的偏移量
如果想要在批次中间提交偏移量该怎么办?这种情况无法通过调用 commitSync() 或 commitAsync() 来实现,因为它们只会提交最后一个偏移量,而此时该批次里的消息还没有处理完。
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();//1 int count = 0; ... while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records){ System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s ", record.topic(), record.partition(), record.offset(), record.key(), record.value()); currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));//2 if (count % 1000 == 0)//3 consumer.commitAsync(currentOffsets,null);//4 count++; } }
//1 用于跟踪偏移量的map。
//2 在读取每条记录后,使用期望处理的下一个消息的偏移量更新map里的偏移量,下次就从这里开始读取消息。
//3 每处理1000条记录就提交一次偏移量。
//4 调用commitAsync()和commitSync()都可以。
再均衡监听器
在调用 subscribe() 方法时传进去一个 ConsumerRebalanceListener 实例就可以了。ConsumerRebalanceListener 有两个需要实现的方法。
public void onPartitionsRevoked(Collection<TopicPartition> partitions) 方法会在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了。
public void onPartitionsAssigned(Collection<TopicPartition> partitions) 方法会在重新分配分区之后和消费者开始读取消息之前被调用。
从特定偏移量处开始处理记录
如果你想从分区的起始位置开始读取消息,或者直接跳到分区的末尾开始读取消息,可以使用 seekToBeginning(Collection<TopicPartition> tp) 和 seekToEnd(Collection<TopicPartition> tp) 这两个方法。
seek() 方法可以从指定偏移量开始处理消息。seek() 方法可以更新我们正在使用的位置,下一次调用poll()时就可以获得正确的消息。
七、多线程实现
KafkaProducer是线程安全的,然而 KafkaConsumer 却是非线程安全的。KafkaConsumer 中定义了一个 acquire() 方法,用来检测当前是否只有一个线程在操作,若有其他线程正在操作则会抛出 ConcurrentModifcationException 异常。
acquire() 方法和我们通常所说的锁(synchronized、lock等)不同,它不会造成阻塞等待,可以将其看作一个轻量级锁,它仅通过线程操作计数标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作。acquire() 方法和 release() 方法成对出现,表示相应的加锁和解锁操作。
KafkaConsumer非线程安全并不意味着我们在消费消息的时候只能以单线程的方式执行。如果生产者发送消息的速度大于消费者处理消息的速度,那么就会有越来越多的消息得不到及时的消费,造成了一定的延迟。
多线程的实现方式有多种
1、第一种也是最常见的方式:线程封闭,即为每个线程实例化一个KafkaConsumer对象。一个线程对应一个KafkaConsumer实例,我们可以称之为消费线程。一个消费线程可以消费一个或多个分区中的消息,所有的消费线程都隶属于同一个消费组。这种实现方式的并发度受限于分区的实际个数,当消费线程的个数大于分区数时,就有部分消费线程一直处于空闲的状态。
2、与此对应的第二种方式是多个消费线程同时消费同一个分区,这个通过 assign()、seek() 等方法实现,这样可以打破原有的消费线程的个数不能超过分区数的限制,进一步提高了消费的能力。不过这种实现方式对于位移提交和顺序控制的处理就会变得非常复杂,实际应用中使用得极少。
3、第三种实现方式,将处理消息模块改成多线程的实现方式,第三种实现方式相比第一种实现方式而言,除了横向扩展的能力,还可以减少TCP连接对系统资源的消耗,不过缺点就是对于消息的顺序处理就比较困难了。对于第一种实现方式而言,如果要做具体的位移提交,直接在 KafkaConsumerThread 中的 run() 方法里实现即可。而对于第三种实现方式,这里引入一个共享变量 offsets 来参与提交。每一个处理消息的 RecordHandler 类在处理完消息之后都将对应的消费位移保存到共享变量 offsets 中,KafkaConsumerThread 在每一次 poll() 方法之后都读取 offsets 中的内容并对其进行位移提交。