Preface
GenericMessageListener
```java
@FunctionalInterface
public interface GenericMessageListener<T> {

    void onMessage(T data);

    default void onMessage(T data, Acknowledgment acknowledgment) {
        throw new UnsupportedOperationException("Container should never call this");
    }

    default void onMessage(T data, Consumer<?, ?> consumer) {
        throw new UnsupportedOperationException("Container should never call this");
    }

    default void onMessage(T data, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        throw new UnsupportedOperationException("Container should never call this");
    }
}
```
Next, let's look at the interfaces that extend GenericMessageListener. Those prefixed with Batch are the batch message listener interfaces; the parameters they take are covered in the next section.
```java
public interface MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data);
}

public interface AcknowledgingMessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}

public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}

public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}

public interface BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data);
}

public interface BatchAcknowledgingMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}

public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}

public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
```
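These interfaces can also be used directly, without @KafkaListener, by handing an implementation to a listener container. Below is a minimal sketch, assuming spring-kafka 2.2+ (older versions keep ContainerProperties in the listener.config package) and an existing ConsumerFactory<Integer, String> bean; the topic and group names are illustrative, not from the original article:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

@Configuration
public class DemoContainerConfig {

    @Bean
    public KafkaMessageListenerContainer<Integer, String> demoContainer(
            ConsumerFactory<Integer, String> consumerFactory) {
        ContainerProperties containerProps = new ContainerProperties("topic.quick.demo");
        containerProps.setGroupId("demoGroup");
        // the container calls onMessage(ConsumerRecord) for every record it polls
        containerProps.setMessageListener((MessageListener<Integer, String>) record ->
                System.out.println("received : " + record.value()));
        return new KafkaMessageListenerContainer<>(consumerFactory, containerProps);
    }
}
```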
@KafkaListener Parameters
@KafkaListener(id = "demo", topics = "topic.quick.demo") public void listen(String msgData) { log.info("demo receive : "+msgData); }
- data: the type of data is not fixed; it is determined by the type the KafkaTemplate sends. When data is a List, the method is used for batch consumption.
- ConsumerRecord: the concrete record class, containing headers, partition information, timestamp, and so on
- Acknowledgment: the interface used for the Ack mechanism
- Consumer: the consumer client itself; with it we can manually commit offsets, control the consumption rate, and more
```java
public void listen1(String data)
public void listen2(ConsumerRecord<K, V> data)
public void listen3(ConsumerRecord<K, V> data, Acknowledgment acknowledgment)
public void listen4(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<K, V> consumer)
public void listen5(List<String> data)
public void listen6(List<ConsumerRecord<K, V>> data)
public void listen7(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment)
public void listen8(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<K, V> consumer)
```
Next, let's look at the attributes the @KafkaListener annotation provides.
- id: the consumer id; when groupId is not configured, the id is used as the group id by default
- containerFactory: as mentioned above, switching @KafkaListener between single-record and batch consumption only requires configuring this attribute; it holds the bean name of the listener container factory, i.e. a ConcurrentKafkaListenerContainerFactory
- topics: the topics to listen to; more than one may be given
- topicPartitions: more detailed listening configuration, for example listening to specific partitions of a topic, or starting from offset 200 (see the sketch after the annotation definition below)
- errorHandler: the listener error handler, configured by bean name
- groupId: the consumer group id
- idIsGroup: whether the id should be used as the group id
- clientIdPrefix: the prefix for the consumer client id
- beanRef: the bean name of the actual listener container; it must be prefixed with "__"
```java
public @interface KafkaListener {

    String id() default "";

    String containerFactory() default "";

    String[] topics() default {};

    String topicPattern() default "";

    TopicPartition[] topicPartitions() default {};

    String containerGroup() default "";

    String errorHandler() default "";

    String groupId() default "";

    boolean idIsGroup() default true;

    String clientIdPrefix() default "";

    String beanRef() default "__listener";
}
```
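As an illustration of topicPartitions (not from the original article), here is a hedged sketch using Spring Kafka's @TopicPartition and @PartitionOffset annotations from org.springframework.kafka.annotation; the topic names are illustrative. It listens to partition 0 of one topic and to partition 1 of another starting from offset 200:

```java
@KafkaListener(id = "partitioned", topicPartitions = {
        @TopicPartition(topic = "topic.quick.one", partitions = {"0"}),
        @TopicPartition(topic = "topic.quick.two",
                partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "200"))
})
public void partitionedListener(ConsumerRecord<Integer, String> record) {
    log.info("partitioned receive : " + record.toString());
}
```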
Consuming with ConsumerRecord
What is the benefit of receiving messages as a ConsumerRecord? ConsumerRecord contains the partition information, message headers, message body, and so on; if your business logic needs these fields, ConsumerRecord is a good choice.
If you only need the message body, receiving it as a concrete type, such as String, is more convenient.
Here we write a consumerListener method that listens to the "topic.quick.consumer" topic and prints the contents of the ConsumerRecord to the console.
```java
@Component
public class SingleListener {

    private static final Logger log = LoggerFactory.getLogger(SingleListener.class);

    @KafkaListener(id = "consumer", topics = "topic.quick.consumer")
    public void consumerListener(ConsumerRecord<Integer, String> record) {
        log.info("topic.quick.consumer receive : " + record.toString());
    }
}
```
Batch Consumption
- Create a new consumer configuration that pulls at most 5 records per poll
- Create a listener container factory, enable batch listening on it, and set its concurrency to 5; the concurrency is bounded by the partition count and must be less than or equal to it, otherwise some threads will sit idle forever
- Create a topic with 8 partitions
- Create the listener method: set the consumer id to batch and the client id prefix to batch, listen to topic.quick.batch, and build the listener container with the batchContainerFactory
```java
@Component
public class BatchListener {

    private static final Logger log = LoggerFactory.getLogger(BatchListener.class);

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        // maximum number of records returned by a single poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean("batchContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<Integer, String> listenerContainer() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> container =
                new ConcurrentKafkaListenerContainerFactory<>();
        container.setConsumerFactory(new DefaultKafkaConsumerFactory<Integer, String>(consumerProps()));
        // concurrency, less than or equal to the topic's partition count
        container.setConcurrency(5);
        // enable batch listening
        container.setBatchListener(true);
        return container;
    }

    @Bean
    public NewTopic batchTopic() {
        return new NewTopic("topic.quick.batch", 8, (short) 1);
    }

    @KafkaListener(id = "batch", clientIdPrefix = "batch", topics = {"topic.quick.batch"},
            containerFactory = "batchContainerFactory")
    public void batchListener(List<String> data) {
        log.info("topic.quick.batch receive : ");
        for (String s : data) {
            log.info(s);
        }
    }
}
```
Note: the configured concurrency must not exceed the number of partitions. If you need more throughput, increasing the number of partitions is a quick way to get it, as sketched below.
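A minimal sketch of increasing the partition count with the kafka-clients AdminClient; the class and the target count of 16 are illustrative, not from the original article:

```java
import java.util.Collections;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

public class PartitionGrowthSketch {

    public static void main(String[] args) throws Exception {
        try (AdminClient admin = AdminClient.create(Collections.singletonMap(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            // grow topic.quick.batch from 8 to 16 partitions;
            // partition counts can only be increased, never decreased
            admin.createPartitions(Collections.singletonMap(
                    "topic.quick.batch", NewPartitions.increaseTo(16)))
                 .all().get();
        }
    }
}
```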
Getting the Message Headers and Body via Annotations
When the message carries headers, or your listener method needs many of the message's fields, this approach is handy, since chains of getter calls add up. The example here uses the default listener container factory; if you want batch consumption, just change the corresponding types to List, e.g. List<String> data, List<Integer> key (see the batch sketch after the example below).
- @Payload: the message body, i.e. the content that was sent
- @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY): the key of the message
- @Header(KafkaHeaders.RECEIVED_PARTITION_ID): the partition the message was received from
- @Header(KafkaHeaders.RECEIVED_TOPIC): the name of the topic being listened to
- @Header(KafkaHeaders.RECEIVED_TIMESTAMP): the message timestamp
@KafkaListener(id = "anno", topics = "topic.quick.anno") public void annoListener(@Payload String data, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) { log.info("topic.quick.anno receive : "+ "data : "+data+" "+ "key : "+key+" "+ "partitionId : "+partition+" "+ "topic : "+topic+" "+ "timestamp : "+ts+" " ); }
Confirming Consumption with the Ack Mechanism
Kafka's Ack mechanism is quite different from RabbitMQ's; it confused me too when I was new to Kafka, but once you understand how Kafka consumes messages, its Ack mechanism makes sense.
Let me start with RabbitMQ. RabbitMQ's consumption is essentially one-shot: once a message is acknowledged, it is immediately deleted from disk or memory.
Roughly speaking, RabbitMQ also consumes in order, like people in a queue, one message after another, and unacknowledged messages go back to the queue to wait for a listener to consume them again.
Kafka is different: it consumes based on the latest committed offset, and acknowledged messages are not deleted right away, so data that has not yet been deleted can be consumed repeatedly.
If the first message is not acknowledged but the second one is, Kafka stores the second message's offset, which means the listener will never see the first message again, unless you fetch it manually by its offset.
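To make the offset semantics concrete, here is a minimal sketch with the raw kafka-clients API (the class and method names are illustrative): the committed value is the offset of the next record to read, so committing past the second record marks the first one as consumed as well.

```java
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class OffsetSemanticsSketch {

    // Committing the offset of a later record marks every earlier record in the
    // same partition as consumed: the committed value is the offset of the NEXT
    // record to be read.
    static void ackSecondRecord(KafkaConsumer<Integer, String> consumer,
                                ConsumerRecord<Integer, String> second) {
        consumer.commitSync(Collections.singletonMap(
                new TopicPartition(second.topic(), second.partition()),
                // offsets up to and including second.offset() are now all considered
                // consumed, including the first record that was never acknowledged
                new OffsetAndMetadata(second.offset() + 1)));
    }
}
```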
Using Kafka's Ack mechanism is simple; it takes only three steps:
- Set ENABLE_AUTO_COMMIT_CONFIG=false to disable auto commit
- Set AckMode=MANUAL_IMMEDIATE
- Add an Acknowledgment ack parameter to the listener method
How do you reject a message, then? Simply do not call ack.acknowledge() in the listener method.
```java
@Component
public class AckListener {

    private static final Logger log = LoggerFactory.getLogger(AckListener.class);

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // disable auto commit so offsets are committed only on ack.acknowledge()
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    @Bean("ackContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<Integer, String> ackContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<Integer, String>(consumerProps()));
        // commit the offset immediately when the listener calls ack.acknowledge()
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    @KafkaListener(id = "ack", topics = "topic.quick.ack", containerFactory = "ackContainerFactory")
    public void ackListener(ConsumerRecord<Integer, String> record, Acknowledgment ack) {
        log.info("topic.quick.ack receive : " + record.value());
        ack.acknowledge();
    }
}
```
Re-consuming Messages That Were Not Acked
At the start of this section I described how Kafka's offset mechanism can make un-acked messages impossible to re-consume. The workarounds are as follows:
1. Resend the message to the topic. This approach is simple, and by using headers you can record how many times the message has been consumed, for the next attempt to inspect (a sketch of the header bookkeeping follows the example below).
@KafkaListener(id = "ack", topics = "topic.quick.ack", containerFactory = "ackContainerFactory") public void ackListener(ConsumerRecord record, Acknowledgment ack, Consumer consumer) { log.info("topic.quick.ack receive : " + record.value()); //如果偏移量为偶数则确认消费,否则拒绝消费 if (record.offset() % 2 == 0) { log.info(record.offset()+"--ack"); ack.acknowledge(); } else { log.info(record.offset()+"--nack"); kafkaTemplate.send("topic.quick.ack", record.value()); } }
2. Use Consumer.seek to move back to the offset of the un-acked message and consume it again. This can lead to a dead loop: if the business logic can never process the record, the consumer keeps seeking back to the same offset anyway.
@KafkaListener(id = "ack", topics = "topic.quick.ack", containerFactory = "ackContainerFactory") public void ackListener(ConsumerRecord record, Acknowledgment ack, Consumer consumer) { log.info("topic.quick.ack receive : " + record.value()); //如果偏移量为偶数则确认消费,否则拒绝消费 if (record.offset() % 2 == 0) { log.info(record.offset()+"--ack"); ack.acknowledge(); } else { log.info(record.offset()+"--nack"); consumer.seek(new TopicPartition("topic.quick.ack",record.partition()),record.offset() ); } }