zoukankan      html  css  js  c++  java
  • 《二十:消费者端统计加工消息》

    自定义拦截器实现ProducerInterceptor
    
    package com.wangbiao.kafka.interceptor;
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    /**
     * 拦截改造消费的消息   记录改造value用竖线分割(下游自行解析)
     */
    public class TimeInterceptor implements ProducerInterceptor<String,String> {
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            String value = record.value();
            //记录改造value用竖线分割
            return new ProducerRecord(record.topic(),record.partition(),record.key(),System.currentTimeMillis()+"|"+value);
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }
    
    统计消费了多少数据
    
    package com.wangbiao.kafka.interceptor;
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    /**
     * 统计拦截多少失败多少数据
     */
    public class CounterInterceptor implements ProducerInterceptor {
        public int success;
        public int error;
        @Override
        public ProducerRecord onSend(ProducerRecord record) {
            return record;
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
                   if (metadata!=null){
                       if(success==Integer.MAX_VALUE){ success=0; success++; }
                       success++;
                   }else {
                       if(error==Integer.MAX_VALUE){error=0; error++; }
                       error++;
                   }
        }
    
        @Override
        public void close() {
            System.out.println("success:"+success);
            System.out.println("error:"+error);
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }
    
    一点点学习,一丝丝进步。不懈怠,才不会被时代淘汰
  • 相关阅读:
    2018-8-18 训练神经网络笔记
    ffmpeg解码视频为图片和将图片合成一个MP4视频
    minikube start error
    按顺序将目录下的所有文件的绝对路径写入文件中
    ssh远程免密登录
    Ubuntu默认的awk一直报语法错误
    ffmpeg常用操作
    ssh免密登录server
    cv::namedWindow是非线程安全的
    lingcrypt源码安装undefined reference to ...
  • 原文地址:https://www.cnblogs.com/wangbiaohistory/p/15800169.html
Copyright © 2011-2022 走看看