zoukankan      html  css  js  c++  java
  • spring整合kafka(配置文件方式 生产者)

    Kafka官方文档有   https://docs.spring.io/spring-kafka/reference/htmlsingle/

    这里是配置文件实现的方式

    先引入依赖

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.1.0.RELEASE</version>
    </dependency>


    创建 spring-context-kafka-provider.xml 当然要配置spring扫描该配置文件

    配置文件如下

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- 定义producer的参数 -->
    <bean id="producerProperties" class="java.util.HashMap">
    <constructor-arg>
    <map>
    <entry key="bootstrap.servers" value="kafka集群地址"/>
    <entry key="group.id" value="0}"/>
    <entry key="retries" value="10"/>
    <entry key="batch.size" value="16384"/>
    <entry key="linger.ms" value="1"/>
    <entry key="buffer.memory" value="33554432"/>
    <entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/>
    <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
    </map>
    </constructor-arg>
    </bean>

    <!-- 创建kafkatemplate需要使用的producerfactory bean -->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
    <constructor-arg>
    <ref bean="producerProperties"/>
    </constructor-arg>
    </bean>

    <!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用templatesend消息方法 -->
    <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg ref="producerFactory"/>
    <constructor-arg name="autoFlush" value="true"/>
    <property name="defaultTopic" value="test"/>
    </bean>

    </beans>



    在需要发消息的类里自动注入
    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;


    用kafkaTemplate就可以调用发消息的方法
     
     
     
  • 相关阅读:
    Android与互联网的交互方式有三种
    Android 本地tomcat服务器接收处理手机上传的数据之案例演示
    Android 本地tomcat服务器接收处理手机上传的数据之环境搭建
    win7 查看当前java路径
    Android 使用tomcat搭建HTTP文件下载服务器
    Android 本地搭建Tomcat服务器供真机测试
    Android org.apache.http.*找不到
    Android IndicatorSeekBar
    Volley overview
    android.database.sqlite.SQLiteCantOpenDatabaseException: unknown error(Sqlite code 14): Could not open database,(OS error
  • 原文地址:https://www.cnblogs.com/tree1123/p/8386246.html
Copyright © 2011-2022 走看看