zoukankan      html  css  js  c++  java
  • Kafka 消费时机问题

    kafka的消费时机

    问题

    kafka启动后开始消费的话, 如果此时消费流程中有其他依赖没有启动完成的话,比如 Redis , Mysql ,RPC等就会有空指针或其他问题。这时候就要延后kafka的消费时机。

    解决

    kafka启动时,不在启动时开启消费线程。

    public class KafkaConsumer {
    
    private KafkaConsumer<String, byte[]> consumer;
    private ThreadPoolExecutor threadPoolExecutor;
    private ThreadPoolExecutor threadConsumerExecutor;
    
    
    public void init(String kafkaTopic, String groupIdConfig) {
        //init pool
        //init consumer
        。。。。。
    }
    
    //开始消费单独拿出方法
    public void startConsumer(final Handler handler) {
        threadConsumerExecutor.submit(new Runnable() {
            @Override
            public void run() {
                while(true){
                    ConsumerRecords<String, byte[]> data = consumer.poll(200);
                    .....
                    handler.onMessage(data)
                    sleep(100)
                }
                
            }
        });
    }
    

    然后在SpringBoot启动后,再启用kafka的消费线程

    @Component
    public class ApplicationRunnerImpl implements ApplicationRunner {
        @Override
        public void run(ApplicationArguments args) throws Exception {
            //校验Redis
            //校验Mysql
            //校验RPC
            kafkaConsumer.start()
        }
    }
    

    此时启用kafkaConsumer,就不会有问题了。

    升级版

    可以设置标志位,来灵活做到启/停/暂停/恢复

    public class KafkaConsumer {
        //标识位,是否开始消费线程
    private AtomicBoolean polling = new AtomicBoolean(true);
    private KafkaConsumer<String, byte[]> consumer;
    private ThreadPoolExecutor threadPoolExecutor;
    private ThreadPoolExecutor threadConsumerExecutor;
    private Handler handler;
    
    
    public void init(String kafkaTopic, String groupIdConfig) {
        //init pool
        //init consumer
        。。。。。
        startConsumer()
    }
    
    //开始消费单独拿出方法
    public void startConsumer() {
        threadConsumerExecutor.submit(new Runnable() {
            @Override
            public void run() {
                while(this.polling.get()){
                    ConsumerRecords<String, byte[]> data = consumer.poll(200);
                    .....
                    handler.onMessage(data)
                    sleep(100)
                }
            }
        });
    }
    //暂停消费线程
    public void pause(){
        this.polling.set(false)
    }
    
    //恢复消费线程
    public void resume(){
        this.polling.set(true)
    }
    
    //关闭消费
    public void shutDown(){
        pause();
        unsubscribe();
        threadConsumerExecutor.shutdown();
        ...
    
    }
    
    

    代码做了些简略处理。明白思路就好做,然后就可以针对业务场景细化一下了。

  • 相关阅读:
    JS编码解码详解
    web的几种返回顶部
    图片的懒加载的两种效果
    获取两个日期差
    C#虚方法
    依赖注入(DI)和Ninject
    在应用程序级别之外使用注册为 allowDefinition='MachineToApplication' 的节是错误的
    PetShop的系统架构设计
    C#综合揭秘——细说多线程(下)
    C# Socket编程(4)初识Socket和数据流
  • 原文地址:https://www.cnblogs.com/ElEGenT/p/12516578.html
Copyright © 2011-2022 走看看