zoukankan      html  css  js  c++  java
  • Kafka--自定义分区器与拦截器

    首先了解KafkaProduce发送消息流程

    一、定义一个简单的生产者(不会写找源码抄--改)

    代码:

    public static void main(String[] args) {
    
            //producer的配置信息
            Properties props = new Properties();
    // 服务器的地址和端口 props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
    // 接受服务端ack确认消息的参数,0,-1,1 props.put("acks", "all");
    // 如果接受ack超时,重试的次数 props.put("retries", 3);
    // sender一次从缓冲区中拿一批的数据量 props.put("batch.size", 16384);
    // 如果缓冲区中的数据不满足batch.size,只要和上次发送间隔了linger.ms也会执行一次发送 props.put("linger.ms", 1);
    // 缓存区的大小 props.put("buffer.memory", 33554432);
    //配置生产者使用的key-value的序列化器 props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // <key,value>,泛型,必须要和序列化器所匹配 Producer<Integer, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++){ producer.send(new ProducerRecord<Integer, String>("test1", i, "atguigu"+i)); }
    producer.close(); }

    二、自定义分区器

    注意事项:

    1、查找主题的分区数:找源码中是如何获取的,抄过来即可

    代码:

    public class MyPartitioner implements Partitioner {
    
        //为每个ProduceRecord计算分区号
        // 根据key的hashCode() % 分区数
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            //获取主题的分区数
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    
            int numPartitions = partitions.size();
    
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    
        // Producer执行close()方法时调用
        @Override
        public void close() {
    
        }
    
        // 从Producer的配置文件中读取参数,在partition之前调用
        @Override
        public void configure(Map<String, ?> configs) {

    } }

    2、在Produce中设置自定义分区器

    代码:

    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.custom.MyPartitioner");

    三、自定义拦截器

    注意事项:

    1、拦截器链的概念,是一个集合,需要把拦截器放入到一个集合中,先放入的先执行

    2、拦截器链:生产数据时,拦截一次,sender线程返回ack时,拦截一次

    代码:

    public class TimeStampInterceptor implements ProducerInterceptor<Integer,String> {
    
        //拦截数据
        @Override
        public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
    
            String newValue=System.currentTimeMillis()+"|"+record.value();
    
            return new ProducerRecord<Integer, String>(record.topic(),record.key(),newValue);
        }
    
        //当拦截器收到此条消息的ack时,会自动调用onAcknowledgement()
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    
        }
    
        // Producer关闭时,调用拦截器的close()
        @Override
        public void close() {
    
        }
    
        //读取Producer中的配置
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }

    3、在Produce中设置自定义拦截器

    代码:

        //拦截器链
            ArrayList<String> interCeptors = new ArrayList<>();
    
            // 添加的是全类名,注意顺序,先添加的会先执行
            interCeptors.add("com.atguigu.kafka.custom.TimeStampInterceptor");
            interCeptors.add("com.atguigu.kafka.custom.CounterInterceptor");
             //设置拦截器
            props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interCeptors);
  • 相关阅读:
    VS2010 自动跳过代码现象
    Reverse Linked List II 【纠结逆序!!!】
    Intersection of Two Linked Lists
    Linked List Cycle II
    Remove Nth Node From End of List 【另一个技巧,指针的指针】
    Swap Nodes in Pairs
    Merge Two Sorted Lists
    Remove Duplicates from Sorted List
    Linked List Cycle
    Dungeon Game
  • 原文地址:https://www.cnblogs.com/atBruce/p/12507781.html
Copyright © 2011-2022 走看看