zoukankan      html  css  js  c++  java
  • springboot整合kafka消息队列

    一、概述

    消息队列,我们“窥探”已久,终于将kafka集成到项目springboot项目里面了,这里记录下操作流程。知识的回顾;

    二、kafka服务器的安装

    服务端下载地址 ,Linux下,我选择安装最新的版本2.13,但是window系统下 ,该版本无法启动,只能选择安装kafka_2.11-2.0.0.tgz;

     Linux和window系统下解压该文件

    Linux进入cmd启动命令:zk端口默认2181,kafka默认端口9092

    ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties 

    ./kafka-server-start.sh -daemon ../config/server.properties  

    window进入cmd启动命令cmd:

    ./window/zookeeper-server-start.sh  ../config/zookeeper.properties 

    ./window/kafka-server-start.sh -daemon ../config/server.properties  

    三、java代码

    由于我的项目是springboot版本是1.5.2.RELEASE,kafka的版本只能选择spring-kafka版本1.2.1.RELEASE,否则版本冲突;

    <dependency>                    
       <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.2.1.RELEASE</version>
    </dependency>

    spring boot 项目配置

    #制定kafka代理地址
    spring.kafka.bootstrap-servers=localhost:9092
    #Kafkaf--producer---消息发送失败重试次数
    spring.kafka.producer.retries=0
    #每次批量发送消息的数量
    spring.kafka.producer.batch-size=16384
    #每次批量发送消息的缓冲区大小
    spring.kafka.producer.buffer-memory=335554432
    # 指定消息key和消息体的编解码方式
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    # 指定消息key和消息体的编解码方式
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #Kafkaf--consumer  =======================
    # 指定默认消费者group id:commit手动提交
    spring.kafka.consumer.group-id=user-log-group
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.enable-auto-commit=false
    spring.kafka.consumer.auto-commit-interval=100
    spring.kafka.listener.ack-mode=manual_immediate

    Sprintboot代码编写

    producter代码

    package com.szdbgo.sale.invoicemgr.domain.service.kafka;
    
    import com.szdbgo.framework.core.constant.SaleConstant;
    import org.apache.log4j.Logger;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.annotation.Transactional;
    import org.springframework.util.concurrent.ListenableFuture;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    
    /**
     * Description  发票日志处理
     * Author justin.jia
     * Date 2021/11/27 17:11
     **/
    @Component("invoiceProducterService")
    public class InvoiceProducterService {
    
        private static Logger logger = Logger.getLogger(InvoiceProducterService.class);
    
    
        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;
    
    
        //发票上传信息查询
        public void invoiceUploadSend(String key,String value) {
            logger.info("Kafka接口准备上传发送消息为发票ID:"+ key+"****"+value);
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(SaleConstant.TOPIC_INVOICE_UPLOAD,value);
            future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                @Override
                public void onFailure(Throwable throwable) {            //发送失败的处理
                    logger.error(SaleConstant.TOPIC_INVOICE_UPLOAD + " - 生产者 发送消息失败:" + throwable.getMessage());
                }
                @Override
                public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                    //成功的处理
                    logger.info(SaleConstant.TOPIC_INVOICE_UPLOAD + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
                }
            });
        }
    }

    customer代码

    package com.szdbgo.sale.invoicemgr.domain.service.kafka;
    
    import com.szdbgo.framework.core.constant.SaleConstant;
    import com.szdbgo.framework.core.utils.common.CommonStringUtils;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.log4j.Logger;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;
    
    /**
     * Description  发票上传消费者
     * Author justin.jia
     * Date 2021/11/27 17:12
     **/
    @Component("invoiceUploadCustomerService")
    public class InvoiceUploadCustomerService {
    
        private static Logger logger = Logger.getLogger(InvoiceUploadCustomerService.class);//kafka的监听器
        @KafkaListener(topics = SaleConstant.TOPIC_INVOICE_UPLOAD)
        public void invoiceUploadCmd(ConsumerRecord<String, String> record, Acknowledgment ack) {
            //手动提交offset
            ack.acknowledge();
            String value = record.value();
            logger.info("***********接受数据,开始上传发票,发票ID:"+value);
            try{
                if(CommonStringUtils.isNotEmpty(value)) {
                    //do something
                }
            }
            catch (Exception exception){
                logger.error("发票上传处理失败");
            }
            finally {
    
            }
        }
    
    }
  • 相关阅读:
    取得元素节点的默认display值
    mass Framework emitter模块 v2
    memset函数详细说明
    八大排序算法总结
    电脑很卡,怎么办?这里帮你解决
    Android APK反编译详解(附图)
    java环境变量配置
    如何使用U盘安装操作系统 安装GHOST XP, xp纯净版
    c# WinForm开发 DataGridView控件的各种操作总结(单元格操作,属性设置)
    Js apply 方法 详解
  • 原文地址:https://www.cnblogs.com/xibei666/p/15627910.html
Copyright © 2011-2022 走看看