zoukankan      html  css  js  c++  java
  • spring kafka之如何批量给topic加前缀

    前言

    最近业务开发部门给我们部门提了一个需求,因为他们开发环境和测试环境共用一套kafka,他们希望我们部门能帮他们实现自动给kafka的topic加上环境前缀,比如开发环境,则topic为dev_topic,测试环境,则topic为test_topic,他们kafka客户端是使用spring-kafka。一开始接到这个需求的时候,我心里是拒绝的,为啥开发环境和测试环境不分别部署一套kafka,还要那么麻烦。但老大都答应接这个需求了,作为小罗罗也只能接了

    实现思路

    1、生产者端

    可以通过生产者拦截器,来给topic加前缀

    2、实现步骤

    a、编写一个生产者拦截器

    @Slf4j
    public class KafkaProducerInterceptor implements ProducerInterceptor<String, MessageDTO> {
    
    
    
        /**
         * 运行在用户主线程中,在消息被序列化之前调用
         * @param record
         * @return
         */
        @Override
        public ProducerRecord<String, MessageDTO> onSend(ProducerRecord<String, MessageDTO> record) {
            log.info("原始topic:{}",record.topic());
            return new ProducerRecord<String, MessageDTO>(TOPIC_KEY_PREFIX + record.topic(),
                    record.partition(),record.timestamp(),record.key(), record.value());
        }
    
    
    
    
        /**
         * 在消息被应答之前或者消息发送失败时调用,通常在producer回调逻辑触发之前,运行在produer的io线程中
         * @param metadata
         * @param exception
         */
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
          log.info("实际topic:{}",metadata.topic());
        }
    
    
        /**
         *  清理工作
         */
        @Override
        public void close() {
        }
    
    
        /**
         * 初始化工作
         * @param configs
         */
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    
    

    b、配置拦截器

    kafka:
        producer:
          # 生产者拦截器配置
          properties:
            interceptor.classes: com.github.lybgeek.kafka.producer.interceptor.KafkaProducerInterceptor
    

    c、测试

    image.png

    2、消费者端

    这个就稍微有点难搞了,因为业务开发部门他们是直接用@KafkaListener的注解,形如下

     @KafkaListener(id = "msgId",topics = {Constant.TOPIC})
    

    像这种也没啥好的办法,就只能通过源码了,通过源码可以发现在如下地方

    KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization
    

    会把@KafkaListener的值赋值给消费者,如果对spring有了解的朋友,可能会知道postProcessAfterInitialization是spring后置处理器的方法,主要用来bean初始化后的一些操作,既然我们知道@KafkaListener会在bean初始化后再进行赋值,那我们就可以在bean初始化前,修改掉@KafkaListener的值。具体实现如下

    @Component
    public class KafkaListenerFactoryBeanPostProcesser implements BeanFactoryPostProcessor {
    
        @SneakyThrows
        @Override
        public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
    
            List<String> packageNames = AutoConfigurationPackages.get(beanFactory);
    
            for (String packageName : packageNames) {
                Reflections reflections = new Reflections(new ConfigurationBuilder()
                        .forPackages(packageName) // 指定路径URL
                        .addScanners(new SubTypesScanner()) // 添加子类扫描工具
                        .addScanners(new FieldAnnotationsScanner()) // 添加 属性注解扫描工具
                        .addScanners(new MethodAnnotationsScanner() ) // 添加 方法注解扫描工具
                        .addScanners(new MethodParameterScanner() ) // 添加方法参数扫描工具
                );
    
                Set<Method> methodSet = reflections.getMethodsAnnotatedWith(KafkaListener.class);
                if(!CollectionUtils.isEmpty(methodSet)){
                    for (Method method : methodSet) {
                        KafkaListener kafkaListener = method.getAnnotation(KafkaListener.class);
                        changeTopics(kafkaListener);
                    }
                }
            }
    
        }
    
    
        private void changeTopics(KafkaListener kafkaListener) throws Exception{
            InvocationHandler invocationHandler = Proxy.getInvocationHandler(kafkaListener);
            Field memberValuesField = invocationHandler.getClass().getDeclaredField("memberValues");
            memberValuesField.setAccessible(true);
            Map<String,Object> memberValues = (Map<String,Object>)memberValuesField.get(invocationHandler);
            String[] topics = (String[])memberValues.get("topics");
            System.out.println("修改前topics:" + Lists.newArrayList(topics));
            for (int i = 0; i < topics.length; i++) {
                topics[i] = Constant.TOPIC_KEY_PREFIX + topics[i];
            }
            memberValues.put("topics", topics);
            System.out.println("修改后topics:" + Lists.newArrayList(kafkaListener.topics()));
    
        }
    }
    

    测试

    image.png
    image.png

    总结

    虽然实现了动态修改topic,但我还是觉得topic不要随便改变,有条件的话,kafka还是得基于物理环境隔离,其次真的客观条件不允许,要动态变更topic,则需做好topic动态变更宣导以及相关wiki的编写,不然很容易掉坑

    demo链接

    https://github.com/lyb-geek/springboot-learning/tree/master/springboot-mq-idempotent-consume

  • 相关阅读:
    DCR挖矿成本¥355.92,市价¥346.24——五大币种挖矿成本分析 2018-08-7
    智者见智——区块链3.0与未来
    梭哈10万入场,拿住不放,3年后收益过十亿,币圈传奇—大空翼
    从路人甲到叱咤币圈的神话:“打死也不卖币”的宝二爷
    小白眼里的区块链和币圈 —— 持币待涨的故事
    区块链如何赋能网络互助行业?来自众托帮+区块链的应用、车车助+纷享车链AutoChain应用
    中国青年网记者-阿里巴巴王坚:互联网已成为世界发展的基础设施
    读者咩叭:畅谈经济未来
    中青网财经:请一位心理学博士当CTO 只有马云敢这么做
    云科技时代:阿里云创造者写了《在线》,这是一本怎样的书?
  • 原文地址:https://www.cnblogs.com/linyb-geek/p/14088291.html
Copyright © 2011-2022 走看看