同步和异步组合提交
一般情况下,针对偶尔出现的提价失败,不进行重试不会有太大的问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会成功,但如果这是发生在关闭消费者或者再均衡前的最后一次提交,就要确保能够提交成功。
因此,在消费者关闭前一般都会组合使用commitAsync()
和commitSync()
他们的工作原理如下
try{
while(true){
ConsumerRecords<String,String> records = consumer.poll(100);
for(ConsumerRecord<String,String> record:records){
System.out.printf("topic = %s,partition = %s,offset = %d,customer = %s,country = %s
",record.topic(),record.partition(),record.topic(),record.partition(),record.offset(),record.key(),record.value());
}
consumer.commitAsync();
}
}cache(Exception e){
log.error("Unexpected error", e);
}finally {
try{
consumer.commitSync();
}finally{
consumer.close();
}
}
提交特定的偏移量
提交偏移量的频率与处理消息批次的频率是一样的,但是如果想更频繁的提交该怎么办?如果poll()
方法返回一大批数据,为了避免因再均衡引起的重复处理批消息,想要在批次中间提交偏移量怎么办? 这种情况无法通过调用commitSync()
和commitAsync()
来实现,因为他们只会提交最后一个偏移量,而此时该批次里的消息还没有处理完。
消费者api允许在调用commitSync()
和commitAsync()
方法时提交的分区和偏移量的map,假设你处理了半个批次的消息,最后来自主题customers
分区3的消息的偏移量是5000 你可以调用commitSync()
来提交他,不过,因为消费者可能不只读取一个分区,你需要跟踪所有分区。
下面是提交特定偏移量的例子:
private Map<TopicPartition,OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
...
while(true){
ConsumerRecords<String,String> records = consumer.poll(100);
for(ConsumerRecord<String,String> record : records){
System.out.printf("topic = %s,partition = %s,offset = %d,customer = %s,country = %s
",record.topic(),record.partition(),record.offset(),record.key(),record.value());
currentOffset.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset)+1,"no metadata"));
if(count* 1000 == 0)
consumer.commitAsync(currentOffsets,null);
count++;
}
}