zoukankan      html  css  js  c++  java
  • kafka 自定义分区类和拦截器

    自定义分区类

    生产者发送到对应的分区有以下几种方式:

    1指定了patition,则直接使用;(可以查阅对应的java api, 有多种参数)

    2未指定patition但指定key,通过对keyvalue进行hash出一个patition

    3patitionkey都未指定,使用轮询选出一个patition

    但是kafka提供了,自定义分区算法的功能,由业务手动实现分布:

    1、实现一个自定义分区类,CustomPartitioner实现Partitioner

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    
    import java.util.Map;
    
    public class CustomPartitioner implements Partitioner {
    
        /**
         *
         * @param topic  当前的发送的topic
         * @param key    当前的key值
         * @param keyBytes  当前的key的字节数组
         * @param value  当前的value值
         * @param valueBytes  当前的value的字节数组
         * @param cluster
         * @return
         */
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            //这边根据返回值就是分区号, 这边就是固定发送到三号分区
            return 3;
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    
    }

    2、producer配置文件指定,具体的分区类

    // 具体的分区类
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");

    技巧:可以使用ProducerConfig中提供的配置ProducerConfig

    kafka  producer拦截器

    拦截器(interceptor)是在Kafka 0.10版本被引入的。

    interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。

    许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)

    所使用的类为:

    org.apache.kafka.clients.producer.ProducerInterceptor

    我们可以编码测试下:

    1、定义消息拦截器,实现消息处理(可以是加时间戳等等,unid等等。)

    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    import java.util.Map;
    import java.util.UUID;
    
    public class MessageInterceptor implements ProducerInterceptor<String, String> {
    
        @Override
        public void configure(Map<String, ?> configs) {
            System.out.println("这是MessageInterceptor的configure方法");
        }
    
        /**
         * 这个是消息发送之前进行处理
         *
         * @param record
         * @return
         */
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            // 创建一个新的record,把uuid入消息体的最前部
            System.out.println("为消息添加uuid");
            return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
                    UUID.randomUUID().toString().replace("-", "") + "," + record.value());
        }
    
        /**
         *  这个是生产者回调函数调用之前处理
         * @param metadata
         * @param exception
         */
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            System.out.println("MessageInterceptor拦截器的onAcknowledgement方法");
        }
    
        @Override
        public void close() {
            System.out.println("MessageInterceptor close 方法");
        }
    }

    2、定义计数拦截器

    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    
    public class CounterInterceptor implements ProducerInterceptor<String, String>{
        private int errorCounter = 0;
        private int successCounter = 0;
    
        @Override
        public void configure(Map<String, ?> configs) {
            System.out.println("这是CounterInterceptor的configure方法");
        }
    
        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            System.out.println("CounterInterceptor计数过滤器不对消息做任何操作");
            return record;
        }
    
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            // 统计成功和失败的次数
            System.out.println("CounterInterceptor过滤器执行统计失败和成功数量");
            if (exception == null) {
                successCounter++;
            } else {
                errorCounter++;
            }
        }
    
        @Override
        public void close() {
            // 保存结果
            System.out.println("Successful sent: " + successCounter);
            System.out.println("Failed sent: " + errorCounter);
        }
    }

    3、producer客户端:

    import org.apache.kafka.clients.producer.*;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    
    public class Producer1 {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Kafka服务端的主机名和端口号
            props.put("bootstrap.servers", "localhost:9092");
            // 等待所有副本节点的应答
            props.put("acks", "all");
            // 消息发送最大尝试次数
            props.put("retries", 0);
            // 一批消息处理大小
            props.put("batch.size", 16384);
            // 请求延时,可能生产数据太快了
            props.put("linger.ms", 1);
            // 发送缓存区内存大小,数据是先放到生产者的缓冲区
            props.put("buffer.memory", 33554432);
            // key序列化
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // value序列化
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // 具体的分区类
            props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");
            //定义拦截器
            List<String> interceptors = new ArrayList<>();
            interceptors.add("kafka.MessageInterceptor");
            interceptors.add("kafka.CounterInterceptor");
            props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 1; i++) {
                producer.send(new ProducerRecord<String, String>("test_0515", i + "", "xxx-" + i), new Callback() {
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        System.out.println("这是producer回调函数");
                    }
                });
            }
            /*System.out.println("现在执行关闭producer");
            producer.close();*/
            producer.close();
        }
    }

    总结,我们可以知道拦截器链各个方法的执行顺序,假如有A、B拦截器,在一个拦截器链中:

    (1)执行A的configure方法,执行B的configure方法

    (2)执行A的onSend方法,B的onSend方法

    (3)生产者发送完毕后,执行A的onAcknowledgement方法,B的onAcknowledgement方法。

    (4)执行producer自身的callback回调函数。

    (5)执行A的close方法,B的close方法。

  • 相关阅读:
    collections queue、os、datetime,序列化(json和pickle)模块
    re模块和正则
    模块介绍
    迭代器,生成器,生成器表达式,常用内置方法
    交互式shell和非交互式shell的区别
    /usr 的由来及/usr目录结
    Hadoop
    联通、联在中文机器上乱码问题
    正斜杠与反斜杠
    java中static关键字解析
  • 原文地址:https://www.cnblogs.com/chenmz1995/p/12896140.html
Copyright © 2011-2022 走看看