zoukankan      html  css  js  c++  java
  • Kafka之SpringBoot集成Kafka实战

      在spring应用中如果需要订阅kafka消息,通常情况下我们不会直接使用kafka-client, 而是使用更方便的一层封装spring-kafka。
      在spring-kafka在运行时会启动两类线程,一类是Consumer线程,另一类是Listener线程。前者用来直接调用kafka-client的poll()方法获取消息,后者才是调用我们代码中标有@KafkaListener注解方法的线程。如果直接使用kafka-client的话,那么正常的写法是一个while循环,在循环里面调用poll(),然后处理消息,这在kafka broker看来就是一个Consumer。如果想用多个Consumer,除了多启动几个进程以外,也可以在一个进程使用多个线程执行此while()循环。spring-kafka就是这么干的。

    1.添加依赖

    <dependencies>
        <dependency>
    	<groupId>org.springframework.kafka</groupId>
    	<artifactId>spring-kafka</artifactId>
    	<version>2.5.5.RELEASE</version>
        </dependency>
        <dependency>
        	<groupId>org.springframework.boot</groupId>
        	<artifactId>spring-boot-starter-web</artifactId>
        	<version>2.3.8.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.74</version>
        </dependency>
    </dependencies>

    2.kafka配置

      springBoot集成kafka,kafka的原生配置可以参考以下源码:

    org.apache.kafka.clients.CommonClientConfigs.class
    org.apache.kafka.clients.consumer.ConsumerConfig.class
    org.apache.kafka.clients.producer.ProducerConfig.class

       application.properties配置如下:

    #============== KAFKA START===================
    spring.kafka.listener.concurrency=5
    
    spring.kafka.producer.bootstrap.servers=192.168.15.218:9093
    spring.kafka.producer.retries= 3
    spring.kafka.producer.buffer.memory=33554432
    spring.kafka.producer.acks=0
    #自定义配置,控制生产者是否发送消息
    spring.kafka.producer.enable=false
    
    spring.kafka.consumer.bootstrap.servers=192.168.15.218:9093
    spring.kafka.consumer.group.id=kafka-group-ryj
    spring.kafka.consumer.enable.auto.commit=true
    spring.kafka.consumer.auto.offset.reset=earliest
    spring.kafka.consumer.max.poll.records=10
    #自定义配置,控制消费者是否监听
    spring.kafka.consumer.enable=true
    #============== KAFKA END======================
    
    #============== TOPIC START======================
    topic.testRecord=topic.testRecord
    #============== TOPIC END======================

    3.修改启动类,支持kafka注解

    @SpringBootApplication()
    @EnableKafka
    public class KafkaTest {
        public static void main(String[] args) {
            System.out.println("Hello World!");
            SpringApplication.run(KafkaTest.class, args);
        }
    }

    4.增加kafka配置类,生成生产者、消费者相关信息

    @Configuration
    public class KafkaConfig {
    
        @Value("${spring.kafka.producer.bootstrap.servers}")
        private String producerServer;
    
        @Value("${spring.kafka.producer.retries}")
        private Integer producerRetries;
    
        @Value("${spring.kafka.producer.buffer.memory}")
        private String producerBufferMemory;
    
        @Value("${spring.kafka.producer.acks}")
        private String producerAcks;
    
        @Value("${spring.kafka.consumer.bootstrap.servers}")
        private String consumerServer;
    
        @Value("${spring.kafka.consumer.enable.auto.commit}")
        private Boolean consumerAutoCommit;
    
        @Value("${spring.kafka.consumer.group.id}")
        private String consumerGroupId;
    
        @Value("${spring.kafka.consumer.auto.offset.reset}")
        private String consumerOffsetReset;
    
        @Value("${spring.kafka.consumer.max.poll.records}")
        private String consumerPollNum;
    
        /**
         * 生产者配置信息
         */
        @Bean
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<String, Object>();
            props.put(ProducerConfig.ACKS_CONFIG, producerAcks);// 为0时,生产者不会等待返回消息发送结果
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, producerServer);
            props.put(ProducerConfig.RETRIES_CONFIG, producerRetries);// 发送失败时,重新发送消息次数
            props.put(ProducerConfig.LINGER_MS_CONFIG, 1);// 批量发送消息的间隔时间
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferMemory);// 生产者缓存消息的内存字节数
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }
    
        /**
         * 生产者工厂
         */
        @Bean
        public ProducerFactory<String, Object> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
    
        /**
         * 生产者模板
         */
        @Bean
        public KafkaTemplate<String, Object> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    
        /**
         * 消费者配置信息
         */
        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> props = new HashMap<String, Object>();
            props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);// 消费者组ID
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerOffsetReset);// offser没有初始化或者不存在时默认的配置
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, consumerServer);
            props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, consumerPollNum);// 每次拉取记录的数量
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);// 用于检测客户端故障的超时时间
            props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);// 请求响应的超时时间
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return props;
        }
        
        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
        }
    
        @Bean
        public KafkaListenerContainerFactory<?> containerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> container = new ConcurrentKafkaListenerContainerFactory<>();
            container.setConsumerFactory(consumerFactory());
            container.setBatchListener(true);//批量拉取消息,与消费者的接收参数有关
            return container;
        }
    
        /**
         * KafkaListener 延迟启动监听工厂
         * @return
         */
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> delayContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> container = new ConcurrentKafkaListenerContainerFactory<String, String>();
            container.setConsumerFactory(new DefaultKafkaConsumerFactory<String, Object>(consumerConfigs()));
            // 禁止自动启动
            container.setAutoStartup(false);
            container.setBatchListener(true);
            return container;
        }
    }

    5.生产者消息发送测试类

    @Component
    public class KafkaProducer {
    
        private static Logger logger = LoggerFactory.getLogger(KafkaProducer.class);
    
        @Autowired
        KafkaTemplate<String, Object> kafkaTemplate;
    
        public void sendJsonMessageToKafka(String jsonMessage, String topicName) {
            ListenableFuture<SendResult<String, Object>> listenableFuture = kafkaTemplate.send(topicName, jsonMessage);
            listenableFuture
                    .addCallback(
                            o -> logger.info("send message to kafka success !!! topicName={}, partition={}, offset={},msg={}", topicName,
                                    o.getRecordMetadata().partition(), o.getRecordMetadata().offset(), o.getProducerRecord().value()),
                            throwable -> this.sendMsgFail(throwable, topicName));
        }
    
        private void sendMsgFail(Throwable throwable, String topicName) {
            logger.error("send message to kafka fail !!! topicName=" + topicName + " error " + throwable.getMessage());
        }
    }
    @Component
    @Order(100)
    public class KafkaTestRunner implements ApplicationRunner {
    
        @Value("${topic.testRecord}")
        private String topicName;
        
        @Value("${spring.kafka.producer.enable}")
        private Boolean producerEnable;
    
        @Autowired
        KafkaProducer kafkaProducer;
    
        @Override
        public void run(ApplicationArguments args) {
            if(producerEnable) {
                new Thread() {
                    @Override
                    public void run() {
                        sendMessage();
                    }
                    
                }.start();
            }
        }
    
        private void sendMessage() {
            for (Integer i = 0; i < 1000; i++) {
                kafkaProducer.sendJsonMessageToKafka(JSON.toJSONString(new Digit(i)), topicName);
            }
        }
    }
    
    @Data
    class Digit {
    
        Integer i;
    
        public Digit(Integer i) {
            super();
            this.i = i;
        }
        
    }

    6.消费者批量消费测试类

    @Component
    @SpringBootConfiguration
    public class KafkaConsumerTest {
    
        private static Logger logger = LoggerFactory.getLogger(KafkaConsumerTest.class);
        //如果想立即消费,可以更换containerFactory
        @KafkaListener(id = "delayConsumer",topics = "${topic.testRecord}", containerFactory = "delayContainerFactory", groupId = "${spring.kafka.consumer.group.id}")
        //批量时不能用Object作为参数,否则会报错
        public void delayConsumer(List<ConsumerRecord<String, String>> message) {
            try {
                message.forEach(record -> {
                    logger.info("delayConsumer consumer success.partition={}, offset={},msg={}",record.partition(),record.offset(),record.value());
                });
            } catch (Exception e) {
                logger.error("delayConsumer error.",e);
            }
        }
    }
    @Component
    @Order(10)
    public class KafkaDelayConsumerRunner implements ApplicationRunner {
    
        @Autowired
        private KafkaListenerEndpointRegistry registry;
        
        @Value("${spring.kafka.consumer.enable}")
        private Boolean consumerEnable;
    
        @Override
        public void run(ApplicationArguments args) {
            if (consumerEnable) {
                //唤醒延迟启动的kafka消费者
                registry.getListenerContainer("delayConsumer").start();
            }
        }
    }
  • 相关阅读:
    Hibernate学习之缓存机制
    Hibernate学习之hibernate状态
    Ajax学习之小结
    Hibernate学习之hibernate执行顺序
    Svn入门
    Svn服务启动的两种方式
    Eclipse安装Svn插件
    一种给力的带背景的超链接的写法
    转载:IE下div使用margin:0px auto不居中的原因
    github上的Lua in Erlang
  • 原文地址:https://www.cnblogs.com/ryjJava/p/14381467.html
Copyright © 2011-2022 走看看