zoukankan      html  css  js  c++  java
  • spring整合kafka(配置文件方式 消费者)

    Kafka官方文档有   https://docs.spring.io/spring-kafka/reference/htmlsingle/

    这里是配置文件实现的方式

    先引入依赖

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.1.0.RELEASE</version>
    </dependency>


    创建 spring-context-kafka-consumer.xml 当然要配置spring扫描该配置文件

    配置文件里边内容如下

    <!-- 基本的配置参数   可以写成配置文件或者这种${bootstrap.servers} 配置文件获取的  可以区分开发测试环境    -->
    <bean id="consumerProperties" class="java.util.HashMap">
    <constructor-arg>
    <map>
    <entry key="bootstrap.servers" value="${bootstrap.servers}" />
    <entry key="group.id" value="0" />
    <entry key="enable.auto.commit" value="true" />
    <entry key="auto.commit.interval.ms" value="1000" />
    <entry key="session.timeout.ms" value="15000" />
    <entry key="key.deserializer"
    value="org.apache.kafka.common.serialization.StringDeserializer" />
    <entry key="value.deserializer"
    value="org.apache.kafka.common.serialization.StringDeserializer" />
    </map>
    </constructor-arg>
    </bean>

    <!-- 创建工厂  然后把配置信息注入-->
    <bean id="consumerFactory"
    class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg>
    <ref bean="consumerProperties" />
    </constructor-arg>
    </bean>

    <!-- 把实际消费的类关联进来 -->
    <bean id="messageListernerConsumerService" class="com.test.kafkaConsumer.KafkaConsumer" />

    <!-- 然后把这个类和消费的topic注入这个container  topic也配置成灵活的 -->
    <bean id="containerProperties"
    class="org.springframework.kafka.listener.config.ContainerProperties">
    <constructor-arg name="topics" value="${topic}"/>
    <property name="messageListener" ref="messageListernerConsumerService" />
    </bean>

    <!-- 把这个container和factory 注入 -->
    <bean id="messageListenerContainer"
    class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
    init-method="doStart">
    <constructor-arg ref="consumerFactory" />
    <constructor-arg ref="containerProperties" />
    </bean>

    <!-- 这个可以配置一个类消费多个topic   如果需要不同的类消费不同的topic 就配置多个container关联不通的类 -->


    消费的类
    public class KafkaConsumer implements MessageListener<Integer, String> {



    @Override
    public void onMessage(ConsumerRecord<Integer, String> record) {

    String value = record.value();
      }
    //因为配置文件已经关联这个类   所以只要启动spring项目   就可以监听消费配置的topic  value就是推送过来的消息

    }

     
    更多实时计算,Kafka等相关技术博文,欢迎关注实时流式计算



     
     
     
     
     
     
  • 相关阅读:
    vue---lodash的使用
    git---分支的合并
    vue---组件引入及使用的几种方式
    vue---import的几种表现形式
    mock---前端搭建模拟服务
    vue---computed计算属性的使用
    HTML禁止右键复制【两行代码实现】
    SQL Server调优系列玩转篇三(利用索引提示(Hint)引导语句最大优化运行)
    SQL Server调优系列玩转篇二(如何利用汇聚联合提示(Hint)引导语句运行)
    SQL Server调优系列玩转篇(如何利用查询提示(Hint)引导语句运行)
  • 原文地址:https://www.cnblogs.com/tree1123/p/8341764.html
Copyright © 2011-2022 走看看