zoukankan      html  css  js  c++  java
  • kafka producer interceptor拦截器(五)

      producer在发送数据时,会经过拦截器和序列化,最后到达相应的分区。在经过拦截器时,我们可以对发送的数据做进步的处理。

      要正确的使用拦截器需要以下步骤:

        1.实现拦截器ProducerInterceptor的方法

        2.在producer的prop中配置 

          prop.put("interceptor.classes", "com.xxx.interceptor.xxxInterceptor")

         如果是拦截器链的话,在后面追加即可

          prop.put("interceptor.classes", ""com.xxx.interceptor.xxxInterceptor1,com.xxx.interceptor.xxxInterceptor2");

    生产者的拦截器需要实现ProducerInterceptor接口中的方法来实现

      @Override
      public
    void configure(Map<String, ?> arg0) {}  #获取broker的配置信息
    @Override
    public void close() {}       #在producer关闭时调用此方法         
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {} #数据在写到broker时,无论是否成功的回调 @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {}  #拦截的信息

    生产者:

    public class ProducerDemo {
        
        private static final Logger LOG = LoggerFactory.getLogger(ProducerDemo.class);
        
        public static void main(String[] args) throws InterruptedException, ExecutionException {
          
            //1.加载配置信息
            Properties prop = loadProperties();
            
            //2.创建生产者
            KafkaProducer<String,String> producer = new KafkaProducer<>(prop);
            
            //3.发送内容
            String sendContent = "hello_kafka";
            
            IntStream.range(0, 10).forEach(i ->{
                try {
                    ProducerRecord<String,String> record = new ProducerRecord<>("test1",sendContent+"_"+i);
                    Future<RecordMetadata> future = producer.send(record);
                    RecordMetadata recordMetadata = future.get();
                    LOG.info("发送的数据是 :{},offset:是{},partition是:{}",sendContent,recordMetadata.offset(),recordMetadata.partition());
                } catch (Exception e) {
                    e.printStackTrace();
                }
                
            });
                    
            producer.close();    //回调拦截器中的close方法
            
        }
            
        //配置文件的设置
        public static Properties loadProperties() {
            Properties prop = new Properties();
            prop.put("bootstrap.servers", "192.168.100.144:9092,192.168.100.145:9092,192.168.100.146:9092");
            prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            prop.put("interceptor.classes", "com.zpb.interceptor.ProducerInterceptorDemo,com.zpb.interceptor.ProducerInterceptorDemo2");
            prop.put("acks", "all");    //发送到所有的ISR队列中
            return prop;
        }
    }

    拦截器一:

    public class ProducerInterceptorDemo implements ProducerInterceptor<String, String>{
        
        private static final Logger LOG = LoggerFactory.getLogger(ProducerInterceptorDemo.class);
    
        private volatile long succNum = 0;
        
        private volatile long failNum = 0;
        
        @Override
        public void configure(Map<String, ?> arg0) {
            LOG.info("configure ==>"+arg0);
        }
    
        @Override
        public void close() {
            double succRatio = succNum/succNum+failNum;
            LOG.info("成功率是:"+succRatio*100);
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
            
            if(null == e){
                succNum++;
            }else{
                failNum++;
            }
            
        }
    
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
            
            String prefixValue = producerRecord.value()+"prefix_1";
            
            return new ProducerRecord<String, String>(producerRecord.topic(),prefixValue);
        }
    }

    拦截器二:

    public class ProducerInterceptorDemo2 implements ProducerInterceptor<String, String>{
    
        @Override
        public void configure(Map<String, ?> configs) {
            
        }
    
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            String prefixValue = record.value()+"prefix_2";
            return new ProducerRecord<String, String>(record.topic(), prefixValue);
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            
        }
    
        @Override
        public void close() {
            
        }
    }
  • 相关阅读:
    asp.net导出数据到execl并保存到本地 不需要调用Office组件
    动态创建DataTable,GridView创建多表头,表头跨行或跨列合并,创建计算列及列内容自适应等
    Oracle内置SQL函数收集整理大全
    无比强大的GridView,表头固定,表体有滚动条可滚动
    很不错的asp.net文件上传类c# 搜索文件 移动文件 删除文件等
    【备用】非常不错的ASP操作数据库类,支持多数据库MSSQL,ACCESS,ORACLE,MYSQL等
    Asp.Net读取Execl常见问题收集
    经常用到的交叉表问题,一般用动态SQL能生成动态列
    C# asp.net中常见的字符串处理函数及数字格式化
    比较两个DataTable中不同的记录,且合并两个DataTable的列显示,有图
  • 原文地址:https://www.cnblogs.com/MrRightZhao/p/11345447.html
Copyright © 2011-2022 走看看