zoukankan      html  css  js  c++  java
  • springboot整合kafka

    参考地址:http://www.54tianzhisheng.cn/2018/01/05/SpringBoot-Kafka/

    1、pom文件

    <!--kafka-->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.41</version>
            </dependency>
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    </dependency>

    2、配置文件

    ########################kafka相关配置##########################################
    # 指定kafka 代理地址,可以多个
    spring.kafka.bootstrap-servers=172.16.0.79:9092,172.16.0.79:9093
    #=============== provider  =======================
    #retries=0,时允许重试失败的发送
    spring.kafka.producer.retries=0
    # 每次批量发送消息的数量
    spring.kafka.producer.batch-size=16384
    spring.kafka.producer.buffer-memory=33554432
    #指定消息key和消息体的编码方式
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    
    #=============== consumer  =======================
    # 指定默认消费者group id
    spring.kafka.consumer.group-id=test-consumer-group
    #当没有初始化偏移量或者偏移量不存在时,自动重置偏移量为最开始的偏移量
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.enable-auto-commit=true
    spring.kafka.consumer.auto-commit-interval=100
    
    # 指定消息key和消息体的编解码方式
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

    3、消息类

    @Data
    public class Message {
    
        private Long id;
    
        private String msg;
    
        private Date sendTime;
    }

    4、kafka消息发送类

    @Component
    @Slf4j
    public class KafkaSenderService {
        @Autowired
        private KafkaTemplate kafkaTemplate;
    
        public void sendMessage(){
            Message message = new Message();
            message.setId(System.currentTimeMillis());
            message.setMsg(UUID.randomUUID().toString());
            message.setSendTime(new Date());
            String s = JSONObject.toJSONString(message);
            log.info("+++++++++++++++++++++  message = {}", s);
         //如果主题不存在,则会自动创建 kafkaTemplate.send(
    "test",s); } }

    5、消息接收

    @Component
    @Slf4j
    public class KafkaReceiverService {
    
    
        @KafkaListener(topics="test")
        public void listen(ConsumerRecord<?,?>record){
            Optional<?> value = Optional.of(record.value());
            if (value.isPresent()){
                Object o = value.get();
    
                log.info("-----------record:"+record);
                log.info("-----------message:"+o);
            }
        }
    }
  • 相关阅读:
    Linux下c开发 之 线程通信(转)
    mount -t nfs 的使用
    window共享linux下的文件 samba
    C/C++ 的使用
    php获取格式时间和时间戳
    php压缩文件夹
    php递归删除文件夹
    php生成文件夹(递归生成)
    QQ音乐API分析记录
    $(this)与this的区别
  • 原文地址:https://www.cnblogs.com/cq-yangzhou/p/11428927.html
Copyright © 2011-2022 走看看