zoukankan      html  css  js  c++  java
  • SpringCloud使用Kafka消费者

    POM文件配置

    <!-- Maven POM for the Spring Boot Kafka demo.
         FIX: xsi:schemaLocation previously ran the namespace URI and the XSD URL
         together with no separating space, which breaks XML schema validation. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.donwait</groupId>
      <artifactId>my-kafka-demon</artifactId>
      <version>0.0.1-SNAPSHOT</version>
     
      <!-- Spring Boot project: inherit dependency management from the starter parent -->
      <parent>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-parent</artifactId>
          <version>2.0.4.RELEASE</version>
          <relativePath/> <!-- lookup parent from repository -->
      </parent>
     
      <!-- Project properties: child modules cannot reference the parent's properties -->
      <properties>
           <!-- Global version number inherited by all services -->
          <dys.global.version>1.0.0.1</dys.global.version>
          <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
          <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
          <java.version>1.8</java.version>
          <spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
          <lombok.version>1.16.20</lombok.version>
      </properties>
     
      <!-- Explicit dependencies; everything else is inherited from the parent -->
      <dependencies>
         <!-- Spring Boot testing support -->
         <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-test</artifactId>
              <scope>test</scope>
         </dependency>
         <!-- Actuator: application health monitoring -->
         <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-actuator</artifactId>
         </dependency>
         <!-- Lombok: eliminates boilerplate code -->
         <dependency>
              <groupId>org.projectlombok</groupId>
              <artifactId>lombok</artifactId>
         </dependency>
         <!-- Logback logging -->
         <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
        </dependency>
         <!-- Spring MVC -->
         <dependency>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-web</artifactId>
         </dependency>
         <!-- Kafka client (spring-kafka) -->
         <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
      </dependencies>
     
      <!-- Build plugins -->
      <build>
          <plugins>
                <!-- Spring Boot Maven plugin: builds the executable jar -->
              <plugin>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-maven-plugin</artifactId>
              </plugin>
          </plugins>
      </build>
     
    </project>
    

    注意:
    (1)引入kafka客户端

    <!-- kafka客户端 -->
         <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
      <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    

    (2)添加spring-boot-maven插件

    <!-- 编译插件 -->
      <build>
          <plugins>
                <!--spring boot maven插件-->
              <plugin>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-maven-plugin</artifactId>
              </plugin>
          </plugins>
      </build>
    

    创建kafka配置

    package com.donwait.config;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;

    /**
     * Kafka producer and consumer configuration, driven by the {@code kafka.*}
     * properties in application.yml.
     *
     * <p>FIX: the consumer factory and listener-container factory were declared
     * with {@code <Integer, String>} generics while the configured key
     * deserializer is {@code StringDeserializer} (and the listener consumes
     * {@code ConsumerRecord<String, String>}); the generics are now
     * {@code <String, String>} to match the actual wire types.
     */
    @Configuration
    @EnableKafka
    public class KafkaConfig {

        /** Comma-separated broker addresses, e.g. host1:9092,host2:9092. */
        @Value("${kafka.broker-list}")
        private String brokers;

        /** Number of producer send retries after a failure. */
        @Value("${kafka.producer.retries}")
        private Integer producerRetries;

        /** Producer batch size in bytes. */
        @Value("${kafka.producer.batch-size}")
        private Integer producerBatchSize;

        /** Linger time in milliseconds before a batch is sent. */
        @Value("${kafka.producer.linger-ms}")
        private Integer producerLingerMs;

        /** Total memory (bytes) buffering records waiting to be sent. */
        @Value("${kafka.producer.buffer-memory}")
        private Integer producerBufferMemory;

        /** Fully-qualified key serializer class name. */
        @Value("${kafka.producer.key-serializer}")
        private String producerKeySerializer;

        /** Fully-qualified value serializer class name. */
        @Value("${kafka.producer.value-serializer}")
        private String producerValueSerializer;

        /** Topic to consume (not referenced here; listeners bind topics themselves). */
        @Value("${kafka.consumer.topic}")
        private String consumerTopic;

        // NOTE(review): the property key "gourp-id" is a misspelling of "group-id",
        // but it matches application.yml exactly — rename both together or not at all.
        @Value("${kafka.consumer.gourp-id}")
        private String consumerGroupId;

        /** Whether offsets are committed automatically. */
        @Value("${kafka.consumer.enable-auto-commit}")
        private boolean consumerEnableAutoCommit;

        /** Auto-commit interval in milliseconds. */
        @Value("${kafka.consumer.auto-commit-ms}")
        private String consumerAutoCommitMs;

        /** Consumer session timeout in milliseconds. */
        @Value("${kafka.consumer.session-timeout-ms}")
        private String consumerSessionTimeoutMs;

        /** Fully-qualified key deserializer class name. */
        @Value("${kafka.consumer.key-deserializer}")
        private String consumerKeyDeserializer;

        /** Fully-qualified value deserializer class name. */
        @Value("${kafka.consumer.value-deserializer}")
        private String consumerValueDeserializer;

        /**
         * Container factory used by {@code @KafkaListener} methods.
         *
         * @return listener container factory backed by {@link #consumerFactory()}
         */
        @Bean
        ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }

        /**
         * Consumer factory built from {@link #consumerConfigs()}.
         *
         * @return consumer factory producing String-keyed, String-valued consumers
         */
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        /**
         * Consumer configuration map assembled from injected properties.
         *
         * @return Kafka consumer configuration properties
         */
        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerEnableAutoCommit);
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, consumerAutoCommitMs);
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumerSessionTimeoutMs);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, consumerKeyDeserializer);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, consumerValueDeserializer);
            return props;
        }

        /**
         * Producer factory built from {@link #producerConfigs()}.
         *
         * @return producer factory producing String-keyed, String-valued producers
         */
        @Bean
        public ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }

        /**
         * Producer configuration map assembled from injected properties.
         *
         * @return Kafka producer configuration properties
         */
        @Bean
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
            props.put(ProducerConfig.RETRIES_CONFIG, producerRetries);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, producerBatchSize);
            props.put(ProducerConfig.LINGER_MS_CONFIG, producerLingerMs);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferMemory);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, producerKeySerializer);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, producerValueSerializer);
            return props;
        }

        /**
         * Template for sending messages. (Removed a leftover debug println.)
         *
         * @return Kafka template bound to {@link #producerFactory()}
         */
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<String, String>(producerFactory());
        }
    }
    

    系统配置信息

    系统的application.yml配置内容如下:

    # application.yml for the Spring Boot Kafka demo.
    # NOTE(review): the kafka.* property names below must match the @Value("${...}")
    # keys in KafkaConfig exactly — including the misspelled "gourp-id" — so any
    # rename must be made on both sides together.
    #server settings
    server:
      port: 7002
    spring:
      application:
        name: kafka
    #logging levels
    logging:
      level:
        org.springframework.cloud.gateway: TRACE
        org.springframework.http.server.reactive: DEBUG
        org.springframework.web.reactive: DEBUG
        reactor.ipc.netty: DEBUG
    #Spring Boot Actuator: monitoring endpoints
    endpoints:
      shutdown:
        enabled: true
        path: /shutdown
        sensitive: true
    management:
      security:
        enabled: false
    kafka:
       broker-list: 192.168.12.150:9092,192.168.12.151:9092,192.168.12.152:9092
       producer:
          #number of retries after a failed send (Kafka default is 0)
          retries: 1
          #batch size in bytes; NOTE(review): 0 disables batching (Kafka default is 16384) — confirm this is intended
          batch-size: 0
          #linger time in milliseconds before a batch is sent
          linger-ms: 1
          #total bytes of memory buffering records waiting to be sent to the broker
          buffer-memory: 33554432
          #key serializer class (implements the Serializer interface)
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          #value serializer class (implements the Serializer interface)
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
       consumer:
          #topic to consume
          topic: test-topic
          #consumer group id (key intentionally left as "gourp-id" to match KafkaConfig)
          gourp-id: test-group
          #whether offsets are committed automatically
          enable-auto-commit: true
          #auto-commit interval in milliseconds
          auto-commit-ms: 1000
          #consumer session timeout in milliseconds
          session-timeout-ms: 10000
          #key deserializer class (implements the Deserializer interface)
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          #value deserializer class (implements the Deserializer interface)
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    

    启动入口

    如果不需要测试生产者入口为:

    package com.donwait;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.kafka.annotation.KafkaListener;
    import lombok.extern.slf4j.Slf4j;
    /**
     * Consumer-only entry point (no producer test).
     *
     * <p>FIX: the original called {@code SpringApplication.run(...).close()},
     * which shuts the context down immediately after startup, so the
     * {@code @KafkaListener} never got a chance to consume. The context is now
     * left open so the listener keeps running.
     *
     * @author Administrator
     */
    @SpringBootApplication
    @Slf4j
    public class KafkaApp {
         /**
          * Kafka consumer callback: logs each record's key and value.
          * @param cr the consumed record
          * @throws Exception propagated to the listener container's error handler
          */
         @KafkaListener(topics = "test-topic")
        public void listen(ConsumerRecord<String, String> cr) throws Exception {
            log.info("我是消费者:{}:{}", cr.key(), cr.value());
        }
         
         public static void main(String[] args) {
            // Keep the context open: closing it here would stop the listener.
            SpringApplication.run(KafkaApp.class, args);
        }
    }
    

    如果需要测试生产者,则实现命令中发送数据即可:

    package com.donwait;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    import lombok.extern.slf4j.Slf4j;
    /**
     * Producer/consumer smoke test: sends three records from the command line
     * and waits for the listener to receive them before the context closes.
     *
     * <p>FIXES: (1) the CommandLineRunner import was missing the space after
     * {@code import} and did not compile; (2) {@code latch.countDown()} was
     * commented out, so {@code latch.await} always blocked for the full
     * 60 seconds — it is now counted down on each received record.
     *
     * @author Administrator
     */
    @SpringBootApplication
    @Slf4j
    public class KafkaApp implements CommandLineRunner {
         // Template used to publish test records.
         @Autowired
        private KafkaTemplate<String, String> template;
         // Latch released once all 3 test messages have been consumed.
         private final CountDownLatch latch = new CountDownLatch(3);

         @Override
         public void run(String... args) throws Exception {
              System.out.println("发送信息...");
             this.template.send("test-topic", "foo1");
             this.template.send("test-topic", "foo2");
             this.template.send("test-topic", "foo3");
            
             // Wait up to 60 seconds for the listener to receive all 3 records.
             latch.await(60, TimeUnit.SECONDS);
             log.info("接收完成");
         }
         
         /**
          * Kafka consumer callback: logs the record and releases the latch.
          * @param cr the consumed record
          * @throws Exception propagated to the listener container's error handler
          */
         @KafkaListener(topics = "test-topic")
        public void listen(ConsumerRecord<String, String> cr) throws Exception {
            log.info("我是消费者:{}:{}", cr.key(), cr.value());
            latch.countDown();
        }
         
         public static void main(String[] args) {
            // close() is intentional here: this variant is a one-shot smoke test.
            SpringApplication.run(KafkaApp.class, args).close();
        }
    }
    
    工作流程:定位问题原因 → 根据原因思考问题解决方案 → 实践验证方案有效性 → 提交验证结果
  • 相关阅读:
    Leetcode 15 3Sum
    Leetcode 383 Ransom Note
    用i个点组成高度为不超过j的二叉树的数量。
    配对问题 小于10 1.3.5
    字符矩阵的旋转 镜面对称 1.2.2
    字符串统计 连续的某个字符的数量 1.1.4
    USACO twofive 没理解
    1002 All Roads Lead to Rome
    USACO 5.5.1 求矩形并的周长
    USACO 5.5.2 字符串的最小表示法
  • 原文地址:https://www.cnblogs.com/jimoliunian/p/14095872.html
Copyright © 2011-2022 走看看