zoukankan      html  css  js  c++  java
  • kafka可插拔增强如何实现?

    image.png

    导弹拦截,精准防御。

    背景

    拦截器:在不修改应用程序业务逻辑的情况下,一组基于事件的可插拔的逻辑处理链;
    类比springMVC的拦截器:

    image.png

    file

    这些都是通过配置拦截器,插入到应用程序中,实现可插拔的修改业务逻辑;

    kafka在0.10.0.0版本中开始引入拦截器。分为生产者拦截器和消费者拦截器,类似责任链的方式编排多个拦截器为一个大拦截器。

    配置方法:配置参数

    
    Properties props = new Properties();
    List<String> interceptors = new ArrayList<>();
    interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器1
    interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器2
    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
    ……
    

    file

    注意: 配置拦截器需要制定拦截器的全限定名,并且保证生产者或者消费者客户端能够正确加载到配置的拦截器;

    file

    通过拦截器实现,强制让所有的生产者,消费者配置该拦截器,实现消息审计的功能; |

    生产者拦截器

    拦截器需要实现org.apache.kafka.clients.producer.ProducerInterceptor

    file

    消费者拦截器

    org.apache.kafka.clients.consumer.ConsumerInterceptor

    file

    实操

    实现端到端的性能监控:

    处理过程:

    file

    生产者代码:

    public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {
    
    
        private Jedis jedis; // 省略Jedis初始化
    
    
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            jedis.incr("totalSentMessage");
            return record;
        }
    
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        }
    
    
        @Override
        public void close() {
        }
    
    
        @Override
        public void configure(Map<java.lang.String, ?> configs) {
        }
    

    消费者代码:

    public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {
    
    
        private Jedis jedis; //省略Jedis初始化
    
    
        @Override
        public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
            long lantency = 0L;
            for (ConsumerRecord<String, String> record : records) {
                lantency += (System.currentTimeMillis() - record.timestamp());
            }
            jedis.incrBy("totalLatency", lantency);
            long totalLatency = Long.parseLong(jedis.get("totalLatency"));
            long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
            jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
            return records;
        }
    
    
        @Override
        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        }
    
    
        @Override
        public void close() {
        }
    
    
        @Override
        public void configure(Map<String, ?> configs) 
    

    配置到拦截器到对应的生产者和消费者对象,即简单的实现了平均消息延时的端到端性能统计。

    小结

    类比AOP是Spring提供的核心功能,即面向切面编程,可以把跟业务逻辑无关的安全,审计,性能相关功能放到切面增强中实现。
    对Kafka进行一些可插拔的功能增强可以通过拦截器实现。

    本篇介绍了kafka的拦截器的使用方法,以及通过实例展示了具体的用法,希望对团队使用的kafka做一些增强功能的时候可以利用这个点去扩展。

    image.png

    原创不易,关注诚可贵,转发价更高!转载请注明出处,让我们互通有无,共同进步,欢迎沟通交流。
    我会持续分享Java软件编程知识和程序员发展职业之路,欢迎关注,我整理了这些年编程学习的各种资源,关注公众号‘李福春持续输出’,发送'学习资料'分享给你!

  • 相关阅读:
    HttpClient使用
    十九、springboot使用@ControllerAdvice(二)之深入理解
    如何同步删除svn管理的package包目录
    在使用FastJson开发遇到的的坑
    解决tomcat端口被占用:Port 8005 required by Tomcat v7.0 Server at localhost is already in use
    SpringBoot使用Mybatis-Generator
    SpringCloud Gateway入门
    使用Nginx部署静态网站
    SpringBoot使用Jsp
    SpringBoot应用War包形式部署到外部Tomcat
  • 原文地址:https://www.cnblogs.com/snidget/p/12836056.html
Copyright © 2011-2022 走看看