zoukankan      html  css  js  c++  java
  • spring kafka之如何批量给topic加前缀

    前言

    最近业务开发部门给我们部门提了一个需求,因为他们开发环境和测试环境共用一套kafka,他们希望我们部门能帮他们实现自动给kafka的topic加上环境前缀,比如开发环境,则topic为dev_topic,测试环境,则topic为test_topic,他们kafka客户端是使用spring-kafka。一开始接到这个需求的时候,我心里是拒绝的,为啥开发环境和测试环境不分别部署一套kafka,还要那么麻烦。但老大都答应接这个需求了,作为小罗罗也只能接了

    实现思路

    1、生产者端

    可以通过生产者拦截器,来给topic加前缀

    2、实现步骤

    a、编写一个生产者拦截器

    @Slf4j
    public class KafkaProducerInterceptor implements ProducerInterceptor<String, MessageDTO> {
    
    
    
        /**
         * 运行在用户主线程中,在消息被序列化之前调用
         * @param record
         * @return
         */
        @Override
        public ProducerRecord<String, MessageDTO> onSend(ProducerRecord<String, MessageDTO> record) {
            log.info("原始topic:{}",record.topic());
            return new ProducerRecord<String, MessageDTO>(TOPIC_KEY_PREFIX + record.topic(),
                    record.partition(),record.timestamp(),record.key(), record.value());
        }
    
    
    
    
        /**
         * 在消息被应答之前或者消息发送失败时调用,通常在producer回调逻辑触发之前,运行在produer的io线程中
         * @param metadata
         * @param exception
         */
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
          log.info("实际topic:{}",metadata.topic());
        }
    
    
        /**
         *  清理工作
         */
        @Override
        public void close() {
        }
    
    
        /**
         * 初始化工作
         * @param configs
         */
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    
    

    b、配置拦截器

    kafka:
        producer:
          # 生产者拦截器配置
          properties:
            interceptor.classes: com.github.lybgeek.kafka.producer.interceptor.KafkaProducerInterceptor
    

    c、测试

    image.png

    2、消费者端

    这个就稍微有点难搞了,因为业务开发部门他们是直接用@KafkaListener的注解,形如下

     @KafkaListener(id = "msgId",topics = {Constant.TOPIC})
    

    像这种也没啥好的办法,就只能通过源码了,通过源码可以发现在如下地方

    KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization
    

    会把@KafkaListener的值赋值给消费者,如果对spring有了解的朋友,可能会知道postProcessAfterInitialization是spring后置处理器的方法,主要用来bean初始化后的一些操作,既然我们知道@KafkaListener会在bean初始化后再进行赋值,那我们就可以在bean初始化前,修改掉@KafkaListener的值。具体实现如下

    @Component
    public class KafkaListenerFactoryBeanPostProcesser implements BeanFactoryPostProcessor {
    
        @SneakyThrows
        @Override
        public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
    
            List<String> packageNames = AutoConfigurationPackages.get(beanFactory);
    
            for (String packageName : packageNames) {
                Reflections reflections = new Reflections(new ConfigurationBuilder()
                        .forPackages(packageName) // 指定路径URL
                        .addScanners(new SubTypesScanner()) // 添加子类扫描工具
                        .addScanners(new FieldAnnotationsScanner()) // 添加 属性注解扫描工具
                        .addScanners(new MethodAnnotationsScanner() ) // 添加 方法注解扫描工具
                        .addScanners(new MethodParameterScanner() ) // 添加方法参数扫描工具
                );
    
                Set<Method> methodSet = reflections.getMethodsAnnotatedWith(KafkaListener.class);
                if(!CollectionUtils.isEmpty(methodSet)){
                    for (Method method : methodSet) {
                        KafkaListener kafkaListener = method.getAnnotation(KafkaListener.class);
                        changeTopics(kafkaListener);
                    }
                }
            }
    
        }
    
    
        private void changeTopics(KafkaListener kafkaListener) throws Exception{
            InvocationHandler invocationHandler = Proxy.getInvocationHandler(kafkaListener);
            Field memberValuesField = invocationHandler.getClass().getDeclaredField("memberValues");
            memberValuesField.setAccessible(true);
            Map<String,Object> memberValues = (Map<String,Object>)memberValuesField.get(invocationHandler);
            String[] topics = (String[])memberValues.get("topics");
            System.out.println("修改前topics:" + Lists.newArrayList(topics));
            for (int i = 0; i < topics.length; i++) {
                topics[i] = Constant.TOPIC_KEY_PREFIX + topics[i];
            }
            memberValues.put("topics", topics);
            System.out.println("修改后topics:" + Lists.newArrayList(kafkaListener.topics()));
    
        }
    }
    

    测试

    image.png
    image.png

    总结

    虽然实现了动态修改topic,但我还是觉得topic不要随便改变,有条件的话,kafka还是得基于物理环境隔离,其次真的客观条件不允许,要动态变更topic,则需做好topic动态变更宣导以及相关wiki的编写,不然很容易掉坑

    demo链接

    https://github.com/lyb-geek/springboot-learning/tree/master/springboot-mq-idempotent-consume

  • 相关阅读:
    手把手教你利用create-nuxt-app脚手架创建NuxtJS应用
    初识NuxtJS
    webpack打包Vue应用程序流程
    用选择器代替表格列的筛选功能
    Element-UI
    Spectral Bounds for Sparse PCA: Exact and Greedy Algorithms[贪婪算法选特征]
    Sparse Principal Component Analysis via Rotation and Truncation
    Generalized Power Method for Sparse Principal Component Analysis
    Sparse Principal Component Analysis via Regularized Low Rank Matrix Approximation(Adjusted Variance)
    Truncated Power Method for Sparse Eigenvalue Problems
  • 原文地址:https://www.cnblogs.com/linyb-geek/p/14088291.html
Copyright © 2011-2022 走看看