zoukankan      html  css  js  c++  java
  • SpringBoot整合kafka的简单应用

    引入依赖

            <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>2.7.1</version>
            </dependency>

    如果启动报错

    Caused by: java.lang.NoClassDefFoundError: org/springframework/core/log/LogAccessor
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.<init>(KafkaListenerAnnotationBeanPostProcessor.java:148)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:172)
    ... 19 common frames omitted

    就把指定版本去掉

    配置文件yml

    修改kafka连接地址 其他按需修改

    #kafka的topic名称
    kafkaTopic: topic-test
    
    spring:
      kafka:
        bootstrap-servers: 192.168.1.12:9092 #kafka连接地址
        producer:
          acks: 1  #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
          batch-size: 16384  #批量大小
          properties:
            linger.ms: 0   # 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
          buffer-memory: 33554432  #生产端缓冲区大小
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          group-id: defaultConsumerGroup  # 默认的消费组ID
          enable-auto-commit: true  # 是否自动提交offset
          ## 当kafka中没有初始offset或offset超出范围时将自动重置offset
          ## earliest:重置为分区中最小的offset;
          ## latest:重置为分区中最新的offset(消费分区中新产生的数据);
          ## none:只要有一个分区不存在已提交的offset,就抛出异常;
          auto-commit-interval:
            ms: 1000
          auto-offset-reset: latest
          properties:
            session.timeout.ms: 120000    # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
            request.timeout.ms: 180000   # 消费请求超时时间
        listener:
          missing-topics-fatal: false  # 消费监听接口监听的主题不存在时,自动创建,true时表示如果不存在启动报错
    flyway:
      connect-retries: 0  #重试次数

    消费者:

    KafkaConsumer.java

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    /**
     * @author yvioo
     */
    @Component
    public class KafkaConsumer {
    
        /**
         * 消费监听
         * @param record
         */
        @KafkaListener(topics = "${kafkaTopic}")
        public void onMessage(ConsumerRecord<?, ?> record){
            System.out.println("收到消息:topic名称:"+record.topic()+",分区:"+record.partition()+",值:"+record.value());
        }
    }

    生产者

    KafkaProducer.java

    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.annotation.Resource;
    
    /**
     * @author*/
    @RestController
    public class KafkaProducer {
    
        @Value("${kafkaTopic}")
        private String kafkaTopic;
    
        @Resource
        private KafkaTemplate<String, Object> kafkaTemplate;
    
        /**
         *  发送消息
         * @param message
         */
        @GetMapping("/send")
        public void sendMessage1(String message) {
            kafkaTemplate.send(kafkaTopic, message);
        }
    
    
        /**
         * 有发送结果回调
         * @param message
         */
        @GetMapping("/send/callback")
        public void sendMessage3(String message) {
            kafkaTemplate.send(kafkaTopic, message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                @Override
                public void onFailure(Throwable ex) {
                    System.out.println("fail:"+ex.getMessage());
                }
    
                @Override
                public void onSuccess(SendResult<String, Object> result) {
                    System.out.println("success:topic名称:" + result.getRecordMetadata().topic() + ",分区:"
                            + result.getRecordMetadata().partition() + ",消息在分区中的标识:" + result.getRecordMetadata().offset());
                }
            });
        }
    }
    -----------------------有任何问题可以在评论区评论,也可以私信我,我看到的话会进行回复,欢迎大家指教------------------------ (蓝奏云官网有些地址失效了,需要把请求地址lanzous改成lanzoux才可以)
  • 相关阅读:
    Medium | LeetCode 148. 排序链表 | 归并排序(递归)
    Hard | LeetCode 4. 寻找两个正序数组的中位数 | 二分法
    Medium | LeetCode 341. 扁平化嵌套列表迭代器 | 递归 | 栈
    Hard | LeetCode 312. 戳气球 | 递归+记忆化数组 | 动态规划
    如何删除万能输入法
    javaweb 怎么获取路径
    Controller 返回 json那些小事
    螺旋矩阵
    javaweb怎么使用html
    tomcat中文乱码
  • 原文地址:https://www.cnblogs.com/pxblog/p/14821853.html
Copyright © 2011-2022 走看看