zoukankan      html  css  js  c++  java
  • 《二十:消费者端统计加工消息》

    自定义拦截器实现ProducerInterceptor
    
    package com.wangbiao.kafka.interceptor;
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    /**
     * 拦截改造消费的消息   记录改造value用竖线分割(下游自行解析)
     */
    public class TimeInterceptor implements ProducerInterceptor<String,String> {
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            String value = record.value();
            //记录改造value用竖线分割
            return new ProducerRecord(record.topic(),record.partition(),record.key(),System.currentTimeMillis()+"|"+value);
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }
    
    统计消费了多少数据
    
    package com.wangbiao.kafka.interceptor;
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    /**
     * 统计拦截多少失败多少数据
     */
    public class CounterInterceptor implements ProducerInterceptor {
        public int success;
        public int error;
        @Override
        public ProducerRecord onSend(ProducerRecord record) {
            return record;
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
                   if (metadata!=null){
                       if(success==Integer.MAX_VALUE){ success=0; success++; }
                       success++;
                   }else {
                       if(error==Integer.MAX_VALUE){error=0; error++; }
                       error++;
                   }
        }
    
        @Override
        public void close() {
            System.out.println("success:"+success);
            System.out.println("error:"+error);
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }
    
    一点点学习,一丝丝进步。不懈怠,才不会被时代淘汰
  • 相关阅读:
    谈谈vertical-align的text-bottom和text-top
    【golang】代码学习
    【golang】json相关:unmarshal
    【tidb】相关的调研
    【php】sort函数整理
    【hive学习笔记1】-开始
    python2和python3区别
    python: 类型转换(int,long,float->string)
    【java】查找应用程序的资源
    【java】已经学习的部分
  • 原文地址:https://www.cnblogs.com/wangbiaohistory/p/15800169.html
Copyright © 2011-2022 走看看