zoukankan      html  css  js  c++  java
  • SpringCloud使用Kafka消费者

    POM文件配置

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.donwait</groupId>
      <artifactId>my-kafka-demon</artifactId>
      <version>0.0.1-SNAPSHOT</version>
     
      <!-- spring boot项目 -->
      <parent>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-parent</artifactId>
          <version>2.0.4.RELEASE</version>
          <relativePath/> <!-- lookup parent from repository -->
      </parent>
     
      <!-- 项目属性:子模块不能引用父项目的properties变量 -->
      <properties>
           <!-- 系统全局版本号信息: 所有服务会继承 -->
          <dys.global.version>1.0.0.1</dys.global.version>
          <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
          <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
          <java.version>1.8</java.version>
          <spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
          <!-- <spring-cloud.version>Finchley.BUILD-SNAPSHOT</spring-cloud.version>-->
          <lombok.version>1.16.20</lombok.version>
      </properties>
     
      <!-- 项目依赖:特殊强制依赖,其他继承父亲 -->
      <dependencies>
         <!--spring boot测试-->
         <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-test</artifactId>
              <scope>test</scope>
         </dependency>
         <!-- 监控系统健康情况的工具 -->
         <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-actuator</artifactId>
         </dependency>
         <!--Lombok:消除模板代码-->
         <dependency>
              <groupId>org.projectlombok</groupId>
              <artifactId>lombok</artifactId>
         </dependency>
         <!-- logback日志包 -->
         <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
        </dependency>
         <!-- SpringMVC -->
         <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-web</artifactId>
         </dependency>
         <!-- kafka客户端 -->
         <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
      </dependencies>
     
      <!-- 编译插件 -->
      <build>
          <plugins>
                <!--spring boot maven插件-->
              <plugin>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-maven-plugin</artifactId>
              </plugin>
          </plugins>
      </build>
     
    </project>
    

    注意:
    (1)引入kafka客户端

    <!-- kafka客户端 -->
         <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
      <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    

    2)添加springboot-maven插件

    <!-- 编译插件 -->
      <build>
          <plugins>
                <!--spring boot maven插件-->
              <plugin>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-maven-plugin</artifactId>
              </plugin>
          </plugins>
      </build>
    

    创建kafka配置

    package com.donwait.config;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;
    @Configuration
    @EnableKafka
    public class KafkaConfig {
         @Value("${kafka.broker-list}")
        private String brokers;
         @Value("${kafka.producer.retries}")
        private Integer producer_retries;
         @Value("${kafka.producer.batch-size}")
        private Integer producer_batch_size;
         @Value("${kafka.producer.linger-ms}")
        private Integer producer_linger_ms;
         @Value("${kafka.producer.buffer-memory}")
        private Integer producer_buffer_memory;
         @Value("${kafka.producer.key-serializer}")
        private String producer_key_serializer;
         @Value("${kafka.producer.value-serializer}")
        private String producer_value_serializer;
         
         @Value("${kafka.consumer.topic}")
        private String consumer_topic;
         @Value("${kafka.consumer.gourp-id}")
        private String consumer_gourp_id;
         @Value("${kafka.consumer.enable-auto-commit}")
        private boolean consumer_enable_auto_commit;
         @Value("${kafka.consumer.auto-commit-ms}")
        private String consumer_auto_commit_ms;
         @Value("${kafka.consumer.session-timeout-ms}")
        private String consumer_session_timeout_ms;
         @Value("${kafka.consumer.key-deserializer}")
        private String consumer_key_deserializer;
         @Value("${kafka.consumer.value-deserializer}")
        private String consumer_value_deserializer;
         
         @Bean
        ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
         
         @Bean
        public ConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }
         
         /**
          * 消费者参数配置
          * @return
          */
         @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, consumer_gourp_id);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumer_enable_auto_commit);
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, consumer_auto_commit_ms);
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumer_session_timeout_ms);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, consumer_key_deserializer);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, consumer_value_deserializer);
            return props;
        }
         
         @Bean
        public ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
         
         /**
          * 生产者参数配置
          * @return
          */
         @Bean
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
            props.put(ProducerConfig.RETRIES_CONFIG, producer_retries);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, producer_batch_size);
            props.put(ProducerConfig.LINGER_MS_CONFIG, producer_linger_ms);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producer_buffer_memory);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, producer_key_serializer);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, producer_value_serializer);
            return props;
        }
         
         @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            System.out.println("init");
            return new KafkaTemplate<String, String>(producerFactory());
        }
    }
    

    系统配置信息

    系统的application.yml配置内容如下:

    #服务配置
    server:
      port: 7002
    spring:
      application:
        name: kafka
    #日志信息配置
    logging:
      level:
        org.springframework.cloud.gateway: TRACE
        org.springframework.http.server.reactive: DEBUG
        org.springframework.web.reactive: DEBUG
        reactor.ipc.netty: DEBUG
    #Spring Boot Actuator:监控系统配置
    endpoints:
      shutdown:
        enabled: true
        path: /shutdown
        sensitive: true
    management:
      security:
        enabled: false
    kafka:
       broker-list: 192.168.12.150:9092,192.168.12.151:9092,192.168.12.152:9092
       producer:
          #发送失败后的重试次数,默认0
          retries: 1
          #以字节为单位控制默认的批量大小
          batch-size: 0
          #延迟时间
          linger-ms: 1
          #缓冲等待发送到服务器的记录的总内存字节数
          buffer-memory: 33554432
          #实现Serializer接口的序列化类键
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          #实现Serializer接口的序列化类值
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
       consumer:
          #消费的主题
          topic: test-topic
          #消费者组id
          gourp-id: test-group
          #是否自动提交偏移量
          enable-auto-commit: true
          #提交偏移量的间隔-毫秒
          auto-commit-ms: 1000
          #客户端消费的会话超时时间-毫秒
          session-timeout-ms: 10000
          #实现DeSerializer接口的反序列化类键
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          #实现DeSerializer接口的反序列化类值
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    

    启动入口

    如果不需要测试生产者入口为:

    package com.donwait;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.kafka.annotation.KafkaListener;
    import lombok.extern.slf4j.Slf4j;
    /**
     * 实现命令行接口,从命令行读取参数发送
     * @author Administrator
     *
     */
    @SpringBootApplication
    @Slf4j
    public class KafkaApp {
         /**
          * kafka消费
          * @param cr
          * @throws Exception
          */
         @KafkaListener(topics = "test-topic")
        public void listen(ConsumerRecord<String, String> cr) throws Exception {
            log.info("我是消费者:{}:{}", cr.key(), cr.value());
            //latch.countDown();
        }
         
         public static void main(String[] args) {
            SpringApplication.run(KafkaApp.class, args).close();
        }
    }
    

    如果需要测试生产者,则实现命令中发送数据即可:

    package com.donwait;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    importorg.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    import lombok.extern.slf4j.Slf4j;
    /**
     * 实现命令行接口,从命令行读取参数发送
     * @author Administrator
     *
     */
    @SpringBootApplication
    @Slf4j
    public class KafkaApp implements CommandLineRunner {
         // kafka模板
         @Autowired
        private KafkaTemplate<String, String> template;
         // 计数器-等待3个消息接收完成
         private final CountDownLatch latch = new CountDownLatch(3);
         @Override
         public void run(String... args) throws Exception {
              System.out.println("发送信息...");
            this.template.send("test-topic", "foo1");
            this.template.send("test-topic", "foo2");
            this.template.send("test-topic", "foo3");
           
            // 等待60秒接收完成退出
            latch.await(60, TimeUnit.SECONDS);
            log.info("接收完成");
         }
         
         /**
          * kafka消费
          * @param cr
          * @throws Exception
          */
         @KafkaListener(topics = "test-topic")
        public void listen(ConsumerRecord<String, String> cr) throws Exception {
            log.info("我是消费者:{}:{}", cr.key(), cr.value());
            //latch.countDown();
        }
         
         public static void main(String[] args) {
            SpringApplication.run(KafkaApp.class, args).close();
        }
    }
    
    定位问题原因* 根据原因思考问题解决方案* 实践验证方案有效性* 提交验证结果
  • 相关阅读:
    golang 垃圾回收 gc
    LINUX下目标文件的BSS段、数据段、代码段
    使用Golang利用ectd实现一个分布式锁
    KNN算法介绍
    机器学习
    golang map to struct
    NoSQL数据库-MongoDB和Redis
    Go语言中的单引号、双引号、反引号
    广告制胜无它,顺应人性尔——leo鉴书63
    从周迅发布恋情 看百度百科的社会价值
  • 原文地址:https://www.cnblogs.com/jimoliunian/p/14095872.html
Copyright © 2011-2022 走看看