zoukankan      html  css  js  c++  java
  • springboot整合kafka

    TOC

    springboot整合kafka

    参考:

    配置

    依赖

    需要web和kafka

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    注意,springboot版本对kafka版本影响不小,1.x可以使用1.x的kafka(比如1.1.1.RELEASE),2.0.x使用2.1.7.RELEASE,2.1.x使用 2.2.x.RELEASE;
    版本不对都会导致项目无法启动

    yml配置

    #============== kafka ===================
    # 指定kafka 代理地址,可以多个
    spring:
      kafka:
      #指定kafka server的地址,集群配多个,中间,逗号隔开,或者使用  列表格式  
      # - 服务1
      # - 服务2   ....
        bootstrap-servers: 192.168.88.128:9092
        #=============== provider  =======================
        producer:
          retries: 0
          # 每次批量发送消息的数量
          batch-size: 16384
          acks: 1
          #这个值只能大不能小了,否则会影响sleuth。可以使用的最大内存来缓存等待发送到server端的消息
          buffer-memory: 1048576  # 这是最小的?
          retries: 0
          # 指定消息key和消息体的编解码方式
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          properties:
            # 单个请求的最大大小(以字节为单位)
            max.request.size: 2097152
            # 从发送请求到收到ACK确认等待的最长时间(超时时间)
            request.timeout.ms: 40000
            # 这项设置设定了批量处理的更高的延迟边界:一旦我们获得某个partition的batch.size,他将会立即发送而不顾这项设置,然而如果我们获得消息字节数比这项设置要小的多,
            # 我们需要“linger”特定的时间以获取更多的消息。 这个设置默认为0,即没有延迟。设定linger.ms=5,例如,将会减少请求数目,但是同时会增加5ms的延迟。
            linger.ms: 1
            # 消息发送失败的情况下,重试发送的次数 存在消息发送是成功的,只是由于网络导致ACK没收到的重试,会出现消息被重复发送的情况
            message.send.max.retries: 0
        consumer:
          # 指定默认消费者group id
          group-id: test-consumer-group
          auto-offset-reset: earliest
          enable-auto-commit: true
          auto-commit-interval: 100
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    测试

    消息实体

    @Data
    @Accessors(chain = true)
    @NoArgsConstructor
    public class Message {
        /**
         * id
         */
        private Long id;
        /**
         * 消息
         */
        private String msg;
        /**
         * 时间戳
         */
        private Date sendTime;
    
    }

    发送消息

    /** 消息发送方
     * @author jingshiyu
     * @date 2019/7/31 14:04:21
     * @desc
     */
    @RestController
    @Slf4j
    public class KafkaSender {
        @Autowired
        private KafkaTemplate<String,String> kafkaTemplate;
    
        @RequestMapping("/send")
        public void send(@RequestParam String msg) {
            Message message=new Message();
            message.setId(123L).setMsg(msg).setSendTime(new Date());
            kafkaTemplate.send("kafka_one", JSON.toJSONString(message));
        }
    
    }

    就这样,发送消息代码就实现了。

    这里关键的代码为 kafkaTemplate.send() 方法,kafka_one 是 Kafka 里的 topic ,这个 topic 在 Java 程序中是不需要提前在 Kafka 中设置的,因为它会在发送的时候自动创建你设置的 topic, JSON.toJSONString(message) 是消息内容

    接收消息

    /**
     * 监听服务器上的kafka是否有相关的消息发过来
     */
    @Component
    @Slf4j
    public class KafkaReceiver {
        /**
         * 定义此消费者接收topics = {"kafka_one"}的消息,与controller中的topic对应上即可
         * @param record 变量代表消息本身,可以通过ConsumerRecord<?,?>类型的record变量来打印接收的消息的各种信息
         */
        @KafkaListener(topics = {"kafka_one"})
        public void listen(ConsumerRecord<?, ?> record) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            if (kafkaMessage.isPresent()) {
                Object message = kafkaMessage.get();
                log.info("----------------- record =" + record);
                log.info("------------------ message =" + message);
            }
        }
    }

    客户端 consumer 接收消息特别简单,直接用@KafkaListener 注解即可,并在监听中设置监听的 topic ,topics 是一个数组所以是可以绑定多个主题的,上面的代码中修改为 @KafkaListener(topics = {"zhisheng","tian"}) 就可以同时监听两个 topic 的消息了。需要注意的是:这里的 topic 需要和消息发送类 KafkaSender.java 中设置的 topic 一致。

    发送消息

    启动项目之后,调用接口发送消息

    http://192.168.0.173:8083/send?msg=测试消息

    将会接收到消息

    record =ConsumerRecord(topic = kafka_one, partition = 0, offset = 0, CreateTime = 1564556254952, serialized key size = -1, serialized value size = 56, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":123,"msg":"测试消息","sendTime":1564556254808})
    
    message ={"id":123,"msg":"测试消息","sendTime":1564556254808}

    kafka查看

    ./kafka-topics.sh --list --zookeeper localhost:2181  在kafka上查看topic列表


    就会发现刚才我们程序中的 kafka_one 已经自己创建了





  • 相关阅读:
    DOM笔记(二):Node接口
    DOM笔记(一):HTMLDocument接口
    mysql_connect v/s mysql_pconnect
    HTML 5:绘制旋转的太极图
    HTML 5:你必须知道的data属性
    CSS:7个你可能不认识的单位
    CSS 3的display:盒类型详解
    PHP:6种GET和POST请求发送方法
    asp.net mvc生命周期学习
    关于sql row_number,rank,dense_rank,ntile函数
  • 原文地址:https://www.cnblogs.com/ziyue7575/p/a9f37e6a4d36636374a7f1e0fa3b9b2a.html
Copyright © 2011-2022 走看看