zoukankan      html  css  js  c++  java
  • 《二十:消费者端统计加工消息》

    自定义拦截器实现ProducerInterceptor
    
    package com.wangbiao.kafka.interceptor;
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    /**
     * 拦截改造消费的消息   记录改造value用竖线分割(下游自行解析)
     */
    public class TimeInterceptor implements ProducerInterceptor<String,String> {
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            String value = record.value();
            //记录改造value用竖线分割
            return new ProducerRecord(record.topic(),record.partition(),record.key(),System.currentTimeMillis()+"|"+value);
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }
    
    统计消费了多少数据
    
    package com.wangbiao.kafka.interceptor;
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    /**
     * 统计拦截多少失败多少数据
     */
    public class CounterInterceptor implements ProducerInterceptor {
        public int success;
        public int error;
        @Override
        public ProducerRecord onSend(ProducerRecord record) {
            return record;
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
                   if (metadata!=null){
                       if(success==Integer.MAX_VALUE){ success=0; success++; }
                       success++;
                   }else {
                       if(error==Integer.MAX_VALUE){error=0; error++; }
                       error++;
                   }
        }
    
        @Override
        public void close() {
            System.out.println("success:"+success);
            System.out.println("error:"+error);
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }
    
    一点点学习,一丝丝进步。不懈怠,才不会被时代淘汰
  • 相关阅读:
    在阿里云centos7.6上部署vue.js2.6前端应用
    gRPC
    Docker
    ES6、ES7、ES8、ES9、ES10
    HTTPS工作原理 HTTP协议数据结构分析 HTTP和HTTPS协议的不同之处
    SpringBean的工作原理
    Nginx负载均衡高可用---架构
    集群的负载策略
    redis 和 memcached的区别
    Django的基础教程
  • 原文地址:https://www.cnblogs.com/wangbiaohistory/p/15800169.html
Copyright © 2011-2022 走看看