zoukankan      html  css  js  c++  java
  • springboot整合kafka消息队列

    一、概述

    消息队列,我们“窥探”已久,终于将kafka集成到springboot项目里面了,这里记录下操作流程,作为知识的回顾。

    二、kafka服务器的安装

    服务端下载地址,Linux下,我选择安装最新的版本2.13,但是Windows系统下,该版本无法启动,只能选择安装kafka_2.11-2.0.0.tgz;

     Linux和Windows系统下解压该文件

    Linux下进入bin目录执行启动命令(zk默认端口2181,kafka默认端口9092):

    ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties 

    ./kafka-server-start.sh -daemon ../config/server.properties  

    Windows下进入bin目录,在cmd中执行启动命令(Windows脚本位于bin\windows目录,为.bat文件,且不支持-daemon参数):

    .\windows\zookeeper-server-start.bat ..\config\zookeeper.properties

    .\windows\kafka-server-start.bat ..\config\server.properties

    三、java代码

    由于我的项目的springboot版本是1.5.2.RELEASE,spring-kafka只能选择1.2.1.RELEASE版本,否则会出现版本冲突;

    <dependency>                    
       <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.2.1.RELEASE</version>
    </dependency>

    spring boot 项目配置

    #指定kafka代理地址
    spring.kafka.bootstrap-servers=localhost:9092
    #Kafka--producer---消息发送失败重试次数
    spring.kafka.producer.retries=0
    #每个批次可缓存消息的最大字节数(单位:字节,不是消息条数)
    spring.kafka.producer.batch-size=16384
    #生产者用于缓存待发送消息的内存总大小(单位:字节)
    spring.kafka.producer.buffer-memory=335554432
    # 指定消息key和消息体的编解码方式
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    # 指定消息key和消息体的编解码方式
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #Kafka--consumer  =======================
    # 指定默认消费者group id:commit手动提交
    spring.kafka.consumer.group-id=user-log-group
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.consumer.auto-commit-interval=100
    spring.kafka.listener.ack-mode=manual_immediate

    Spring Boot代码编写

    producer代码

    package com.szdbgo.sale.invoicemgr.domain.service.kafka;
    
    import com.szdbgo.framework.core.constant.SaleConstant;
    import org.apache.log4j.Logger;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.annotation.Transactional;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    
    /**
     * Description  发票日志处理
     * Author justin.jia
     * Date 2021/11/27 17:11
     **/
    @Component("invoiceProducterService")
    public class InvoiceProducterService {
    
        private static Logger logger = Logger.getLogger(InvoiceProducterService.class);
    
    
        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;
    
    
        //发票上传信息查询
        public void invoiceUploadSend(String key,String value) {
            logger.info("Kafka接口准备上传发送消息为发票ID:"+ key+"****"+value);
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(SaleConstant.TOPIC_INVOICE_UPLOAD,value);
            future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                @Override
                public void onFailure(Throwable throwable) {            //发送失败的处理
                    logger.error(SaleConstant.TOPIC_INVOICE_UPLOAD + " - 生产者 发送消息失败:" + throwable.getMessage());
                }
                @Override
                public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                    //成功的处理
                    logger.info(SaleConstant.TOPIC_INVOICE_UPLOAD + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
                }
            });
        }
    }

    consumer代码

    package com.szdbgo.sale.invoicemgr.domain.service.kafka;
    
    import com.szdbgo.framework.core.constant.SaleConstant;
    import com.szdbgo.framework.core.utils.common.CommonStringUtils;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.log4j.Logger;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;
    
    /**
     * Kafka consumer for invoice-upload messages.
     *
     * <p>Author: justin.jia<br>
     * Date: 2021/11/27 17:12
     */
    @Component("invoiceUploadCustomerService")
    public class InvoiceUploadCustomerService {

        private static final Logger logger = Logger.getLogger(InvoiceUploadCustomerService.class);

        /**
         * Listener for the {@code SaleConstant.TOPIC_INVOICE_UPLOAD} topic.
         *
         * <p>The record is acknowledged only after it has been processed without an
         * exception. If processing throws, the failure is logged with its stack trace
         * and the record is left unacknowledged, so its offset is not committed by
         * this call.
         * NOTE(review): an unacknowledged record is presumably redelivered only after a
         * restart or rebalance, and a later successful acknowledgment commits past it —
         * confirm this matches the intended retry policy.
         *
         * @param record the consumed record; its value is logged as the invoice ID
         * @param ack    handle used to manually acknowledge the record
         */
        @KafkaListener(topics = SaleConstant.TOPIC_INVOICE_UPLOAD)
        public void invoiceUploadCmd(ConsumerRecord<String, String> record, Acknowledgment ack) {
            String value = record.value();
            logger.info("***********接受数据,开始上传发票,发票ID:"+value);
            try {
                if (CommonStringUtils.isNotEmpty(value)) {
                    //do something
                }
                // Acknowledge only after processing finished; acknowledging up front
                // would commit the offset even when the upload fails.
                ack.acknowledge();
            } catch (Exception exception) {
                // Keep the cause in the log instead of discarding it.
                logger.error("发票上传处理失败", exception);
            }
        }

    }
  • 相关阅读:
    2017ccpc全国邀请赛(湖南湘潭) E. Partial Sum
    Codeforces Round #412 C. Success Rate (rated, Div. 2, base on VK Cup 2017 Round 3)
    2017 中国大学生程序设计竞赛 女生专场 Building Shops (hdu6024)
    51nod 1084 矩阵取数问题 V2
    Power收集
    红色的幻想乡
    Koishi Loves Segments
    Wood Processing
    整数对
    Room and Moor
  • 原文地址:https://www.cnblogs.com/xibei666/p/15627910.html
Copyright © 2011-2022 走看看