zoukankan      html  css  js  c++  java
  • Kafka--自定义分区器与拦截器

    首先了解KafkaProduce发送消息流程

    一、定义一个简单的生产者(不会写找源码抄--改)

    代码:

    public static void main(String[] args) {
    
            //producer的配置信息
            Properties props = new Properties();
    // 服务器的地址和端口 props.put("bootstrap.servers", "hadoop102:9092,hadoop103:9092,hadoop104:9092");
    // 接受服务端ack确认消息的参数,0,-1,1 props.put("acks", "all");
    // 如果接受ack超时,重试的次数 props.put("retries", 3);
    // sender一次从缓冲区中拿一批的数据量 props.put("batch.size", 16384);
    // 如果缓冲区中的数据不满足batch.size,只要和上次发送间隔了linger.ms也会执行一次发送 props.put("linger.ms", 1);
    // 缓存区的大小 props.put("buffer.memory", 33554432);
    //配置生产者使用的key-value的序列化器 props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // <key,value>,泛型,必须要和序列化器所匹配 Producer<Integer, String> producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++){ producer.send(new ProducerRecord<Integer, String>("test1", i, "atguigu"+i)); }
    producer.close(); }

    二、自定义分区器

    注意事项:

    1、查找主题的分区数:找源码中是如何获取的,抄过来即可

    代码:

    public class MyPartitioner implements Partitioner {
    
        //为每个ProduceRecord计算分区号
        // 根据key的hashCode() % 分区数
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            //获取主题的分区数
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    
            int numPartitions = partitions.size();
    
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    
        // Producer执行close()方法时调用
        @Override
        public void close() {
    
        }
    
        // 从Producer的配置文件中读取参数,在partition之前调用
        @Override
        public void configure(Map<String, ?> configs) {

    } }

    2、在Produce中设置自定义分区器

    代码:

    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.custom.MyPartitioner");

    三、自定义拦截器

    注意事项:

    1、拦截器链的概念,是一个集合,需要把拦截器放入到一个集合中,先放入的先执行

    2、拦截器链:生产数据时,拦截一次,sender线程返回ack时,拦截一次

    代码:

    public class TimeStampInterceptor implements ProducerInterceptor<Integer,String> {
    
        //拦截数据
        @Override
        public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
    
            String newValue=System.currentTimeMillis()+"|"+record.value();
    
            return new ProducerRecord<Integer, String>(record.topic(),record.key(),newValue);
        }
    
        //当拦截器收到此条消息的ack时,会自动调用onAcknowledgement()
        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    
        }
    
        // Producer关闭时,调用拦截器的close()
        @Override
        public void close() {
    
        }
    
        //读取Producer中的配置
        @Override
        public void configure(Map<String, ?> configs) {
    
        }
    }

    3、在Produce中设置自定义拦截器

    代码:

        //拦截器链
            ArrayList<String> interCeptors = new ArrayList<>();
    
            // 添加的是全类名,注意顺序,先添加的会先执行
            interCeptors.add("com.atguigu.kafka.custom.TimeStampInterceptor");
            interCeptors.add("com.atguigu.kafka.custom.CounterInterceptor");
             //设置拦截器
            props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interCeptors);
  • 相关阅读:
    浏览器定位是如何实现的?为什么会有浏览器定位失败的情况?
    【高德地图API】如何设置Icon的imageSize?
    【地图API】地址录入时如何获得准确的经纬度?淘宝收货地址详解
    【高德地图API】如何设置Marker的offset?
    如何实现LBS轨迹回放功能?含多平台实现代码
    AgileEAS.NET SOA 中间件平台5.2版本下载、配置学习(二):配置WinClient分布式运行环境
    AgileEAS.NET SOA 中间件平台5.2版本下载、配置学习(一):下载平台并基于直连环境运行
    AgileEAS.NET SOA 中间件平台 5.2 发布说明-包含Silverlight及报表系统的开源代码下载
    AgileEAS.NET SOA 中间件平台.Net Socket通信框架-完整应用例子-在线聊天室系统-代码解析
    AgileEAS.NET SOA 中间件平台.Net Socket通信框架-完整应用例子-在线聊天室系统-下载配置
  • 原文地址:https://www.cnblogs.com/atBruce/p/12507781.html
Copyright © 2011-2022 走看看