  • Kafka Producer Interceptors

    Define a custom producer interceptor that does two things:

    1. Before a message is pushed to the broker, prepend a timestamp to the message value
    2. In the send callback, count how many messages were sent successfully and how many failed

    Example: the timestamp interceptor

    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    /**
     * Prepends a timestamp to each outgoing message value.
     *
     * Interceptor methods may run on multiple threads; the user is responsible
     * for making them thread-safe.
     */
    public class TimeStampPrependerInterceptor implements ProducerInterceptor<String, String> {
    
        // Runs on the user's main thread, before the message is serialized.
        // It is best not to change the topic or partition the message belongs to.
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            return new ProducerRecord<String, String>(record.topic(),
                    record.partition(), record.timestamp(), record.key(),
                    System.currentTimeMillis() + "," + record.value());
        }
    
        // Called before the message is acknowledged, or when the send fails,
        // usually just before the producer callback fires. It runs on the
        // producer's I/O thread, so keep the logic lightweight.
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        }
    
        // Clean-up work
        @Override
        public void close() {
        }
    
        // Initialization work
        @Override
        public void configure(Map<String, ?> configs) {
        }
    }
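
    Purely for illustration, onSend can also be called directly to see the transformation it applies; in normal use the producer invokes it automatically before serialization. The demo class name below is made up for this sketch:

    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class TimeStampInterceptorDemo {
        public static void main(String[] args) {
            TimeStampPrependerInterceptor interceptor = new TimeStampPrependerInterceptor();
            ProducerRecord<String, String> original =
                    new ProducerRecord<String, String>("test-topic", "message0");
            ProducerRecord<String, String> wrapped = interceptor.onSend(original);
            // Prints something like "1558785539173,message0"
            System.out.println(wrapped.value());
        }
    }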

    Custom interceptor that counts the number of messages sent:

    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    /**
     * Counts how many messages were sent successfully and how many failed,
     * and prints the totals when the producer is closed.
     */
    public class CounterInterceptor implements ProducerInterceptor<String, String> {
    
        private int errorCounter = 0;
        private int successCounter = 0;
    
        // Pass the record through unchanged
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            return record;
        }
    
        // A null exception means the send was acknowledged successfully
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            if (exception == null) {
                successCounter++;
            } else {
                errorCounter++;
            }
        }
    
        // Print the totals when the producer shuts down
        @Override
        public void close() {
            System.out.println("successfully sent: " + successCounter);
            System.out.println("failed to send: " + errorCounter);
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
        }
    }
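
    The javadoc on the timestamp interceptor above notes that interceptor methods may run on multiple threads and that thread safety is the user's responsibility. The plain int counters here are not protected; a possible variant (the class name ThreadSafeCounterInterceptor is made up for this sketch) uses AtomicLong instead:

    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicLong;
    
    public class ThreadSafeCounterInterceptor implements ProducerInterceptor<String, String> {
    
        // AtomicLong keeps the counts correct even if callbacks fire on more than one thread
        private final AtomicLong errorCounter = new AtomicLong();
        private final AtomicLong successCounter = new AtomicLong();
    
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            return record;
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            if (exception == null) {
                successCounter.incrementAndGet();
            } else {
                errorCounter.incrementAndGet();
            }
        }
    
        @Override
        public void close() {
            System.out.println("successfully sent: " + successCounter.get());
            System.out.println("failed to send: " + errorCounter.get());
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
        }
    }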

    In the main class, add the interceptor configuration:

    List<String> interceptors = new ArrayList<String>();
    interceptors.add("cn.org.fubin.producer.TimeStampPrependerInterceptor");
    interceptors.add("cn.org.fubin.producer.CounterInterceptor");
    properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
    
    String topic = "test-topic";
    Producer<String, String> producer = new KafkaProducer<String, String>(properties);
    for (int i = 0; i < 10; i++) {
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "message" + i);
        producer.send(record).get();
    }
    // Close the producer so that each interceptor's close() runs and
    // CounterInterceptor prints its totals
    producer.close();
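
    The snippet above assumes the properties object has already been created and filled in. A minimal configuration is sketched below, assuming the three local brokers used by the console consumer command further down and plain String serialization:

    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "localhost:9092,localhost:9093,localhost:9094");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");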

    Before running the producer program, start a console consumer to verify that the interceptors take effect:

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic test-topic

    The consumed messages look like this:

    1558785539173,message0
    1558785539476,message1
    1558785539490,message2
    1558785539504,message3
    1558785539521,message4
    1558785539535,message5
    1558785539547,message6
    1558785539562,message7
    1558785539574,message8
    1558785539587,message9
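
    If all ten sends succeed, then when the producer is closed the CounterInterceptor should print totals roughly like the following (an expected result, not captured output):

    successfully sent: 10
    failed to send: 0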
  • Original post: https://www.cnblogs.com/fubinhnust/p/11967874.html