zoukankan      html  css  js  c++  java
  • 关于SpringKafka消费者的几个监听器:[一次处理单条消息和一次处理一批消息]以及[自动提交offset和手动提交offset]

    自己在使用Spring Kafka 的消费者消费消息的时候的实践总结:

    接口 KafkaDataListener 是spring-kafka提供的一个供消费者接受消息的顶层接口,也是一个空接口;
    public interface KafkaDataListener<T> {}
    
    对于消费端接收消息的时候,spring-kafka的设计思路是,提供一个顶层接口,提供两个子类,一个子类是自动提交offset的,另一个子类是手动提交offset的.
    无论是自动提交offset还是手动提交offset,又各分为两种,一种是一次只处理一条消息,另一种是一次可以处理一批消息.
    
    该 KafkaDataListener 顶层接口有两个实现类:GenericMessageListener 和 GenericAcknowledgingMessageListener,
    二者的区别是,前者是自动提交offset,后者是手动提交offset。
    
    1、 GenericMessageListener 
        该接口是自动提交offset,它的onMessage方法的参数只有一个,就是传递过来的一条消息;
        public interface GenericMessageListener<T> extends KafkaDataListener<T> {void onMessage(T data);}
        这个接口又有两个子接口:MessageListener 和 BatchMessageListener
        这两个接口也都是空接口,二者的区别是,前者一次只处理一条消息,后者一次处理一批消息.
        
        //一次处理一条消息
        //消费者如果实现该接口的话,如果配置中设置max.poll.records参数大于1的话是无效的,因为它一次只处理一条消息
        public interface MessageListener<K, V> extends GenericMessageListener<ConsumerRecord<K, V>> {}
        //一次可以处理一批消息,每一批次的消息总条数是随机的,但可以在消费者的配置中设置一个最大值(max.poll.records,
        //比如设置了最大拉取的消息条数为100,那么onMessage方法每次接受到的消息条数是随机的,但最大不会超过100)
        public interface BatchMessageListener<K, V> extends GenericMessageListener<List<ConsumerRecord<K, V>>> {}
        
    2、 GenericAcknowledgingMessageListener
        该接口是手动提交offset,它的onMessage方法的参数有两个,第一个是传递过来的一条消息,第二个参数是用于提交offset的对象
        public interface GenericAcknowledgingMessageListener<T> extends KafkaDataListener<T> {void onMessage(T data, Acknowledgment acknowledgment);}
        
        这个接口也有两个子接口:AcknowledgingMessageListener 和 BatchAcknowledgingMessageListener,这两个接口也都是空接口.
        //一次只处理一条消息,并手动提交offset,需要在消费者的配置中设置<property name="ackMode" value="MANUAL_IMMEDIATE"/>
        public interface AcknowledgingMessageListener<K, V> extends GenericAcknowledgingMessageListener<ConsumerRecord<K, V>> {}
        //一次处理一批消息,处理完这一批消息之后,在批量提交offset,需要在消费者的配置中设置<property name="ackMode" value="MANUAL"/>
        public interface BatchAcknowledgingMessageListener<K, V> extends GenericAcknowledgingMessageListener<List<ConsumerRecord<K, V>>> {}

    下面的消费者继承的是MessageListener这个监听器,就是一次处理一条消息,而且是自动提交offset:

     1 import com.alibaba.fastjson.JSON;
     2 import com.alibaba.fastjson.TypeReference;
     3 import com.xxxxxx.consumer.dto.FriendRelationDto;
     4 import com.xxxxxx.consumer.dto.MessageDto;
     5 import com.xxxxxx.consumer.service.FriendRelationService;
     6 import org.apache.kafka.clients.consumer.ConsumerRecord;
     7 import org.slf4j.Logger;
     8 import org.slf4j.LoggerFactory;
     9 import org.springframework.beans.factory.annotation.Autowired;
    10 import org.springframework.kafka.listener.MessageListener;
    11 import org.springframework.stereotype.Service;
    12 import java.io.IOException;
    13 
    14 /**
    15  * Created by SYJ on 2017/3/21.
    16  */
    17 @Service
    18 public class ConsumerService implements MessageListener<Integer, String> {
    19 
    20     private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);
    21     @Autowired
    22     private FriendRelationService friendRelationService;
    23 
    24     /**
    25      * 消息监听方法
    26      * @param record
    27      */
    28     @Override
    29     public void onMessage(ConsumerRecord<Integer, String> record) {
    30         logger.info("Before receiving:" + record.toString());
    31         String value = record.value();
    32         MessageDto<FriendRelationDto> message = JSON.parseObject(value, new TypeReference<MessageDto<FriendRelationDto>>(){});
    33         try {
    34             friendRelationService.process(message.getData());
    35         } catch (IOException e) {
    36             e.printStackTrace();
    37         }
    38     }
    39 }

    下面的消费者实现的BatchMessageListener这个监听器,就是一次接受一批消息,消息的数量是随机的,但最大不会超过"max.poll.records"参数配置的数量:

     1 import com.alibaba.fastjson.JSON;
     2 import com.alibaba.fastjson.TypeReference;
     3 import com.xxxxxx.consumer.dto.FriendRelationDto;
     4 import com.xxxxxx.consumer.dto.MessageDto;
     5 import com.xxxxxx.consumer.service.FriendRelationService;
     6 import org.apache.kafka.clients.consumer.ConsumerRecord;
     7 import org.slf4j.Logger;
     8 import org.slf4j.LoggerFactory;
     9 import org.springframework.beans.factory.annotation.Autowired;
    10 import org.springframework.kafka.listener.BatchMessageListener;
    11 import org.springframework.stereotype.Service;
    12 
    13 import java.io.IOException;
    14 import java.util.List;
    15 
    16 /**
    17  * Created by SYJ on 2017/3/21.
    18  */
    19 @Service
    20 public class ConsumerService implements BatchMessageListener<Integer, String> {
    21 
    22     private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);
    23     @Autowired
    24     private FriendRelationService friendRelationService;
    25 
    26     @Override
    27     public void onMessage(List<ConsumerRecord<Integer, String>> recordList) {
    28         for (ConsumerRecord<Integer, String> record : recordList) {
    29             logger.info("Before receiving:" + record.toString());
    30             String value = record.value();
    31             MessageDto<FriendRelationDto> message = JSON.parseObject(value, new TypeReference<MessageDto<FriendRelationDto>>(){});
    32             try {
    33                 friendRelationService.process(message.getData());
    34             } catch (IOException e) {
    35                 e.printStackTrace();
    36             }
    37         }
    38 
    39     }
    40 }

    下面的消费者实现的是AcknowledgingMessageListener这个监听器,它的特点是一次接收一条消息,可以通过acknowledgment来手动提交offset,需要在消费者的配置中指定<property name="ackMode" value="MANUAL_IMMEDIATE"/>:

     1 import com.alibaba.fastjson.JSON;
     2 import com.alibaba.fastjson.TypeReference;
     3 import com.xxxxxx.consumer.dto.FriendRelationDto;
     4 import com.xxxxxx.consumer.dto.MessageDto;
     5 import com.xxxxxx.consumer.service.FriendRelationService;
     6 import org.apache.kafka.clients.consumer.ConsumerRecord;
     7 import org.slf4j.Logger;
     8 import org.slf4j.LoggerFactory;
     9 import org.springframework.beans.factory.annotation.Autowired;
    10 import org.springframework.kafka.listener.AcknowledgingMessageListener;
    11 import org.springframework.kafka.support.Acknowledgment;
    12 import org.springframework.stereotype.Service;
    13 
    14 import java.io.IOException;
    15 
    16 /**
    17  * Created by SYJ on 2017/3/21.
    18  */
    19 @Service
    20 public class ConsumerService implements AcknowledgingMessageListener<Integer, String> {
    21 
    22     private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);
    23     @Autowired
    24     private FriendRelationService friendRelationService;
    25 
    26     @Override
    27     public void onMessage(ConsumerRecord<Integer, String> record, Acknowledgment acknowledgment) {
    28         logger.info("Before receiving:" + record.toString());
    29         String value = record.value();
    30         MessageDto<FriendRelationDto> message = JSON.parseObject(value, new TypeReference<MessageDto<FriendRelationDto>>(){});
    31         try {
    32             friendRelationService.process(message.getData());
    33             logger.info("===========开始提交offset=============");
    34             acknowledgment.acknowledge();//提交offset
    35             logger.info("===========已经提交offset=============");
    36         } catch (IOException e) {
    37             e.printStackTrace();
    38         }
    39     }
    40 }

    下面的消费者实现的是BatchAcknowledgingMessageListener这个监听器,它的特点是一次可以处理一批消息,并且可以在处理完这一批消息之后提交offset,需要在消费者的配置文件中配置"max.poll.records"参数指定本批消息可以达到的最大值,并指定<property name="ackMode" value="MANUAL"/>:

     1 import com.alibaba.fastjson.JSON;
     2 import com.alibaba.fastjson.TypeReference;
     3 import com.xxxxxx.consumer.dto.FriendRelationDto;
     4 import com.xxxxxx.consumer.dto.MessageDto;
     5 import com.xxxxxx.consumer.service.FriendRelationService;
     6 import org.apache.kafka.clients.consumer.ConsumerRecord;
     7 import org.slf4j.Logger;
     8 import org.slf4j.LoggerFactory;
     9 import org.springframework.beans.factory.annotation.Autowired;
    10 import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
    11 import org.springframework.kafka.support.Acknowledgment;
    12 import org.springframework.stereotype.Service;
    13 
    14 import java.io.IOException;
    15 import java.util.List;
    16 
    17 /**
    18  * Created by SYJ on 2017/3/21.
    19  */
    20 @Service
    21 public class ConsumerService implements BatchAcknowledgingMessageListener<Integer, String> {
    22 
    23     private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);
    24     @Autowired
    25     private FriendRelationService friendRelationService;
    26     
    27 
    28     @Override
    29     public void onMessage(List<ConsumerRecord<Integer, String>> recordList, Acknowledgment acknowledgment) {
    30         logger.info("Before receiving:" + recordList.toString());
    31         logger.info("本次消息总数:" + recordList.size());
    32         for (ConsumerRecord<Integer, String> record : recordList) {
    33             String value = record.value();
    34             MessageDto<FriendRelationDto> message = JSON.parseObject(value, new TypeReference<MessageDto<FriendRelationDto>>() {
    35             });
    36             try {
    37                 friendRelationService.process(message.getData());
    38             } catch (IOException e) {
    39                 e.printStackTrace();
    40             }
    41         }
    42         logger.info("===========开始提交offset=============");
    43         acknowledgment.acknowledge();//提交offset
    44         logger.info("===========已经提交offset=============");
    45     }
    46 }

    下面是spring-kafka消费端的配置文件示例:

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <beans xmlns="http://www.springframework.org/schema/beans"
     3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     4        xsi:schemaLocation="http://www.springframework.org/schema/beans
     5          http://www.springframework.org/schema/beans/spring-beans.xsd">
     6     
     7     <bean id="consumerProperties" class="java.util.HashMap">
     8         <constructor-arg>
     9             <map>
    10                 <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
    11                 <!-- 指定消费组名 -->
    12                 <entry key="group.id" value="friend-group"/>
    13                 <entry key="enable.auto.commit" value="false"/>
    14                 <entry key="auto.commit.interval.ms" value="1000"/>
    15                 <entry key="session.timeout.ms" value="15000"/>
    16                 <!-- 当使用批量处理消息的时候,每次onMessage方法获取到的消息总条数虽然是随机的,但是不会超过此最大值 -->
    17                 <entry key="max.poll.records" value="50"/>
    18                 <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
    19                 <!--<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>-->
    20                 <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
    21             </map>
    22         </constructor-arg>
    23     </bean>
    24 
    25     <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    26         <constructor-arg>
    27             <ref bean="consumerProperties"/>
    28         </constructor-arg>
    29     </bean>
    30 
    31     <!-- 消费消息的服务类 -->
    32     <bean id="messageListernerConsumerService" class="com.xxxxxxx.consumer.ConsumerService"/>
    33 
    34     <!-- 消费者容器配置信息 -->
    35     <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
    36         <constructor-arg value="friend"/>
    37         <!--<constructor-arg>
    38             <list>
    39                 <value>zptopic</value>
    40                 <value>ssmk</value>
    41                 <value>friend</value>
    42             </list>
    43         </constructor-arg>-->
    44         <property name="messageListener" ref="messageListernerConsumerService"/>
    45 
    46         <!-- 提交offset,批量提交 -->
    47         <property name="ackMode" value="MANUAL"/>
    48         <!-- 提交offset的方式,处理完一条消息就立即提交 -->
    49         <!--<property name="ackMode" value="MANUAL_IMMEDIATE"/>-->
    50     </bean>
    51 
    52     <!-- 单线程消息监听容器,每启动一个消费者客户端,只会开启一个线程来消费 -->
    53     <!--<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
    54         <constructor-arg ref="consumerFactory"/>
    55         <constructor-arg ref="containerProperties"/>
    56     </bean>-->
    57 
    58     <!-- 多线程消息监听容器,每启动一个消费者客户端,可以开启多个线程,开启多少个线程自己可以通过concurrency来指定 -->
    59     <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart">
    60         <constructor-arg ref="consumerFactory"/>
    61         <constructor-arg ref="containerProperties"/>
    62         <property name="concurrency" value="5"/>
    63     </bean>
    64 
    65 </beans>
  • 相关阅读:
    1.7 All components require plug-in?
    1.6 Why only in China?
    1.5 A better alternative thing: React Native
    1.4 The usage of plug-in
    1.3 History of Android Plug-in Programing
    SQL Server 查询请求
    matplotlib 绘图的核心原理
    数据加密 第六篇:透明文件加密
    数据加密 第五篇:非对称密钥
    SSIS 数据类型 第二篇:变量的数据类型
  • 原文地址:https://www.cnblogs.com/jun1019/p/6636228.html
Copyright © 2011-2022 走看看