zoukankan      html  css  js  c++  java
  • Kafka 消费时机问题

    kafka的消费时机

    问题

    kafka启动后开始消费的话, 如果此时消费流程中有其他依赖没有启动完成的话,比如 Redis , Mysql ,RPC等就会有空指针或其他问题。这时候就要延后kafka的消费时机。

    解决

    kafka启动时,不在启动时开启消费线程。

    public class KafkaConsumer {
    
    private KafkaConsumer<String, byte[]> consumer;
    private ThreadPoolExecutor threadPoolExecutor;
    private ThreadPoolExecutor threadConsumerExecutor;
    
    
    public void init(String kafkaTopic, String groupIdConfig) {
        //init pool
        //init consumer
        。。。。。
    }
    
    //开始消费单独拿出方法
    public void startConsumer(final Handler handler) {
        threadConsumerExecutor.submit(new Runnable() {
            @Override
            public void run() {
                while(true){
                    ConsumerRecords<String, byte[]> data = consumer.poll(200);
                    .....
                    handler.onMessage(data)
                    sleep(100)
                }
                
            }
        });
    }
    

    然后在SpringBoot启动后,再启用kafka的消费线程

    @Component
    public class ApplicationRunnerImpl implements ApplicationRunner {
        @Override
        public void run(ApplicationArguments args) throws Exception {
            //校验Redis
            //校验Mysql
            //校验RPC
            kafkaConsumer.start()
        }
    }
    

    此时启用kafkaConsumer,就不会有问题了。

    升级版

    可以设置标志位,来灵活做到启/停/暂停/恢复

    public class KafkaConsumer {
        //标识位,是否开始消费线程
    private AtomicBoolean polling = new AtomicBoolean(true);
    private KafkaConsumer<String, byte[]> consumer;
    private ThreadPoolExecutor threadPoolExecutor;
    private ThreadPoolExecutor threadConsumerExecutor;
    private Handler handler;
    
    
    public void init(String kafkaTopic, String groupIdConfig) {
        //init pool
        //init consumer
        。。。。。
        startConsumer()
    }
    
    //开始消费单独拿出方法
    public void startConsumer() {
        threadConsumerExecutor.submit(new Runnable() {
            @Override
            public void run() {
                while(this.polling.get()){
                    ConsumerRecords<String, byte[]> data = consumer.poll(200);
                    .....
                    handler.onMessage(data)
                    sleep(100)
                }
            }
        });
    }
    //暂停消费线程
    public void pause(){
        this.polling.set(false)
    }
    
    //恢复消费线程
    public void resume(){
        this.polling.set(true)
    }
    
    //关闭消费
    public void shutDown(){
        pause();
        unsubscribe();
        threadConsumerExecutor.shutdown();
        ...
    
    }
    
    

    代码做了些简略处理。明白思路就好做,然后就可以针对业务场景细化一下了。

  • 相关阅读:
    wait
    iOS UITableviewCell优化
    iOS本地版本和服务器对比
    iOS 二维码生成 改变颜色 添加中心图
    iOS坑点解析
    iOS View快照,View截屏
    双缓冲读感感悟
    查找附近点--Geohash方案讨论
    各种报告word模板
    跳转到设置里面各个页面iOS8
  • 原文地址:https://www.cnblogs.com/ElEGenT/p/12516578.html
Copyright © 2011-2022 走看看