zoukankan      html  css  js  c++  java
  • springboot整合kafka

    参考地址:http://www.54tianzhisheng.cn/2018/01/05/SpringBoot-Kafka/

    1、pom文件

    <!--kafka-->
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.41</version>
            </dependency>
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    </dependency>

    2、配置文件

    ########################kafka相关配置##########################################
    # 指定kafka 代理地址,可以多个
    spring.kafka.bootstrap-servers=172.16.0.79:9092,172.16.0.79:9093
    #=============== provider  =======================
    #retries=0,时允许重试失败的发送
    spring.kafka.producer.retries=0
    # 每次批量发送消息的数量
    spring.kafka.producer.batch-size=16384
    spring.kafka.producer.buffer-memory=33554432
    #指定消息key和消息体的编码方式
    spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
    spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
    
    #=============== consumer  =======================
    # 指定默认消费者group id
    spring.kafka.consumer.group-id=test-consumer-group
    #当没有初始化偏移量或者偏移量不存在时,自动重置偏移量为最开始的偏移量
    spring.kafka.consumer.auto-offset-reset=earliest
    spring.kafka.consumer.enable-auto-commit=true
    spring.kafka.consumer.auto-commit-interval=100
    
    # 指定消息key和消息体的编解码方式
    spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
    spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

    3、消息类

    @Data
    public class Message {
    
        private Long id;
    
        private String msg;
    
        private Date sendTime;
    }

    4、kafka消息发送类

    @Component
    @Slf4j
    public class KafkaSenderService {
        @Autowired
        private KafkaTemplate kafkaTemplate;
    
        public void sendMessage(){
            Message message = new Message();
            message.setId(System.currentTimeMillis());
            message.setMsg(UUID.randomUUID().toString());
            message.setSendTime(new Date());
            String s = JSONObject.toJSONString(message);
            log.info("+++++++++++++++++++++  message = {}", s);
         //如果主题不存在,则会自动创建 kafkaTemplate.send(
    "test",s); } }

    5、消息接收

    @Component
    @Slf4j
    public class KafkaReceiverService {
    
    
        @KafkaListener(topics="test")
        public void listen(ConsumerRecord<?,?>record){
            Optional<?> value = Optional.of(record.value());
            if (value.isPresent()){
                Object o = value.get();
    
                log.info("-----------record:"+record);
                log.info("-----------message:"+o);
            }
        }
    }
  • 相关阅读:
    .Net自动生成Html新闻系统V1.0 Beta 下载
    Visual Studio .NET 2003中自己找到的一个小技巧[图]
    多表连接的SQL写法(SqlServer、Oracle)
    在线人数统计 V1.0(Asp.net+ SqlServer) 源码下载
    Visual Studio 2005安装后,原来的Asp.net1.1不能执行的解决方法。
    [函数]截取固定长的字符串(双字节的计2位)
    [原创]asp.net 2.0下的自定义树(myTreeView)
    通用的数据库操作助手类
    关于时间国际化的方案
    HTTPS Cipher Suite问题
  • 原文地址:https://www.cnblogs.com/cq-yangzhou/p/11428927.html
Copyright © 2011-2022 走看看