zoukankan      html  css  js  c++  java
  • springboot整合kafka

    TOC

    springboot整合kafka

    参考:

    配置

    依赖

    需要web和kafka

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    注意,springboot版本对kafka版本影响不小,1.x可以使用1.x的kafka(比如1.1.1.RELEASE),2.0.x使用2.1.7.RELEASE,2.1.x使用 2.2.x.RELEASE;
    版本不对都会导致项目无法启动

    yml配置

    #============== kafka ===================
    # 指定kafka 代理地址,可以多个
    spring:
      kafka:
      #指定kafka server的地址,集群配多个,中间,逗号隔开,或者使用  列表格式  
      # - 服务1
      # - 服务2   ....
        bootstrap-servers: 192.168.88.128:9092
        #=============== provider  =======================
        producer:
          retries: 0
          # 每次批量发送消息的数量
          batch-size: 16384
          acks: 1
          #这个值只能大不能小了,否则会影响sleuth。可以使用的最大内存来缓存等待发送到server端的消息
          buffer-memory: 1048576  # 这是最小的?
          retries: 0
          # 指定消息key和消息体的编解码方式
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          properties:
            # 单个请求的最大大小(以字节为单位)
            max.request.size: 2097152
            # 从发送请求到收到ACK确认等待的最长时间(超时时间)
            request.timeout.ms: 40000
            # 这项设置设定了批量处理的更高的延迟边界:一旦我们获得某个partition的batch.size,他将会立即发送而不顾这项设置,然而如果我们获得消息字节数比这项设置要小的多,
            # 我们需要“linger”特定的时间以获取更多的消息。 这个设置默认为0,即没有延迟。设定linger.ms=5,例如,将会减少请求数目,但是同时会增加5ms的延迟。
            linger.ms: 1
            # 消息发送失败的情况下,重试发送的次数 存在消息发送是成功的,只是由于网络导致ACK没收到的重试,会出现消息被重复发送的情况
            message.send.max.retries: 0
        consumer:
          # 指定默认消费者group id
          group-id: test-consumer-group
          auto-offset-reset: earliest
          enable-auto-commit: true
          auto-commit-interval: 100
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

    测试

    消息实体

    @Data
    @Accessors(chain = true)
    @NoArgsConstructor
    public class Message {
        /**
         * id
         */
        private Long id;
        /**
         * 消息
         */
        private String msg;
        /**
         * 时间戳
         */
        private Date sendTime;
    
    }

    发送消息

    /** 消息发送方
     * @author jingshiyu
     * @date 2019/7/31 14:04:21
     * @desc
     */
    @RestController
    @Slf4j
    public class KafkaSender {
        @Autowired
        private KafkaTemplate<String,String> kafkaTemplate;
    
        @RequestMapping("/send")
        public void send(@RequestParam String msg) {
            Message message=new Message();
            message.setId(123L).setMsg(msg).setSendTime(new Date());
            kafkaTemplate.send("kafka_one", JSON.toJSONString(message));
        }
    
    }

    就这样,发送消息代码就实现了。

    这里关键的代码为 kafkaTemplate.send() 方法,kafka_one 是 Kafka 里的 topic ,这个 topic 在 Java 程序中是不需要提前在 Kafka 中设置的,因为它会在发送的时候自动创建你设置的 topic, JSON.toJSONString(message) 是消息内容

    接收消息

    /**
     * 监听服务器上的kafka是否有相关的消息发过来
     */
    @Component
    @Slf4j
    public class KafkaReceiver {
        /**
         * 定义此消费者接收topics = {"kafka_one"}的消息,与controller中的topic对应上即可
         * @param record 变量代表消息本身,可以通过ConsumerRecord<?,?>类型的record变量来打印接收的消息的各种信息
         */
        @KafkaListener(topics = {"kafka_one"})
        public void listen(ConsumerRecord<?, ?> record) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            if (kafkaMessage.isPresent()) {
                Object message = kafkaMessage.get();
                log.info("----------------- record =" + record);
                log.info("------------------ message =" + message);
            }
        }
    }

    客户端 consumer 接收消息特别简单,直接用@KafkaListener 注解即可,并在监听中设置监听的 topic ,topics 是一个数组所以是可以绑定多个主题的,上面的代码中修改为 @KafkaListener(topics = {"zhisheng","tian"}) 就可以同时监听两个 topic 的消息了。需要注意的是:这里的 topic 需要和消息发送类 KafkaSender.java 中设置的 topic 一致。

    发送消息

    启动项目之后,调用接口发送消息

    http://192.168.0.173:8083/send?msg=测试消息

    将会接收到消息

    record =ConsumerRecord(topic = kafka_one, partition = 0, offset = 0, CreateTime = 1564556254952, serialized key size = -1, serialized value size = 56, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":123,"msg":"测试消息","sendTime":1564556254808})
    
    message ={"id":123,"msg":"测试消息","sendTime":1564556254808}

    kafka查看

    ./kafka-topics.sh --list --zookeeper localhost:2181  在kafka上查看topic列表


    就会发现刚才我们程序中的 kafka_one 已经自己创建了





  • 相关阅读:
    sencha touch 扩展篇之将sencha touch打包成安装程序(上)- 使用sencha cmd打包安装程序
    sencha touch 扩展篇之使用sass自定义主题样式 (下)通过css修改官方组件样式以及自定义图标
    一个不错的android组件的网站
    sencha touch 扩展篇之使用sass自定义主题样式 (上)使用官方的api修改主题样式
    sencha touch 入门系列 (九) sencha touch 布局layout
    面试题总结
    国外接活网站Elance, Freelancer和ScriptLance的介绍和对比
    sencha touch 入门系列 扩展篇之sencha touch 项目打包压缩
    Android Design Support Library——Navigation View
    设计模式——命令模式
  • 原文地址:https://www.cnblogs.com/ziyue7575/p/a9f37e6a4d36636374a7f1e0fa3b9b2a.html
Copyright © 2011-2022 走看看