  • Spring Boot + Kafka integration

    Environment setup (single-node Kafka on CentOS 7)

    1. Download Kafka from the official site: http://kafka.apache.org/

    tar -xzf kafka_2.13-2.6.0.tgz
    cd kafka_2.13-2.6.0
    

    2. Edit the config file (advertised.listeners in config/server.properties)

    # Allow connections from any interface
    listeners=PLAINTEXT://0.0.0.0:9092
    # Address advertised to external clients
    advertised.listeners=PLAINTEXT://192.168.0.175:9092
    

    3. Start ZooKeeper and Kafka as daemons

    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
    bin/kafka-server-start.sh -daemon config/server.properties
    

    With the environment set up, run a quick end-to-end test.
    4. Create a topic

    bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
    

    5. Write events/messages to the topic (console producer)

    bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
    >this is test!
    

    Press Ctrl+C to exit the producer prompt.
    6. Consume the messages

    bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
    

    Spring Boot integration with Kafka

    1. Create a new project and add the dependency to pom.xml

    <!-- Spring for Apache Kafka integration jar -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    

    2. Add the Kafka configuration to application.yml:

    spring:
      application:
        name: cloud-kafka
      kafka:
        bootstrap-servers: 192.168.0.175:9092
        producer: # producer settings
          retries: 0 # number of send retries
          acks: 1 # ack level: how many replica acks the broker waits for before confirming (0, 1, all/-1)
          batch-size: 16384 # batch size in bytes
          buffer-memory: 33554432 # producer buffer size in bytes
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer: # consumer settings
          group-id: test-consumer-group # default consumer group ID
          enable-auto-commit: true # auto-commit offsets
          auto-commit-interval: 100 # auto-commit interval in ms
          auto-offset-reset: latest
          # What to do when there is no initial offset, or the offset is out of range:
          # earliest: reset to the earliest offset in the partition;
          # latest: reset to the latest offset (consume only newly produced records);
          # none: throw an exception if any partition has no committed offset;
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    

    3. Producer (sender)

    @RestController
    public class KafkaProducer {
        @Resource
        private KafkaTemplate<String, Object> kafkaTemplate;
    
        @GetMapping("/kafka/normal/{msg}")
        public void sendMessage(@PathVariable("msg") String msg) {
            Message message = new Message();
            message.setId(UUID.randomUUID().toString());
            message.setSendTime(new Date());
            message.setMessage(msg);
            // serialize to JSON with Hutool's JSONUtil (cn.hutool.json) and send to the "test" topic
            kafkaTemplate.send("test", JSONUtil.toJsonStr(message));
        }
    }
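
    The producer serializes the Message with Hutool's JSONUtil.toJsonStr before sending. As a rough, dependency-free sketch of what actually lands on the topic (hand-rolled JSON for illustration only; the real code uses JSONUtil), the payload has this shape:

    ```java
    import java.util.Date;
    import java.util.UUID;

    public class PayloadSketch {
        // Builds the same JSON shape seen later in the consumer logs:
        // {"message":"...","sendTime":<epoch millis>,"id":"<uuid>"}
        static String toJson(String msg, Date sendTime, String id) {
            return "{\"message\":\"" + msg + "\","
                 + "\"sendTime\":" + sendTime.getTime() + ","
                 + "\"id\":\"" + id + "\"}";
        }

        public static void main(String[] args) {
            String json = toJson("test", new Date(), UUID.randomUUID().toString());
            System.out.println(json);
        }
    }
    ```

    Note that Date fields are serialized as epoch milliseconds, which matches the sendTime values in the log output below.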
    

    4. Consumer (receiver)

    @Component
    public class KafkaConsumer {
        private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
    
        @KafkaListener(topics = {"test"})
        public void onMessage(ConsumerRecord<?, ?> consumerRecord) {
            Optional<?> optional = Optional.ofNullable(consumerRecord.value());
            if (optional.isPresent()) {
                Object msg = optional.get();
                logger.info("record:{}", consumerRecord);
                logger.info("message:{}", msg);
            }
        }
    }
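
    The Optional.ofNullable guard above matters because a record's value can legitimately be null (for example, tombstone records in a compacted topic). The same pattern in isolation, as a plain-JDK sketch:

    ```java
    import java.util.Optional;

    public class NullValueGuard {
        // Returns the record value as a String, or a placeholder when the value is null.
        static String describe(Object value) {
            return Optional.ofNullable(value)
                    .map(Object::toString)
                    .orElse("<null value>");
        }

        public static void main(String[] args) {
            System.out.println(describe("{\"message\":\"test\"}"));
            System.out.println(describe(null));
        }
    }
    ```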
    

    5. Message entity

    public class Message {
        private String id;
        private String message;
        private Date sendTime;

        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public String getMessage() { return message; }
        public void setMessage(String message) { this.message = message; }
        public Date getSendTime() { return sendTime; }
        public void setSendTime(Date sendTime) { this.sendTime = sendTime; }
    }
    

    The example above creates a producer that sends messages to the test topic and a consumer that listens on test. The listener method is annotated with @KafkaListener; its topics attribute names the topic(s) to listen on and accepts several topics at once, separated by commas. Start the application and call the endpoint from Postman to trigger the producer.
    Then check the logs:

    2020-11-09 17:28:08.530  INFO 15076 --- [ntainer#0-0-C-1] com.example.service.KafkaConsumer        : record:ConsumerRecord(topic = test, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1604914088509, serialized key size = -1, serialized value size = 87, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"message":"test","sendTime":1604914088452,"id":"f4dcc246-8721-4ef8-bad4-555269328901"})
    2020-11-09 17:28:08.530  INFO 15076 --- [ntainer#0-0-C-1] com.example.service.KafkaConsumer        : message:{"message":"test","sendTime":1604914088452,"id":"f4dcc246-8721-4ef8-bad4-555269328901"}
    

    The message was consumed successfully.

    For more details, see: https://blog.csdn.net/yuanlong122716/article/details/105160545/

  • Original post: https://www.cnblogs.com/jone-chen/p/13949673.html