zoukankan      html  css  js  c++  java
  • SpringBoot整合kafka的简单应用

    引入依赖

            <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
                <version>2.7.1</version>
            </dependency>

    如果启动报错

    Caused by: java.lang.NoClassDefFoundError: org/springframework/core/log/LogAccessor
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.<init>(KafkaListenerAnnotationBeanPostProcessor.java:148)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:172)
    ... 19 common frames omitted

    就把指定版本去掉

    配置文件yml

    修改kafka连接地址 其他按需修改

    #kafka的topic名称
    kafkaTopic: topic-test
    
    spring:
      kafka:
        bootstrap-servers: 192.168.1.12:9092 #kafka连接地址
        producer:
          acks: 1  #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
          batch-size: 16384  #批量大小
          properties:
            linger.ms: 0   # 当生产端积累的消息达到batch-size或接收到消息linger.ms后,生产者就会将消息提交给kafka linger.ms为0表示每接收到一条消息就提交给kafka,这时候batch-size其实就没用了
          buffer-memory: 33554432  #生产端缓冲区大小
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          group-id: defaultConsumerGroup  # 默认的消费组ID
          enable-auto-commit: true  # 是否自动提交offset
          ## 当kafka中没有初始offset或offset超出范围时将自动重置offset
          ## earliest:重置为分区中最小的offset;
          ## latest:重置为分区中最新的offset(消费分区中新产生的数据);
          ## none:只要有一个分区不存在已提交的offset,就抛出异常;
          auto-commit-interval:
            ms: 1000
          auto-offset-reset: latest
          properties:
            session.timeout.ms: 120000    # 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
            request.timeout.ms: 180000   # 消费请求超时时间
        listener:
          missing-topics-fatal: false  # 消费监听接口监听的主题不存在时,自动创建,true时表示如果不存在启动报错
    flyway:
      connect-retries: 0  #重试次数

    消费者:

    KafkaConsumer.java

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    /**
     * @author yvioo
     */
    @Component
    public class KafkaConsumer {
    
        /**
         * 消费监听
         * @param record
         */
        @KafkaListener(topics = "${kafkaTopic}")
        public void onMessage(ConsumerRecord<?, ?> record){
            System.out.println("收到消息:topic名称:"+record.topic()+",分区:"+record.partition()+",值:"+record.value());
        }
    }

    生产者

    KafkaProducer.java

    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.support.SendResult;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.annotation.Resource;
    
    /**
     * @author*/
    @RestController
    public class KafkaProducer {
    
        @Value("${kafkaTopic}")
        private String kafkaTopic;
    
        @Resource
        private KafkaTemplate<String, Object> kafkaTemplate;
    
        /**
         *  发送消息
         * @param message
         */
        @GetMapping("/send")
        public void sendMessage1(String message) {
            kafkaTemplate.send(kafkaTopic, message);
        }
    
    
        /**
         * 有发送结果回调
         * @param message
         */
        @GetMapping("/send/callback")
        public void sendMessage3(String message) {
            kafkaTemplate.send(kafkaTopic, message).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                @Override
                public void onFailure(Throwable ex) {
                    System.out.println("fail:"+ex.getMessage());
                }
    
                @Override
                public void onSuccess(SendResult<String, Object> result) {
                    System.out.println("success:topic名称:" + result.getRecordMetadata().topic() + ",分区:"
                            + result.getRecordMetadata().partition() + ",消息在分区中的标识:" + result.getRecordMetadata().offset());
                }
            });
        }
    }
    -----------------------有任何问题可以在评论区评论,也可以私信我,我看到的话会进行回复,欢迎大家指教------------------------ (蓝奏云官网有些地址失效了,需要把请求地址lanzous改成lanzoux才可以)
  • 相关阅读:
    User Get 'Access Denied' with Excel Service WebPart
    How To Search and Restore files from Site Collection Recycle Bin
    How To Collect ULS Log from SharePoint Farm
    How To Restart timer service on all servers in farm
    How to Operate SharePoint User Alerts with PowerShell
    How to get Timer Job History
    Synchronization Service Manager
    SharePoint 2007 Full Text Searching PowerShell and CS file content with SharePoint Search
    0x80040E14 Caused by Max Url Length bug
    SharePoint 2007 User Re-created in AD with new SID issue on MySite
  • 原文地址:https://www.cnblogs.com/pxblog/p/14821853.html
Copyright © 2011-2022 走看看