  • Kafka custom interceptor: ProducerInterceptor

    Example:

    We will write two producer interceptors: one prepends a timestamp to each message, and one counts how many messages are sent successfully and how many fail. (The two could of course be combined into a single interceptor; here we implement them separately.)

    1: Add the following to the POM file. (The slf4j dependency is only there to silence a logging warning and can be omitted.)

        <dependencies>
            <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.11.0.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-simple</artifactId>
                <version>1.7.25</version>
                <scope>compile</scope>
            </dependency>
        </dependencies>
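
    Both interceptors below implement org.apache.kafka.clients.producer.ProducerInterceptor. For reference, the interface shipped with kafka-clients 0.11 looks roughly like this (configure comes from the Configurable parent interface): onSend runs on the sending thread before the record is serialized and partitioned, onAcknowledgement runs when the broker acknowledges the record or the send fails, and close runs when the producer is closed.

    public interface ProducerInterceptor<K, V> extends Configurable {
        // called on the user thread before the record is serialized and partitioned
        ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
        // called when the record is acknowledged by the broker, or when the send fails
        void onAcknowledgement(RecordMetadata metadata, Exception exception);
        // called when the producer (and hence the interceptor) is closed
        void close();
        // void configure(Map<String, ?> configs) is inherited from Configurable
    }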

    2: Write the TimeInterceptor

    package com.Interceptor;
    
    
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    
    public class TimeInterceptor  implements ProducerInterceptor<String,String> {
    
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
    
            // read the original message value
            String value = producerRecord.value();
            value = System.currentTimeMillis() + value;
            // build a new ProducerRecord with the timestamp prepended to the value
            return new ProducerRecord<String, String>(producerRecord.topic(), producerRecord.partition(), producerRecord.key(), value);
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
    
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> map) {
    
        }
    }
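
    To sanity-check the transformation without a running broker, you can call onSend directly from a small throwaway main method (a hypothetical quick test, not part of the original example):

    package com.Interceptor;

    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TimeInterceptorTest {
        public static void main(String[] args) {
            TimeInterceptor interceptor = new TimeInterceptor();
            ProducerRecord<String, String> in = new ProducerRecord<String, String>("bbb", "key1", "hello");
            ProducerRecord<String, String> out = interceptor.onSend(in);
            // prints the original value with the current epoch milliseconds prepended
            System.out.println(out.value());
        }
    }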

    3: Write the CounterInterceptor, which counts successes and failures

    package com.Interceptor;
    
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    
    public class CounterInterceptor implements ProducerInterceptor<String,String>  {
        private int success;
        private int errors;

        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
            // pass the record through unchanged; this interceptor only counts results
            return producerRecord;
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
            // e is null when the send succeeded; the producer may pass a
            // (partial) RecordMetadata even on failure, so check the exception
            if (e == null)
            {
                success++;
            }
            else
            {
                errors++;
            }
    
        }
    
        @Override
        public void close() {
            System.out.println("Success : " + success);
            System.out.println("Errors : " + errors);
        }
    
        @Override
        public void configure(Map<String, ?> map) {
    
        }
    }
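
    Note that onAcknowledgement is invoked on the producer's background I/O thread (onSend runs on the calling thread), so the callback should stay cheap. The plain int counters above are fine for this single-producer demo, but if strict correctness matters, the fields and the callback can be swapped for atomic counters, e.g. (a minimal sketch, not in the original example; it needs import java.util.concurrent.atomic.AtomicInteger):

        private final AtomicInteger success = new AtomicInteger();
        private final AtomicInteger errors = new AtomicInteger();

        @Override
        public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
            // atomically bump the matching counter from the I/O thread
            if (e == null) {
                success.incrementAndGet();
            } else {
                errors.incrementAndGet();
            }
        }

    close() would then print success.get() and errors.get().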

    4: Register the interceptors on the producer.

    package com.producer;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.ArrayList;
    import java.util.Properties;
    
    public class InterceptorProducer {
        public static void main(String[] args) {
            // create the Kafka producer configuration
            Properties properties = new Properties();
            // Kafka cluster (broker list)
            properties.put("bootstrap.servers", "hadoop101:9092");
            properties.put("acks", "all");
            // number of retries
            properties.put("retries", 1);
            // batch size in bytes
            properties.put("batch.size", 16384);
            // linger time in ms
            properties.put("linger.ms", 1);
            // RecordAccumulator buffer size (32 MB)
            properties.put("buffer.memory", 33554432);
            // key/value serializer classes
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            // register both interceptors, in the order they should run
            ArrayList<String> interceptors = new ArrayList<String>();
            interceptors.add("com.Interceptor.TimeInterceptor");
            interceptors.add("com.Interceptor.CounterInterceptor");
            properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
            // create the producer
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

            // send data
            for (int i = 11; i <= 20; i++)
            {
                producer.send(new ProducerRecord<String, String>("bbb", "Kafkakafka--" + i));
            }

            // close the producer; this also calls close() on the interceptors,
            // so CounterInterceptor prints its counts here
            producer.close();
    
        }
    }
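
    The string keys above also exist as constants on ProducerConfig, which avoids typos, and interceptor.classes can alternatively be given as a comma-separated string. An equivalent way to build the same configuration (a sketch of the identical settings; it additionally needs import org.apache.kafka.common.serialization.StringSerializer):

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop101:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, 1);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // the interceptor list may also be passed as a single comma-separated string
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                "com.Interceptor.TimeInterceptor,com.Interceptor.CounterInterceptor");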

    5: Start a console consumer

    bin/kafka-console-consumer.sh  --zookeeper hadoop101:2181 --topic bbb
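
    (The --zookeeper option matches the 0.11-era tooling used here; on newer Kafka releases it has been removed, and the console consumer connects to the brokers directly:)

    bin/kafka-console-consumer.sh --bootstrap-server hadoop101:9092 --topic bbb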

    6: Start the producer and check the results. The consumer should print each message with the epoch timestamp prepended, and when the producer closes, CounterInterceptor prints the Success/Errors counts on the producer side.

  • Original post: https://www.cnblogs.com/kpwong/p/13780541.html