zoukankan      html  css  js  c++  java
  • SpringBoot2.x 整合Kafka

    环境准备

    producer端maven依赖

        <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
        </dependency>
    

    application.properties配置

    ## Spring整合kafka
    spring.kafka.bootstrap-servers=192.168.21.107:9092,192.168.21.108:9092,192.168.21.109:9092
    # kafka producer 发送消息失败时的重试次数
    spring.kafka.producer.retries=3
    # 批量发送数据的配置
    spring.kafka.producer.batch-size=16384
    # 设置kafka 生产者内存缓冲区的大小(32M)
    spring.kafka.producer.buffer-memory=33554432
    # kafka消息的序列化配置
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringDeserializer
    # kafka 投递配置项
    spring.kafka.producer.acks=1
    

    生产端Service编写

    kafkaProducerService.java

    @Slf4j
    @Component
    public class KafkaProducerService {
        @Autowired
        private KafkaTemplate<String,Object> kafkaTemplate;
    
        public void sendMessage(String topic,Object object){
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);
    
            future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                @Override
                public void onFailure(Throwable throwable) {
                    log.error("发送消息失败:"+throwable.getMessage());
                }
    
                @Override
                public void onSuccess(SendResult<String, Object> result) {
                    log.info("发送消息成功:"+result.toString());
                }
            });
        }
    }
    

    consumer端application.xml配置

    # Spring整合kafka
    spring.kafka.bootstrap-servers=192.168.21.107:9092,192.168.21.108:9092,192.168.21.109:9092
    # kafka consumer 消息的签收机制:手工签收
    spring.kafka.consumer.enable-auto-commit=false
    # 手工签收
    spring.kafka.listener.ack-mode=manual
    # latest[默认]:在偏移量无效的情况下,消费者从最新的记录开始读取数据
    # earliest: 在偏移量无效的情况下,消费者从起始位置读取分区的进度
    spring.kafka.consumer.auto-offset-reset=earliest
    
    # 序列化配置
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization
    
    #并行度
    spring.kafka.listener.concurrency=5
    

    消费端Service

    KafkaConsumerService.java

    @KafkaListener(groupId = "group02",topics = "topic02")
        public void onMessage(ConsumerRecord<String,Object> record, Acknowledgment acknowledgment, Consumer<?,?> consumer){
            log.info("消费端接收消息:{}",record.value());
            record.value();
            //手工签收机制
            acknowledgment.acknowledge();
        }
    
  • 相关阅读:
    C++ 数字、string 简便互转
    【C语言】递归函数DigitSum(n)
    UVALIVE 4287 Proving Equivalences (强连通分量+缩点)
    【linux驱动分析】misc设备驱动
    C++ auto 与 register、static keyword 浅析
    spring 计时器
    Spring注解配置定时任务<task:annotation-driven/>
    去除ckeditor上传图片预览中的英文字母
    编码规范
    git 手动操作
  • 原文地址:https://www.cnblogs.com/shine-rainbow/p/springboot2x-zheng-hekafka.html
Copyright © 2011-2022 走看看