zoukankan      html  css  js  c++  java
  • spring-kafka手动提交offset

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <beans xmlns="http://www.springframework.org/schema/beans"
     3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     4        xsi:schemaLocation="http://www.springframework.org/schema/beans
     5          http://www.springframework.org/schema/beans/spring-beans.xsd">
     6 
     7 
     8     <bean id="consumerProperties" class="java.util.HashMap">
     9         <constructor-arg>
    10             <map>
    11                 <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
    12                 <!-- Consumer group name -->
    13                 <entry key="group.id" value="friend-group"/>
    14                 <entry key="enable.auto.commit" value="false"/> <!-- client-side auto commit is switched off -->
    15                 <entry key="auto.commit.interval.ms" value="1000"/> <!-- NOTE(review): presumably unused while enable.auto.commit is false; confirm -->
    16                 <entry key="session.timeout.ms" value="15000"/>
    17                 <entry key="max.poll.records" value="1"/>
    18                 <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
    19                 <!--<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>-->
    20                 <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
    21             </map>
    22         </constructor-arg>
    23     </bean>
    24 
    25     <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> <!-- Consumer factory configured from the consumerProperties map -->
    26         <constructor-arg>
    27             <ref bean="consumerProperties"/>
    28         </constructor-arg>
    29     </bean>
    30 
    31     <!-- Service class that consumes the messages (used as the container's message listener) -->
    32     <bean id="messageListernerConsumerService" class="com.zhaopin.consumer.ConsumerService"/>
    33 
    34     <!-- Consumer container configuration: subscribed topic, listener bean and ack mode -->
    35     <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
    36         <constructor-arg value="friend"/>
    37         <!--<constructor-arg>
    38             <list>
    39                 <value>zptopic</value>
    40                 <value>ssmk</value>
    41                 <value>friend</value>
    42             </list>
    43         </constructor-arg>-->
    44         <property name="messageListener" ref="messageListernerConsumerService"/>
    45 
    46         <!-- How offsets are committed. NOTE(review): MANUAL_IMMEDIATE presumably commits as soon as the listener calls Acknowledgment.acknowledge(); confirm against the spring-kafka AckMode docs -->
    47         <property name="ackMode" value="MANUAL_IMMEDIATE"/>
    48     </bean>
    49 
    50     <!-- Single-threaded message listener container (disabled alternative) -->
    51     <!--<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
    52         <constructor-arg ref="consumerFactory"/>
    53         <constructor-arg ref="containerProperties"/>
    54     </bean>-->
    55 
    56     <!-- Multi-threaded (concurrent) message listener container. NOTE(review): init-method invokes doStart directly; containers are normally started through start() or the Spring lifecycle, confirm this is intended -->
    57     <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart">
    58         <constructor-arg ref="consumerFactory"/>
    59         <constructor-arg ref="containerProperties"/>
    60         <property name="concurrency" value="5"/> <!-- concurrency setting of the container; the single listener bean is shared -->
    61     </bean>
    62 
    63 </beans>

    消费者监听类实现AcknowledgingMessageListener这个监听器,并且容器的ackMode配置为MANUAL或MANUAL_IMMEDIATE(见上面的containerProperties),这样就可以在消息处理成功后手动提交offset:

    package com.zhaopin.consumer;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.TypeReference;
    import com.zhaopin.consumer.dto.FriendRelationDto;
    import com.zhaopin.consumer.dto.MessageDto;
    import com.zhaopin.consumer.service.FriendRelationService;
    import com.zhaopin.pojo.TbPerson;
    import com.zhaopin.service.PersonService;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.listener.AcknowledgingMessageListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Service;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * Kafka message listener that processes friend-relation messages and commits
     * offsets manually.
     *
     * <p>The class implements {@link AcknowledgingMessageListener}, so each record is
     * delivered together with an {@link Acknowledgment}; the offset is committed only
     * by calling {@link Acknowledgment#acknowledge()} after successful processing.
     *
     * <p>The listener container may invoke one instance of this bean from several
     * consumer threads, therefore the shared batch buffer is only touched while
     * holding its monitor.
     *
     * Created by SYJ on 2017/3/21.
     */
    @Service
    public class ConsumerService implements AcknowledgingMessageListener<Integer, String> {

        private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);

        /** Number of buffered persons that triggers a batch insert. */
        private static final int INSERT_BATCH_COUNT = 50;

        /** Buffer for {@link #insertBatch}; guarded by its own monitor. */
        private static final List<TbPerson> personList = new ArrayList<TbPerson>();

        @Autowired
        private PersonService personService;

        @Autowired
        private FriendRelationService friendRelationService;

        /**
         * Parses the record value as a single {@code TbPerson} and stores it.
         *
         * @param record the consumed record whose value is the JSON form of a person
         */
        public void insert(ConsumerRecord<Integer, String> record) {
            TbPerson person = JSON.parseObject(record.value(), TbPerson.class);
            personService.insert(person);
            logger.info("Single data writing to the database:{}", record);
        }

        /**
         * Parses the record value as a {@code TbPerson}, buffers it, and writes the
         * buffer to the database once it holds at least {@code INSERT_BATCH_COUNT}
         * entries.
         *
         * @param record the consumed record whose value is the JSON form of a person
         */
        public void insertBatch(ConsumerRecord<Integer, String> record) {
            TbPerson person = JSON.parseObject(record.value(), TbPerson.class);
            synchronized (personList) {
                personList.add(person);
                // ">=" rather than "==": if an earlier flush failed the buffer is already
                // past the threshold and must still be flushed on the next record.
                if (personList.size() >= INSERT_BATCH_COUNT) {
                    // Hand over a copy so that clearing the buffer cannot affect the callee.
                    personService.insertBatch(new ArrayList<TbPerson>(personList));
                    logger.info("Batch data writing to the database:{}", personList);
                    // Cleared only after the insert returned normally, so a failed
                    // batch stays buffered and is retried.
                    personList.clear();
                }
            }
        }

        /**
         * Handles one record: parses the JSON payload, processes the contained
         * friend relation and commits the offset when processing succeeded.
         *
         * <p>If processing fails with an {@link IOException} the error is logged and
         * the offset is <em>not</em> acknowledged.
         *
         * @param record         the consumed record; its value is the JSON message
         * @param acknowledgment handle used to commit the offset of {@code record}
         */
        @Override
        public void onMessage(ConsumerRecord<Integer, String> record, Acknowledgment acknowledgment) {
            logger.info("Before receiving:{}", record);
            MessageDto<FriendRelationDto> message = JSON.parseObject(
                    record.value(), new TypeReference<MessageDto<FriendRelationDto>>() {});
            if (message == null) {
                // A record without a payload can never be processed; acknowledge it so it
                // is skipped instead of failing with a NullPointerException below.
                logger.warn("Skipping record without payload: topic={}, partition={}, offset={}",
                        record.topic(), record.partition(), record.offset());
                acknowledgment.acknowledge();
                return;
            }
            try {
                friendRelationService.process(message.getData());
                acknowledgment.acknowledge(); // commit the offset
            } catch (IOException e) {
                logger.error("Failed to process record, offset not committed: topic={}, partition={}, offset={}",
                        record.topic(), record.partition(), record.offset(), e);
            }
        }
    }
  • 相关阅读:
    windows 按时自动化任务
    Linux libusb 安装及简单使用
    Linux 交换eth0和eth1
    I.MX6 GPS JNI HAL register init hacking
    I.MX6 Android mmm convenient to use
    I.MX6 GPS Android HAL Framework 调试
    Android GPS GPSBasics project hacking
    Python windows serial
    【JAVA】别特注意,POI中getLastRowNum() 和getLastCellNum()的区别
    freemarker跳出循环
  • 原文地址:https://www.cnblogs.com/jun1019/p/6628807.html
Copyright © 2011-2022 走看看