zoukankan      html  css  js  c++  java
  • Kafka 生产者 拦截器

    自定义生产者拦截器,完成以下两个功能:

    1. 在推送消息到Broker之前,在消息头部添加时间戳
    2. 在推送回调时,统计发送消息失败成功的条数

    eg: 定义时间戳拦截器

    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    /**
     *
     *
     * 可能运行在多个线程中,用户自行保证线程安全
     *
     *
     */
    public class TimeStampPrependerInterceptor implements ProducerInterceptor<String,String> {
    
    
        //运行在用户主线程中,在消息被序列化之前调用,但最好不要改变消息所属的分区和Topic
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            return new ProducerRecord<String, String>(record.topic(),
                    record.partition(),record.timestamp(),record.key(),
                    System.currentTimeMillis() + "," + record.value().toString());
        }
    
    
        //在消息被应答之前或者消息发送失败时调用,通常在producer回调逻辑触发之前,运行在produer的io线程中,不要加入很重的逻辑
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    
        }
        //清理工作
        public void close() {
        }
        //初始化工作
        public void configure(Map<String, ?> configs) {
    
        }
    }

    自定义统计消息数的拦截器:

    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    public class CounterInterceptor implements ProducerInterceptor<String,String> {
    
        private int errorCounter = 0;
        private int successCounter = 0;
    
    
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            return record;
        }
    
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            if(exception == null){
                successCounter ++;
            }else {
                errorCounter++;
            }
        }
    
        public void close() {
            System.out.println("successful sent :"  + successCounter);
            System.out.println("failed sent:" + errorCounter);
        }
    
        public void configure(Map<String, ?> configs) {
    
        }
    }

    主类添加拦截器配置:

    List<String> interceptors = new ArrayList<String>();
     interceptors.add("cn.org.fubin.producer.TimeStampPrependerInterceptor");
     interceptors.add("cn.org.fubin.producer.CounterInterceptor");
     properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);
    
     String topic = "test-topic";
     Producer<String,String> producer = new KafkaProducer<String, String>(properties);
     for (int i = 0; i < 10; i++) {
         ProducerRecord<String,String> record = new ProducerRecord<String, String>(topic,"message"+i);
         producer.send(record).get();
     }

    运行推送消息程序之前先打开消费者,查看拦截器是否生效:

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic test-topic

    消息如下:

    1558785539173,message0
    1558785539476,message1
    1558785539490,message2
    1558785539504,message3
    1558785539521,message4
    1558785539535,message5
    1558785539547,message6
    1558785539562,message7
    1558785539574,message8
    1558785539587,message9
  • 相关阅读:
    SEO
    Hack写法
    文学漫步
    [BZOJ4565] [Haoi2016] 字符合并
    [bzoj 3123][Sdoi2013]森林
    [UVA 12633] Super Rooks on Chessboard FFT+计数
    [HDU4609] 3-idiots FFT+计数
    [bzoj4554] [Tjoi2016&Heoi2016]游戏
    [bzoj4556] [Tjoi2016&Heoi2016]字符串
    [bzoj4552][Tjoi2016&Heoi2016]排序
  • 原文地址:https://www.cnblogs.com/fubinhnust/p/11967874.html
Copyright © 2011-2022 走看看