zoukankan      html  css  js  c++  java
  • spring整合kafka(配置文件方式 消费者)

    Kafka官方文档有   https://docs.spring.io/spring-kafka/reference/htmlsingle/

    这里是配置文件实现的方式

    先引入依赖

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.1.0.RELEASE</version>
    </dependency>


    创建 spring-context-kafka-consumer.xml 当然要配置spring扫描该配置文件

    配置文件里边内容如下

    <!-- 基本的配置参数   可以写成配置文件或者这种${bootstrap.servers} 配置文件获取的  可以区分开发测试环境    -->
    <bean id="consumerProperties" class="java.util.HashMap">
    <constructor-arg>
    <map>
    <entry key="bootstrap.servers" value="${bootstrap.servers}" />
    <entry key="group.id" value="0" />
    <entry key="enable.auto.commit" value="true" />
    <entry key="auto.commit.interval.ms" value="1000" />
    <entry key="session.timeout.ms" value="15000" />
    <entry key="key.deserializer"
    value="org.apache.kafka.common.serialization.StringDeserializer" />
    <entry key="value.deserializer"
    value="org.apache.kafka.common.serialization.StringDeserializer" />
    </map>
    </constructor-arg>
    </bean>

    <!-- 创建工厂  然后把配置信息注入-->
    <bean id="consumerFactory"
    class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg>
    <ref bean="consumerProperties" />
    </constructor-arg>
    </bean>

    <!-- 把实际消费的类关联进来 -->
    <bean id="messageListernerConsumerService" class="com.test.kafkaConsumer.KafkaConsumer" />

    <!-- 然后把这个类和消费的topic注入这个container  topic也配置成灵活的 -->
    <bean id="containerProperties"
    class="org.springframework.kafka.listener.config.ContainerProperties">
    <constructor-arg name="topics" value="${topic}"/>
    <property name="messageListener" ref="messageListernerConsumerService" />
    </bean>

    <!-- 把这个container和factory 注入 -->
    <bean id="messageListenerContainer"
    class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
    init-method="doStart">
    <constructor-arg ref="consumerFactory" />
    <constructor-arg ref="containerProperties" />
    </bean>

    <!-- 这个可以配置一个类消费多个topic   如果需要不同的类消费不同的topic 就配置多个container关联不通的类 -->


    消费的类
    public class KafkaConsumer implements MessageListener<Integer, String> {



    @Override
    public void onMessage(ConsumerRecord<Integer, String> record) {

    String value = record.value();
      }
    //因为配置文件已经关联这个类   所以只要启动spring项目   就可以监听消费配置的topic  value就是推送过来的消息

    }

     
    更多实时计算,Kafka等相关技术博文,欢迎关注实时流式计算



     
     
     
     
     
     
  • 相关阅读:
    2.Spring Boot 有哪些优点?
    3.什么是 JavaConfig?
    4.如何重新加载 Spring Boot 上的更改,而无需重新启动服务器?
    Java中的异常处理机制的简单原理和应用。
    垃圾回收的优点和原理。并考虑2种回收机制。
    我们在web应用开发过程中经常遇到输出某种编码的字符,如iso8859-1等,如何输出一个某种编码的字符串?
    Request对象的主要方法:
    JSP的内置对象及方法。
    Servlet执行时一般实现哪几个方法?
    说说你所熟悉或听说过的j2ee中的几种常用模式?及对设计模式的一些看法
  • 原文地址:https://www.cnblogs.com/tree1123/p/8341764.html
Copyright © 2011-2022 走看看