zoukankan      html  css  js  c++  java
  • 《二十:消费者端统计加工消息》

    自定义拦截器实现ProducerInterceptor
    
    package com.wangbiao.kafka.interceptor;
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    /**
     * 拦截改造消费的消息   记录改造value用竖线分割(下游自行解析)
     */
    public class TimeInterceptor implements ProducerInterceptor<String,String> {
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            String value = record.value();
            //记录改造value用竖线分割
            return new ProducerRecord(record.topic(),record.partition(),record.key(),System.currentTimeMillis()+"|"+value);
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }
    
    统计消费了多少数据
    
    package com.wangbiao.kafka.interceptor;
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    /**
     * 统计拦截多少失败多少数据
     */
    public class CounterInterceptor implements ProducerInterceptor {
        public int success;
        public int error;
        @Override
        public ProducerRecord onSend(ProducerRecord record) {
            return record;
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
                   if (metadata!=null){
                       if(success==Integer.MAX_VALUE){ success=0; success++; }
                       success++;
                   }else {
                       if(error==Integer.MAX_VALUE){error=0; error++; }
                       error++;
                   }
        }
    
        @Override
        public void close() {
            System.out.println("success:"+success);
            System.out.println("error:"+error);
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }
    
    一点点学习,一丝丝进步。不懈怠,才不会被时代淘汰
  • 相关阅读:
    Codeforces Beta Round #9 (Div. 2 Only) C. Hexadecimal's Numbers dfs
    Codeforces Beta Round #9 (Div. 2 Only) B. Running Student 水题
    Codeforces Beta Round #9 (Div. 2 Only) A. Die Roll 水题
    51nod 1035 最长的循环节 数学
    BSGS 模板
    51nod 1040 最大公约数之和 欧拉函数
    51NOD 1179 最大的最大公约数 筛法
    BZOJ 2818: Gcd 筛法
    川大校赛总结
    SCOJ 4484 The Graver Robbers' Chronicles 后缀自动机
  • 原文地址:https://www.cnblogs.com/wangbiaohistory/p/15800169.html
Copyright © 2011-2022 走看看