zoukankan      html  css  js  c++  java
  • Kafka 入门实战(3)--SpringBoot 整合 Kafka

    spring-kafka 使得在 Spring 环境中使用 Kafka 变的很简单,只需少量的配置和少量的代码就可以发送和接受消息了。本文主要介绍在 SpringBoot 中用 spring-kafka 操作 Kafka,文中使用到的软件版本:Kafka 2.8.0、SpringBoot 2.4.6、Java 1.8.0_191。

    1、参数说明

    spring-kafka 中参数是以 spring.kafka 开头的,后面的参数名称和 Kafka 的原始参数很类似,只不过 spring-kafka 会把一些参数中的 "." 改为 "-",如 auto.offset.reset 改为 spring.kafka.consumer.auto-offset-reset。

    前缀 描述
    spring.kafka Spring 中 Kafka 相关配置总的前缀
    spring.kafka.consumer 消费者相关参数
    spring.kafka.producer  生产者相关参数
    spring.kafka.admin Kafka 管理相关参数

    kafka 的原始参数说明可参考:Kafka入门实战(1)-概念、安装及简单使用;或参考官方文档

    2、SpringBoot 整合 Kafka

    2.1、引入依赖

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!--流处理需要用到-->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>

    2.2、增加 Kafka 配置

    spring:
      kafka:
        bootstrap-servers: 10.40.100.69:9092
        producer:
          acks: all
          transaction-id-prefix: tx. #开启事务,发送消息的方法需增加@Transactional注解
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          group-id: groupA
          auto-offset-reset:
        streams:
          application-id: streams-test
          properties:
            "[default.key.serde]": org.apache.kafka.common.serialization.Serdes$StringSerde
            "[default.value.serde]": org.apache.kafka.common.serialization.Serdes$StringSerde

    2.3、发送消息

    package com.abc.demo.kafka;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.annotation.Transactional;
    
    @Component
    public class Producer {
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        @Transactional
        @Scheduled(cron = "0/10 * * * * ?")
        public void sendMessage() {
            for (int i = 0; i < 10; i++) {
                kafkaTemplate.send("test", "消息" + i);
            }
        }
    
    //    @Scheduled(cron = "0/10 * * * * ?")
    //    public void sendMessage2() {
    //        kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback(){
    //            @Override
    //            public Object doInOperations(KafkaOperations kafkaOperations) {
    //                for (int i = 0; i < 10; i++) {
    //                    kafkaTemplate.send("test", "消息" + i);
    //                }
    //                return null;
    //            }
    //        });
    //    }
    }

    2.4、接受消息

    package com.abc.demo.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class Consumer {
        private static Logger logger = LoggerFactory.getLogger(Consumer.class);
    
        @KafkaListener(topics = "test")
        public void recevieMessage(ConsumerRecord<String, String> record) {
            logger.info("offset={}, key={}, value={}", record.offset(), record.key(), record.value());
        }
    }

    2.5、流处理

    package com.abc.demo.kafka;
    
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafkaStreams;
    import org.springframework.kafka.support.serializer.JsonSerde;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @Configuration(proxyBeanMethods = false)
    @EnableKafkaStreams
    public class StreamConfig {
        @Bean
        public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
            KStream<String, String> stream = streamsBuilder.stream("stream-in");
            //从 stream-in 队列中读取数据,处理后发送给 stream-out 队列
            //发送数据 key,value 分别使用的序列化类为Serdes.String(),JsonSerde
            stream.map(this::uppercaseValue).to("stream-out", Produced.with(Serdes.String(), new JsonSerde<>()));
            return stream;
        }
    
        /**
         * 消息转换,新的消息:key-原来的value值,value-一个map
         */
        private KeyValue<String, Map> uppercaseValue(String key, String value) {
            Map<String, String> map = new HashMap<>();
            map.put("message", value.toUpperCase());
            map.put("timestamp", System.currentTimeMillis() + "");
            return new KeyValue(value, map);
        }
    }

    程序从 stream-in 中读取消息,对消息加工后再发送给 stream-out;打开两个终端,一个往 stream-in 发送消息,一个接受 stream-out 的消息。

    ./kafka-console-producer.sh --broker-list 10.40.100.69:9092 --topic stream-in #发送消息
    
    ./kafka-console-consumer.sh --bootstrap-server 10.40.100.69:9092 --topic stream-out --property print.key=true #接受消息

    stream-in 的输入:

    stream-out 的输出:

  • 相关阅读:
    C++标准转换运算符(2)
    C++标准转换运算符(1)
    未能加载视图状态。正在向其中加载视图状态的控件树必须与前一请求期间用于……
    我的第一篇博客
    C语言C语言程序
    C语言基本运算符
    C语言流程控制
    C语言关键字、标识(zhi)符、注释
    msado15.dll版本引发的离奇故障
    mySQL错误: The used table type doesn't support FULLTEXT indexes
  • 原文地址:https://www.cnblogs.com/wuyongyin/p/14900587.html
Copyright © 2011-2022 走看看