1. Overview
We have had our eye on message queues for a while, and we have finally integrated Kafka into our Spring Boot project. This post records the setup process as a review of what we learned.
2. Installing the Kafka server
Download the server package from the Apache Kafka downloads page. On Linux I installed the newest release built for Scala 2.13, but on Windows that build would not start, so I had to fall back to kafka_2.11-2.0.0.tgz.
Extract the archive on both Linux and Windows.
On Linux, open a terminal in the bin directory and run the start commands (ZooKeeper listens on port 2181 by default, Kafka on 9092):
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
./kafka-server-start.sh -daemon ../config/server.properties
On Windows, open cmd in the bin directory and run the .bat scripts under the windows subdirectory (the Windows scripts do not support the -daemon flag):
.\windows\zookeeper-server-start.bat ..\config\zookeeper.properties
.\windows\kafka-server-start.bat ..\config\server.properties
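Once both processes are running, a quick way to verify the broker on Linux is to round-trip a message with the console tools that ship with Kafka (equivalent .bat scripts live under bin\windows). The topic name test below is arbitrary, and on Kafka 2.2+ kafka-topics.sh accepts --bootstrap-server localhost:9092 instead of --zookeeper:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Type a few lines into the console producer; if the console consumer (run in a second terminal) echoes them back, the broker is working.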
3. Java code
Because my project uses Spring Boot 1.5.2.RELEASE, I had to stick with spring-kafka 1.2.1.RELEASE; newer spring-kafka versions conflict with this Boot version.
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.2.1.RELEASE</version>
</dependency>
Spring Boot project configuration (application.properties):
# Kafka broker address
spring.kafka.bootstrap-servers=localhost:9092

# Kafka -- producer =======================
# Number of retries when a send fails
spring.kafka.producer.retries=0
# Maximum size of each record batch, in bytes
spring.kafka.producer.batch-size=16384
# Total memory the producer may use to buffer records waiting to be sent, in bytes
spring.kafka.producer.buffer-memory=33554432
# Serializers for the message key and value
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# Kafka -- consumer =======================
# Deserializers for the message key and value
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Default consumer group id; offsets are committed manually
spring.kafka.consumer.group-id=user-log-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.listener.ack-mode=manual_immediate
Spring Boot code
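Both the producer and the consumer below reference SaleConstant.TOPIC_INVOICE_UPLOAD, a constant from the project's own framework module that is not shown in this post. A minimal sketch of what it might look like (the topic name is illustrative, not the project's actual value; it must be a compile-time constant so it can be used inside the @KafkaListener annotation):

package com.szdbgo.framework.core.constant;

/**
 * Sketch of the project's constant class; only the topic used in this post is shown.
 */
public final class SaleConstant {

    // Kafka topic for invoice-upload messages (illustrative value)
    public static final String TOPIC_INVOICE_UPLOAD = "invoice-upload";

    private SaleConstant() {
    }
}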
Producer code:
package com.szdbgo.sale.invoicemgr.domain.service.kafka;

import com.szdbgo.framework.core.constant.SaleConstant;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * Description: invoice upload producer
 * Author: justin.jia
 * Date: 2021/11/27 17:11
 **/
@Component("invoiceProducterService")
public class InvoiceProducterService {

    private static Logger logger = Logger.getLogger(InvoiceProducterService.class);

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Send an invoice-upload message; key is the invoice ID, value is the payload
    public void invoiceUploadSend(String key, String value) {
        logger.info("Preparing to send invoice-upload message, invoice ID: " + key + "****" + value);
        // Note: only the value is published; the key is used for logging only
        ListenableFuture<SendResult<String, Object>> future =
                kafkaTemplate.send(SaleConstant.TOPIC_INVOICE_UPLOAD, value);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                // Handle send failure
                logger.error(SaleConstant.TOPIC_INVOICE_UPLOAD + " - producer failed to send message: " + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                // Handle send success
                logger.info(SaleConstant.TOPIC_INVOICE_UPLOAD + " - producer sent message successfully: " + stringObjectSendResult.toString());
            }
        });
    }
}
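Business code only needs to inject the component and call it. A minimal sketch with a hypothetical InvoiceUploadTask class that is not part of the original project:

package com.szdbgo.sale.invoicemgr.domain.service.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Hypothetical caller, shown only to illustrate how the producer is used.
 */
@Component
public class InvoiceUploadTask {

    @Autowired
    private InvoiceProducterService invoiceProducterService;

    // Publish the invoice ID and payload to the invoice-upload topic
    public void triggerUpload(String invoiceId, String payload) {
        invoiceProducterService.invoiceUploadSend(invoiceId, payload);
    }
}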
Consumer code:
package com.szdbgo.sale.invoicemgr.domain.service.kafka;

import com.szdbgo.framework.core.constant.SaleConstant;
import com.szdbgo.framework.core.utils.common.CommonStringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.Logger;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

/**
 * Description: invoice upload consumer
 * Author: justin.jia
 * Date: 2021/11/27 17:12
 **/
@Component("invoiceUploadCustomerService")
public class InvoiceUploadCustomerService {

    private static Logger logger = Logger.getLogger(InvoiceUploadCustomerService.class);

    // Kafka listener for the invoice-upload topic
    @KafkaListener(topics = SaleConstant.TOPIC_INVOICE_UPLOAD)
    public void invoiceUploadCmd(ConsumerRecord<String, String> record, Acknowledgment ack) {
        // Commit the offset manually (note: this acknowledges before processing)
        ack.acknowledge();
        String value = record.value();
        logger.info("*********** Message received, start uploading invoice, invoice ID: " + value);
        try {
            if (CommonStringUtils.isNotEmpty(value)) {
                // do something
            }
        } catch (Exception exception) {
            logger.error("Invoice upload processing failed");
        }
    }
}
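One caveat with the listener above: ack.acknowledge() is called before any processing, so the offset is committed even if the subsequent work throws, and the message is treated as consumed (at-most-once behavior). If an upload must not be lost, acknowledge only after the work succeeds; an unacknowledged record can then be consumed again after a consumer restart or rebalance. A sketch of that variant, showing only the listener method (the rest of the class is unchanged; this is not the original project's code):

@KafkaListener(topics = SaleConstant.TOPIC_INVOICE_UPLOAD)
public void invoiceUploadCmd(ConsumerRecord<String, String> record, Acknowledgment ack) {
    String value = record.value();
    logger.info("*********** Message received, start uploading invoice, invoice ID: " + value);
    try {
        if (CommonStringUtils.isNotEmpty(value)) {
            // do something
        }
        // Commit the offset only after processing succeeded
        ack.acknowledge();
    } catch (Exception exception) {
        // Offset not committed; the record can be re-consumed after a restart or rebalance
        logger.error("Invoice upload processing failed", exception);
    }
}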