zoukankan      html  css  js  c++  java
  • spring整合kafka(配置文件方式 生产者)

    Kafka官方文档有   https://docs.spring.io/spring-kafka/reference/htmlsingle/

    这里是配置文件实现的方式

    先引入依赖

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.1.0.RELEASE</version>
    </dependency>


    创建 spring-context-kafka-provider.xml 当然要配置spring扫描该配置文件

    配置文件如下

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- 定义producer的参数 -->
    <bean id="producerProperties" class="java.util.HashMap">
    <constructor-arg>
    <map>
    <entry key="bootstrap.servers" value="kafka集群地址"/>
    <entry key="group.id" value="0}"/>
    <entry key="retries" value="10"/>
    <entry key="batch.size" value="16384"/>
    <entry key="linger.ms" value="1"/>
    <entry key="buffer.memory" value="33554432"/>
    <entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/>
    <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
    </map>
    </constructor-arg>
    </bean>

    <!-- 创建kafkatemplate需要使用的producerfactory bean -->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
    <constructor-arg>
    <ref bean="producerProperties"/>
    </constructor-arg>
    </bean>

    <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用templatesend消息方法 -->
    <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg ref="producerFactory"/>
    <constructor-arg name="autoFlush" value="true"/>
    <property name="defaultTopic" value="test"/>
    </bean>

    </beans>



    在需要发消息的类里自动注入
    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;


    用kafkaTemplate就可以调用发消息的方法
     
     
     
  • 相关阅读:
    JS 深拷贝方法
    数字图像处理中的混叠
    RoIAlign理解
    关于python项目vscode 提示import could not be resolved的问题解决
    HTTPS网站证书申请,HTTPS的安全特性
    使用多域名SSL证书 一种免费的证书申请方式
    During secondary validation: DNS problem: query timed out looking up CAA for ***
    The Next Gen Database Servers Powering Let's Encrypt
    Jenkins 构建及回滚任务
    Go优雅追踪堆栈错误包
  • 原文地址:https://www.cnblogs.com/tree1123/p/8386246.html
Copyright © 2011-2022 走看看