zoukankan      html  css  js  c++  java
  • Kafka的接口回调 +自定义分区、拦截器

    一、接口回调+自定义分区

      1.接口回调:在使用消费者的send方法时添加Callback回调

    producer.send(new ProducerRecord<String, String>("xinnian", "20" + i + "年新年好!"), new Callback() {
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    if (recordMetadata!=null){
    System.out.println(recordMetadata.topic()+"-----"+recordMetadata.offset()+"-----"+recordMetadata.partition());
    }
    }
     2.自定义分区:定义类实现Patitioner接口,实现接口的方法:
       设置configure、分区逻辑partition(return 1;)、释放资源close、在生产者的配置过程中添加入分区属性。
     在定义生产者属性时添加分区的属性即可
    /**
     * @author: PrincessHug
     * @date: 2019/2/28, 16:24
     * @Blog: https://www.cnblogs.com/HelloBigTable/
     */
    public class PartitionDemo implements Partitioner {
        public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
            return 1;
        }
    
        public void close() {
    
        }
    
        public void configure(Map<String, ?> map) {
    
        }
    }
    
    public class ProducerDemo {
        public static void main(String[] args) {
            Properties prop = new Properties();
    
            //参数配置
            //kafka节点的地址
            prop.put("bootstrap.servers", "192.168.126.128:9092");
            //发送消息是否等待应答
            prop.put("acks", "all");
            //配置发送消息失败重试
            prop.put("retries", "0");
            //配置批量处理消息大小
            prop.put("batch.size", "10241");
            //配置批量处理数据延迟
            prop.put("linger.ms","5");
            //配置内存缓冲大小
            prop.put("buffer.memory", "12341235");
            //消息在发送前必须序列化
            prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            prop.put("partitioner.class", "PartitionDemo");
    
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
    
            for (int i=10;i<100;i++){
                producer.send(new ProducerRecord<String, String>("xinnian", "20" + i + "年新年好!"), new Callback() {
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (recordMetadata!=null){
                            System.out.println(recordMetadata.topic()+"-----"+recordMetadata.offset()+"-----"+recordMetadata.partition());
                        }
                    }
                });
            }
            producer.close();
        }
    }
    

      注意:在自定义分区后,你的消费者会收不到消息,因为消费者默认接收的分区为0。

    二、拦截器

      1)创建生产者类;
         2)创建自定义拦截器类实现ProducerInterceptor接口,重写抽象方法;
         3)在业务逻辑方法ProducerRecord方法中,修改返回值,
            return new ProducerRecord<String,String>(
            record.topic(),
            record.partiiton(),
            record.key(),
            System.currentTimeMillis() + "-" + record.value() + "-" + record.topic());
         4)在生产者类中将自定义拦截器生效
           prop.put(ProducerConfig.INTERCEPTOR_CLASSEA_CONFIG,"com.wyh.com.wyh.kafka.interceptor.TimeInterceptor");
         5)运行生产者main方法,或者在linux端用shell测试。

    /**
     * @author: PrincessHug
     * @date: 2019/2/28, 20:59
     * @Blog: https://www.cnblogs.com/HelloBigTable/
     */
    public class TimeInterceptor implements ProducerInterceptor<String, String> {
    
        //业务逻辑
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
            return new ProducerRecord<String,String>(
                    producerRecord.topic(),
                    producerRecord.partition(),
                    producerRecord.key(),
                    System.currentTimeMillis()+"--"+producerRecord.value()
            );
        }
    
        //发送失败调用
        public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
    
        }
    
        //释放资源
        public void close() {
    
        }
    
        //获取配置信息
        public void configure(Map<String, ?> map) {
    
        }
    }
    
    public class ItctorProducer {
        public static void main(String[] args) {
            //配置生产者属性
            Properties prop = new Properties();
            //kafka节点的地址
            prop.put("bootstrap.servers", "192.168.126.128:9092");
            //发送消息是否等待应答
            prop.put("acks", "all");
            //配置发送消息失败重试
            prop.put("retries", "0");
            //配置批量处理消息大小
            prop.put("batch.size", "1024");
            //配置批量处理数据延迟
            prop.put("linger.ms","5");
            //配置内存缓冲大小
            prop.put("buffer.memory", "12341235");
            //消息在发送前必须序列化
            prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            //添加拦截器
            ArrayList<String> inList = new ArrayList<String>();
            inList.add("interceptor.TimeInterceptor");
            prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,inList);
    
            //实例化producer
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
    
            //发送消息
            for (int i=0;i<99;i++){
                producer.send(new ProducerRecord<String, String>("xinnian","You are genius!"+i));
            }
    
            //释放资源
            producer.close();
            
        }
    }

      
  • 相关阅读:
    算法之美_源码公布(5)
    SDL2源码分析2:窗体(SDL_Window)
    hdu5303Delicious Apples
    Android之怎样给ListView加入过滤器
    EntboostChat 0.9(越狱版)公布,iOS免费企业IM
    unix关于打包命令zip的使用
    用 query 方法 获得xml 节点的值
    用友ERP T6技术解析(六) 库龄分析
    [笔试题] 两个有趣的问题
    使用SecueCRT在本地主机与远程主机之间交互文件
  • 原文地址:https://www.cnblogs.com/HelloBigTable/p/10453884.html
Copyright © 2011-2022 走看看