  • spring-kafka consumer configuration

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>${spring-kafka.version}</version>
    </dependency>

    Kafka is releasing new versions quickly at the moment: 0.10 had only just stabilized for us when 1.0 came out. With 0.11, restarting the broker occasionally logs an error about a log file not being closed cleanly; with 0.10 that basically never happens. This mainly matters in the development environment, where the broker is shut down and restarted frequently.

    The spring-kafka version used here is 1.2.2.RELEASE (the value of the ${spring-kafka.version} property above).

    The consumer configuration file:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:int="http://www.springframework.org/schema/integration"
           xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:task="http://www.springframework.org/schema/task"
           xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka
            http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
    
        <bean id="messagingMessageConverter" class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>
    
        <!-- Consumer properties -->
        <bean id="consumerProperties" class="java.util.HashMap">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="${kafka.broker.address}"/>
                    <entry key="group.id" value="${kafka.broker.groupid}"/>
                    <entry key="enable.auto.commit" value="false"/>
                    <entry key="auto.commit.interval.ms" value="1000"/>
                    <entry key="session.timeout.ms" value="15000"/>
                    <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                    <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                </map>
            </constructor-arg>
        </bean>
    
        <!-- Create the consumerFactory bean -->
        <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <ref bean="consumerProperties"/>
            </constructor-arg>
        </bean>
    
        <!-- Listener container configuration -->
        <bean id="containerProperties_flowevent" class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg value="${kafka.topic}"/>
            <property name="messageListener" ref="eventAsyncConsumerService"/>
            <property name="AckMode" value="MANUAL"/>
        </bean>
    
        <bean id="concurrentMessageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer">
            <constructor-arg ref="consumerFactory"/>
            <constructor-arg ref="containerProperties_flowevent"/>
            <property name="Concurrency" value="2"/>
        </bean>
    
    </beans>
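
    For comparison, below is a rough Java-configuration equivalent of the XML above. It is only a sketch against spring-kafka 1.2.x: the broker address, group id and topic are hypothetical literals standing in for the ${...} placeholders, and the injected EventAsyncConsumerService is the listener class shown further down.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
    import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
    import org.springframework.kafka.listener.config.ContainerProperties;

    @Configuration
    public class KafkaConsumerConfig {

        @Bean
        public Map<String, Object> consumerProperties() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // ${kafka.broker.address}
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "flow-event-group");          // ${kafka.broker.groupid}
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);         // ignored once auto commit is off
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return props;
        }

        @Bean
        public DefaultKafkaConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerProperties());
        }

        @Bean
        public ConcurrentMessageListenerContainer<String, String> messageListenerContainer(
                EventAsyncConsumerService eventAsyncConsumerService) {
            // Same settings as containerProperties_flowevent / concurrentMessageListenerContainer above.
            ContainerProperties containerProperties = new ContainerProperties("flow-event-topic"); // ${kafka.topic}
            containerProperties.setMessageListener(eventAsyncConsumerService);
            containerProperties.setAckMode(AckMode.MANUAL);

            ConcurrentMessageListenerContainer<String, String> container =
                    new ConcurrentMessageListenerContainer<>(consumerFactory(), containerProperties);
            container.setConcurrency(2);
            return container;
        }
    }
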
    To commit offsets manually, call acknowledge() on the Acknowledgment. There is one caveat here, though, and I am not sure whether it is a version issue: in an earlier project I only set enable.auto.commit to false, used the Acknowledgment the same way, and it worked without any problems; that project is still running today. The listener implementation:
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.listener.AcknowledgingMessageListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;

    // The default @Component bean name, eventAsyncConsumerService, is what the
    // messageListener ref in the XML above points to.
    @Component
    public class EventAsyncConsumerService implements AcknowledgingMessageListener<String, String> {

        private final static Logger log = LoggerFactory.getLogger(EventAsyncConsumerService.class);

        @Override
        public void onMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
            if (log.isDebugEnabled()) {
                log.debug("value:" + consumerRecord.value());
                log.debug("topic:" + consumerRecord.topic());
                log.debug("partition:" + consumerRecord.partition());
                log.debug("offset:" + consumerRecord.offset());
            }

            try {
                // EventMsg, EventInterface, ApplicationContextHelper and JSONSerializer are project classes.
                EventMsg message = JSONSerializer.deserialize(consumerRecord.value(), EventMsg.class);
                EventInterface eventModel = ApplicationContextHelper.getBean(message.getEventName());
                Map result = eventModel.execute(message.getParams(), message.getPrevResult());
                // Finally, call acknowledge() to commit the offset.
                acknowledgment.acknowledge();
            } catch (Exception ex) {
                // Pass the exception itself so the full stack trace is logged
                // (concatenating getStackTrace() only prints an array reference).
                log.error("EventAsyncConsumer exception", ex);
            }
        }
    }
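
    One thing to keep in mind: when the catch branch swallows an exception without acknowledging, the offset is simply left uncommitted. The record is not redelivered in the current session; it will only be consumed again, from the last committed offset, after a rebalance or a restart of the consumer, so any retry handling has to be added explicitly.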

    These last few days I upgraded the Spring framework version and that approach stopped working. After looking through the spring-kafka source, I found that configuring <property name="AckMode" value="MANUAL"/> on the container properties fixes it.
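
    From that look at the 1.2.x source, the container appears to hand an AcknowledgingMessageListener a non-null Acknowledgment only when the ack mode is MANUAL or MANUAL_IMMEDIATE; with the default BATCH mode the container commits offsets itself and the acknowledgment argument is null, so setting enable.auto.commit to false on its own is not enough.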

    Kafka upstream has been moving so fast lately that users can barely keep up. Version 1.0 even throws errors on startup in a Windows environment, which is hard to take; does it really have to be run on Linux?


  • Original post: https://www.cnblogs.com/fangyuan303687320/p/8244230.html