  • Integrating Kafka with Spring

    Preparation

    Kafka version: kafka_2.10-0.10.1.0

    Spring version: Spring 4.3

    Configuration files

    pom file configuration (the jar packages can also be downloaded directly)

    Support library for the Kafka-Spring integration; it lets Spring listen to and communicate with Kafka:

    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-kafka</artifactId>
        <version>1.3.0.RELEASE</version>
    </dependency>

    Client library used for sending and receiving Kafka messages:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.1.0</version>
    </dependency>

    A recent Spring version is used because older versions do not support the Kafka listener and integrate poorly with spring-kafka:

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-webmvc</artifactId>
        <version>4.3.0.RELEASE</version>
    </dependency>

    spring-kafka provides the listener support; it depends on Spring and therefore needs to be used together with spring-integration-kafka:

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.0.0.RC1</version>
    </dependency>

    Producer configuration

      1. If no topic name is set for a message, the data directory is created under the default topic name.

      2. The producerListener is used to determine whether a Kafka send succeeded and to report feedback on the send.

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
             http://www.springframework.org/schema/beans/spring-beans.xsd
             http://www.springframework.org/schema/context
             http://www.springframework.org/schema/context/spring-context.xsd">

        <!-- Producer parameters -->
        <bean id="producerProperties" class="java.util.HashMap">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:7000" />
                    <entry key="group.id" value="0" />
                    <entry key="retries" value="1" />
                    <entry key="batch.size" value="16384" />
                    <entry key="linger.ms" value="1" />
                    <entry key="buffer.memory" value="33554432" />
                    <entry key="key.serializer"
                        value="org.apache.kafka.common.serialization.StringSerializer" />
                    <entry key="value.serializer"
                        value="org.apache.kafka.common.serialization.StringSerializer" />
                </map>
            </constructor-arg>
        </bean>

        <!-- producerFactory bean required to create the KafkaTemplate -->
        <bean id="producerFactory"
            class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <ref bean="producerProperties" />
            </constructor-arg>
        </bean>

        <!-- KafkaTemplate bean: to use it, just inject this bean and call its send methods -->
        <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
            <constructor-arg ref="producerFactory" />
            <constructor-arg name="autoFlush" value="true" />
            <property name="defaultTopic" value="defaultTopic" />
            <property name="producerListener" ref="producerListener"/>
        </bean>

        <bean id="producerListener" class="com.git.kafka.producer.KafkaProducerListener" />
    </beans>
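
    As the bean comment above says, using the producer only requires injecting the KafkaTemplate bean and calling one of its send methods. Below is a minimal sketch of such a caller; the class OrderMessageSender is hypothetical, and it assumes component scanning picks it up and that KafkaTemplate's send/sendDefault methods behave as in spring-kafka 1.x.

    package com.git.kafka.producer;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;

    /**
     * Hypothetical caller: injects the KafkaTemplate bean defined in kafkaProducer.xml.
     */
    @Component
    public class OrderMessageSender {

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        public void send(String message) {
            // sendDefault targets the defaultTopic property of the KafkaTemplate bean
            kafkaTemplate.sendDefault(message);
            // or name the topic and key explicitly
            kafkaTemplate.send("order_test_topic", "order-key", message);
        }
    }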

    Consumer configuration

      1. A Kafka listener is used to consume messages; when a message arrives, the onMessage method is called automatically to consume it and run the follow-up business logic.

      2. To consume multiple topics, create a new consumer container for each topic and point them all at the same listener class, which then handles the follow-up business logic in one place (see the Java sketch after the XML below).

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns:context="http://www.springframework.org/schema/context"
         xsi:schemaLocation="http://www.springframework.org/schema/beans
             http://www.springframework.org/schema/beans/spring-beans.xsd
             http://www.springframework.org/schema/context
             http://www.springframework.org/schema/context/spring-context.xsd">

        <!-- Consumer parameters -->
        <bean id="consumerProperties" class="java.util.HashMap">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="127.0.0.1:7000"/>
                    <entry key="group.id" value="0"/>
                    <entry key="enable.auto.commit" value="false"/>
                    <entry key="auto.commit.interval.ms" value="1000"/>
                    <entry key="session.timeout.ms" value="15000"/>
                    <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                    <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                </map>
            </constructor-arg>
        </bean>

        <!-- consumerFactory bean -->
        <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <ref bean="consumerProperties"/>
            </constructor-arg>
        </bean>

        <!-- The class that actually consumes the messages -->
        <bean id="messageListenerConsumerService" class="com.git.kafka.consumer.KafkaConsumerServer"/>

        <!-- Consumer container configuration, one per topic -->
        <bean id="containerProperties_trade" class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg value="order_test_topic"/>
            <property name="messageListener" ref="messageListenerConsumerService"/>
        </bean>
        <bean id="containerProperties_other" class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg value="other_test_topic"/>
            <property name="messageListener" ref="messageListenerConsumerService"/>
        </bean>

        <!-- messageListenerContainer beans: to use them, just inject these beans -->
        <bean id="messageListenerContainer_trade" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
            init-method="doStart">
            <constructor-arg ref="consumerFactory"/>
            <constructor-arg ref="containerProperties_trade"/>
        </bean>

        <bean id="messageListenerContainer_other" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
            init-method="doStart">
            <constructor-arg ref="consumerFactory"/>
            <constructor-arg ref="containerProperties_other"/>
        </bean>

    </beans>
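
    Note 2 above: every extra topic gets its own container that reuses the same listener bean. The same wiring can also be done in Java; the sketch below simply mirrors the XML and is illustrative only (the topic new_test_topic is made up, and the class and package names follow spring-kafka 1.0.x, so they may differ in other versions).

    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.KafkaMessageListenerContainer;
    import org.springframework.kafka.listener.config.ContainerProperties;

    import com.git.kafka.consumer.KafkaConsumerServer;

    public class ExtraTopicContainerSketch {

        /**
         * Builds one more listener container for an additional topic,
         * pointing at the same KafkaConsumerServer listener as the XML containers.
         */
        public static KafkaMessageListenerContainer<String, String> build(
                DefaultKafkaConsumerFactory<String, String> consumerFactory,
                KafkaConsumerServer listener) {

            ContainerProperties props = new ContainerProperties("new_test_topic"); // hypothetical topic
            props.setMessageListener(listener);

            KafkaMessageListenerContainer<String, String> container =
                    new KafkaMessageListenerContainer<String, String>(consumerFactory, props);
            container.start(); // the XML version starts containers via init-method="doStart"
            return container;
        }
    }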

    applicationContext configuration

    <import resource="classpath:kafkaConsumer.xml" />
    <import resource="classpath:kafkaProducer.xml" />
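
    In a web application this combined context is usually loaded by Spring's ContextLoaderListener. For a quick standalone check, simply booting the context is enough to start the consumer listener containers; the sketch below assumes the combined file is named applicationContext.xml.

    import org.springframework.context.support.ClassPathXmlApplicationContext;

    /**
     * Standalone bootstrap sketch: loading the context starts the
     * messageListenerContainer beans (via init-method="doStart"),
     * so the consumers begin polling without any further code.
     */
    public class ConsumerBootstrap {
        public static void main(String[] args) throws InterruptedException {
            new ClassPathXmlApplicationContext("classpath:applicationContext.xml"); // assumed file name
            // keep the JVM alive so the listener containers keep running
            Thread.currentThread().join();
        }
    }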

    Implementation

    KafkaMesConstant.java  // constants class

    package com.git.kafka.constant;

    /**
     * kafkaMessageConstant
     * @author wangb
     *
     */
    public class KafkaMesConstant {

        public static final String SUCCESS_CODE = "00000";
        public static final String SUCCESS_MES = "success";

        /* kafka codes */
        public static final String KAFKA_SEND_ERROR_CODE = "30001";
        public static final String KAFKA_NO_RESULT_CODE = "30002";
        public static final String KAFKA_NO_OFFSET_CODE = "30003";

        /* kafka messages */
        public static final String KAFKA_SEND_ERROR_MES = "Message send timed out, contact the technical staff";
        public static final String KAFKA_NO_RESULT_MES = "No send result returned, contact the technical staff";
        public static final String KAFKA_NO_OFFSET_MES = "No offset found in the returned metadata, contact the technical staff";

    }

    KafkaConsumerServer.java  // consumer listener

    package com.git.kafka.consumer;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.listener.MessageListener;

    /**
     * Kafka listener entry point.
     * Automatically listens for messages that need to be consumed.
     * @author wangb
     *
     */
    public class KafkaConsumerServer implements MessageListener<String, String> {
        protected final Logger LOG = LoggerFactory.getLogger("kafkaConsumer");
        /**
         * Called automatically by the listener container:
         *     consumes the message,
         *     the offset commit is handled by the container,
         *     then runs the business code.
         *     (The high-level API does not expose offset management, so consumption cannot start from an arbitrary offset.)
         */
        @Override
        public void onMessage(ConsumerRecord<String, String> record) {
            LOG.info("============= kafkaConsumer consumption starts =============");
            String topic = record.topic();
            String key = record.key();
            String value = record.value();
            long offset = record.offset();
            int partition = record.partition();
            LOG.info("-------------topic:" + topic);
            LOG.info("-------------value:" + value);
            LOG.info("-------------key:" + key);
            LOG.info("-------------offset:" + offset);
            LOG.info("-------------partition:" + partition);
            LOG.info("~~~~~~~~~~~~~ kafkaConsumer consumption ends ~~~~~~~~~~~~~");
        }

    }
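
    The consumer configuration above sets enable.auto.commit to false, so offset commits are handled by the listener container rather than by the listener code. If offsets should only be committed after the business logic succeeds, spring-kafka also provides an acknowledging listener. The sketch below is illustrative only: the class names follow spring-kafka 1.0.x and may differ in other versions, and the container's ackMode would additionally have to be set to MANUAL.

    package com.git.kafka.consumer;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.listener.AcknowledgingMessageListener;
    import org.springframework.kafka.support.Acknowledgment;

    /**
     * Illustrative variant: commit the offset manually only after the
     * business logic has completed. Requires the container properties to
     * use AckMode.MANUAL in addition to enable.auto.commit=false.
     */
    public class KafkaAckConsumerServer implements AcknowledgingMessageListener<String, String> {

        @Override
        public void onMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
            // ... process record.value() here ...
            acknowledgment.acknowledge(); // commit the offset once processing succeeded
        }
    }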

    KafkaProducerListener.java  // producer listener - logs send results

    package com.git.kafka.producer;

    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.support.ProducerListener;

    /**
     * KafkaProducer listener, enabled in the producer configuration file.
     * @author wangb
     *
     */
    @SuppressWarnings("rawtypes")
    public class KafkaProducerListener implements ProducerListener{
        protected final Logger LOG = LoggerFactory.getLogger("kafkaProducer");
        /**
         * Called after a message has been sent successfully.
         */
        @Override
        public void onSuccess(String topic, Integer partition, Object key,
                Object value, RecordMetadata recordMetadata) {
            LOG.info("========== kafka sent data successfully (log start) ==========");
            LOG.info("----------topic:" + topic);
            LOG.info("----------partition:" + partition);
            LOG.info("----------key:" + key);
            LOG.info("----------value:" + value);
            LOG.info("----------RecordMetadata:" + recordMetadata);
            LOG.info("~~~~~~~~~~ kafka sent data successfully (log end) ~~~~~~~~~~");
        }

        /**
         * Called when sending a message fails.
         */
        @Override
        public void onError(String topic, Integer partition, Object key,
                Object value, Exception exception) {
            LOG.info("========== kafka failed to send data (log start) ==========");
            LOG.info("----------topic:" + topic);
            LOG.info("----------partition:" + partition);
            LOG.info("----------key:" + key);
            LOG.info("----------value:" + value);
            LOG.info("----------Exception:" + exception);
            LOG.info("~~~~~~~~~~ kafka failed to send data (log end) ~~~~~~~~~~");
            exception.printStackTrace();
        }

        /**
         * The return value indicates whether this listener wants the onSuccess callback.
         */
        @Override
        public boolean isInterestedInSuccess() {
            LOG.info("/// KafkaProducerListener enabled ///");
            return true;
        }

    }

    KafkaProducerServer.java  // producer

    package com.git.kafka.producer;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;
    import java.util.concurrent.ExecutionException;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Component;
    import org.springframework.util.concurrent.ListenableFuture;

    import com.alibaba.fastjson.JSON;
    import com.git.kafka.constant.KafkaMesConstant;

    /**
     * KafkaProducer wrapper.
     *     Use this class to send messages.
     * @author wangb
     *
     */
    @Component
    public class KafkaProducerServer{

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        /**
         * Send a message through the KafkaTemplate.
         * @param topic topic name
         * @param value message value
         * @param ifPartition whether to pick a partition explicitly: "0" = yes, "1" = no
         * @param partitionNum number of partitions; must be greater than 0 when ifPartition is "0"
         * @param role caller role used to build the key: bbc, app, erp, ...
         */
        public Map<String,Object> sndMesForTemplate(String topic, Object value, String ifPartition, 
                Integer partitionNum, String role){
            String key = role+"-"+value.hashCode();
            String valueString = JSON.toJSONString(value);
            if(ifPartition.equals("0")){
                // send to an explicitly chosen partition
                int partitionIndex = getPartitionIndex(key, partitionNum);
                ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, partitionIndex, key, valueString);
                Map<String,Object> res = checkProRecord(result);
                return res;
            }else{
                // let Kafka choose the partition
                ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, key, valueString);
                Map<String,Object> res = checkProRecord(result);
                return res;
            }
        }

        /**
         * Compute the partition index from the key.
         * @param key
         * @param partitionNum
         * @return
         */
        private int getPartitionIndex(String key, int partitionNum){
            if (key == null) {
                Random random = new Random();
                return random.nextInt(partitionNum);
            }
            else {
                int result = Math.abs(key.hashCode())%partitionNum;
                return result;
            }
        }

        /**
         * Check the returned send result.
         * @param res
         * @return
         */
        @SuppressWarnings("rawtypes")
        private Map<String,Object> checkProRecord(ListenableFuture<SendResult<String, String>> res){
            Map<String,Object> m = new HashMap<String,Object>();
            if(res!=null){
                try {
                    SendResult r = res.get();// wait for the send result
                    /* check the offset in the recordMetadata; the producerRecord is not checked */
                    Long offsetIndex = r.getRecordMetadata().offset();
                    if(offsetIndex!=null && offsetIndex>=0){
                        m.put("code", KafkaMesConstant.SUCCESS_CODE);
                        m.put("message", KafkaMesConstant.SUCCESS_MES);
                        return m;
                    }else{
                        m.put("code", KafkaMesConstant.KAFKA_NO_OFFSET_CODE);
                        m.put("message", KafkaMesConstant.KAFKA_NO_OFFSET_MES);
                        return m;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    m.put("code", KafkaMesConstant.KAFKA_SEND_ERROR_CODE);
                    m.put("message", KafkaMesConstant.KAFKA_SEND_ERROR_MES);
                    return m;
                } catch (ExecutionException e) {
                    e.printStackTrace();
                    m.put("code", KafkaMesConstant.KAFKA_SEND_ERROR_CODE);
                    m.put("message", KafkaMesConstant.KAFKA_SEND_ERROR_MES);
                    return m;
                }
            }else{
                m.put("code", KafkaMesConstant.KAFKA_NO_RESULT_CODE);
                m.put("message", KafkaMesConstant.KAFKA_NO_RESULT_MES);
                return m;
            }
        }


    }

    KafkaProducerTest.java  // Kafka producer test (the consumer is started by Spring, which listens and invokes onMessage automatically)

    package com.git.test;

    import java.util.Map;

    import org.springframework.context.support.ClassPathXmlApplicationContext;

    import com.git.kafka.producer.KafkaProducerServer;

    public class KafkaProducerTest {
        public static void main(String[] args) {

            // Load the Spring context so the KafkaTemplate is injected; creating
            // KafkaProducerServer with "new" would leave kafkaTemplate null.
            // The file name applicationContext.xml is assumed here, as is component
            // scanning for the com.git.kafka packages.
            ClassPathXmlApplicationContext context =
                    new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
            KafkaProducerServer kafkaProducer = context.getBean(KafkaProducerServer.class);

            String topic = "orderTopic";
            String value = "test";
            String ifPartition = "0";
            Integer partitionNum = 3;
            String role = "test";// used to build the message key
            Map<String,Object> res = kafkaProducer.sndMesForTemplate
                    (topic, value, ifPartition, partitionNum, role);

            System.out.println("Test result: ===============");
            String message = (String)res.get("message");
            String code = (String)res.get("code");

            System.out.println("code:" + code);
            System.out.println("message:" + message);

            context.close();
        }
    }

    Project code

    Project repository: https://git.oschina.net/wsmd/kafka-0.10-demo

  • Original article: https://www.cnblogs.com/wangb0402/p/6187796.html