zoukankan      html  css  js  c++  java
  • SprinigBoot整合Kafka

    1、POM依赖

    <!--kafka-->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!--lombok-->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!--fastjson-->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.58</version>
    </dependency>
    

    2、配置文件

    序列化的地方,注意别写错了。。

    # Kafka
    spring:
      kafka:
        bootstrap-servers: 192.168.88.200:9092  #kafka的主机和端口
        consumer:
          group-id: myGroup
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
    

    3、编码

    3.1 消息类

    import lombok.Data;
    import lombok.experimental.Accessors;
    import java.time.LocalDateTime;
    
    @Data
    @Accessors(chain = true)
    public class Message {
    
        private String id;
        private String name;
        private LocalDateTime sendTime;
    
    }
    

    3.2 生产者

    import com.agan.pojo.Message;
    import com.alibaba.fastjson.JSON;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;
    
    import java.time.LocalDateTime;
    import java.util.UUID;
    
    @Component
    public class KafkaSender {
    
        protected static final String TOPIC = "myTopic";
    
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        public void send(String name) {
            Message message = new Message();
            message.setId(UUID.randomUUID().toString()).setName(name).setSendTime(LocalDateTime.now());
            kafkaTemplate.send(TOPIC, JSON.toJSONString(message));
        }
    }
    
    

    3.3 消费者

    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Optional;
    
    @Component
    @Slf4j
    public class KafkaReceiver {
    
        @KafkaListener(topics = {KafkaSender.TOPIC})
        public void listen(ConsumerRecord<?, ?> record) {
            Optional<?> value = Optional.ofNullable(record.value());
            if (value.isPresent()) {
                Object message = value.get();
                log.info("record" + record);
                log.info("message" + message.toString());
            }
        }
    }
    

    3.4 调用

    在controller中加一个方法调用

    @Autowired
    private KafkaSender sender;
        
    @PostMapping("/send")
    public Object send(@RequestBody Map<String, String> map) {
        sender.send(map.get("name"));
        return "success";
    }
    

    4、结果

    post请求发送name = 2
    打印日志

    2019-12-22 23:53:20.445  INFO 11456 --- [ntainer#0-0-C-1] com.agan.service.KafkaReceiver           : recordConsumerRecord(topic = myTopic, partition = 0, offset = 7, CreateTime = 1577030000442, serialized key size = -1, serialized value size = 93, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":"509e9d65-76b7-42e1-b341-0f4be02c0da9","name":"2","sendTime":"2019-12-22T23:53:20.441"})
    2019-12-22 23:53:20.446  INFO 11456 --- [ntainer#0-0-C-1] com.agan.service.KafkaReceiver           : message{"id":"509e9d65-76b7-42e1-b341-0f4be02c0da9","name":"2","sendTime":"2019-12-22T23:53:20.441"}
    

    同时,命令行启动的消费者同样收到了消息

    {"id":"0f17bb5a-2ef3-461b-8cf1-3b45cdff7ba0","name":"1","sendTime":"2019-12-22T23:52:46.722"}
    {"id":"509e9d65-76b7-42e1-b341-0f4be02c0da9","name":"2","sendTime":"2019-12-22T23:53:20.441"}
    

    5、大坑

    在发送消息时,总是无法成功,并报错。

    org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for t2-0: 30042 ms has passed since batch creation plus linger time
    

    去虚拟机上看了一些,kafka、zk都活得好好的,生产者消费者也能用。。。

    将日志等级调低,
    logging:
    level:
    root: debug
    观察到下面的问题:

    java.io.IOException: Can't resolve address: learn200:9092
    	at org.apache.kafka.common.network.Selector.doConnect(Selector.java:223) ~[kafka-clients-1.0.2.jar:na]
    	at org.apache.kafka.common.network.Selector.connect(Selector.java:202) ~[kafka-clients-1.0.2.jar:na]
    	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:793) [kafka-clients-1.0.2.jar:na]
    	at org.apache.kafka.clients.NetworkClient.access$700(NetworkClient.java:62) [kafka-clients-1.0.2.jar:na]
    	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:944) [kafka-clients-1.0.2.jar:na]
    	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:848) [kafka-clients-1.0.2.jar:na]
    	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:458) [kafka-clients-1.0.2.jar:na]
    	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:258) [kafka-clients-1.0.2.jar:na]
    	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:230) [kafka-clients-1.0.2.jar:na]
    	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:221) [kafka-clients-1.0.2.jar:na]
    	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:153) [kafka-clients-1.0.2.jar:na]
    	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:228) [kafka-clients-1.0.2.jar:na]
    	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205) [kafka-clients-1.0.2.jar:na]
    	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:284) [kafka-clients-1.0.2.jar:na]
    	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146) [kafka-clients-1.0.2.jar:na]
    	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111) [kafka-clients-1.0.2.jar:na]
    	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:700) [spring-kafka-2.1.10.RELEASE.jar:2.1.10.RELEASE]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_202]
    	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_202]
    	at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_202]
    	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_202]
    Caused by: java.nio.channels.UnresolvedAddressException: null
    	at sun.nio.ch.Net.checkAddress(Net.java:101) ~[na:1.8.0_202]
    	at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) ~[na:1.8.0_202]
    	at org.apache.kafka.common.network.Selector.doConnect(Selector.java:221) ~[kafka-clients-1.0.2.jar:na]
    	... 20 common frames omitted
    
    

    无法解析这个域名和端口,在本机的hosts文件中,加入该域名的解析
    windows为例,
    在 C:WindowsSystem32driversetc的hosts文件
    最下面加上

    # Kafka虚拟机
    192.168.88.200 learn200
    

    重启项目就好了

    项目代码地址 https://github.com/AganRun/Learn/tree/master/SpringBootKafka

    参考:

    https://blog.csdn.net/guijiaoba/article/details/78637375#spring-boot集成
    https://www.cnblogs.com/kaituorensheng/p/10475551.html
    https://blog.csdn.net/maoyuanming0806/article/details/80553632

  • 相关阅读:
    跨域(cross-domain)访问 cookie (读取和设置)
    实用的PHP正则表达式
    Leetcode:find_minimum_in_rotated_sorted_array
    spring Jdbc自己主动获取主键。
    《学习opencv》笔记——矩阵和图像操作——cvSetIdentity,cvSolve,cvSplit,cvSub,cvSubS and cvSubRS
    HTML5 input placeholder 颜色 改动
    Java面试宝典2014版
    Go语言 关于go error处理风格的一些讨论和个人观点(上)
    动静结合学内核:linux idle进程和init进程浅析
    【Bootstrap3.0建站笔记二】button可下拉弹出层
  • 原文地址:https://www.cnblogs.com/AganRun/p/12081776.html
Copyright © 2011-2022 走看看