zoukankan      html  css  js  c++  java
  • kafka producer partitions分区器(七)

      消息在经过拦截器、序列化后,就需要确定它发往哪个分区,如果在ProducerRecord中指定了partition字段,那么就不再需要partitioner分区器进行分区了,如果没有指定,那么会根据key来将数据进行分区,如果partitioner和key都没有指定,那么就会采用默认的方式进行数据分区。

      有没有指定partition可以从源码中看出:

     public ProducerRecord(String topic, Integer partition, K key, V value) {}

    如果指定的partition,那就指定了数据发往哪个分区上,如果没有就会根据key来进行数据分区,如果2个都没有,那么会采用默认的分区策略来进行数据分区

    1.根据key进行分区

    public class CustomPartitioner {
        
        private static final Logger LOG = LoggerFactory.getLogger(CustomPartitioner.class);
        
        public static void main(String[] args) {
            //1.加载配置信息
            Properties prop = loadProperties();
            
            //2.创建生产者
            KafkaProducer<Integer,String> producer = new KafkaProducer<>(prop);
            
            String sendContent = "hello_kafka";
            IntStream.range(0, 10).forEach(i ->{
                try {
                    ProducerRecord<Integer,String> record = new ProducerRecord<>("test1",i,sendContent+"_"+i);  //topic key value
                    Future<RecordMetadata> future = producer.send(record);
                    RecordMetadata recordMetadata = future.get();
                    LOG.info("发送的数据是 :{},offset:是{},partition是:{}",sendContent,recordMetadata.offset(),recordMetadata.partition());
                } catch (Exception e) {
                    e.printStackTrace();
                }
                
            });
            
        }
         //配置文件的设置
        public static Properties loadProperties() {
            Properties prop = new Properties();
            prop.put("bootstrap.servers", "192.168.100.144:9092,192.168.100.145:9092,192.168.100.146:9092");
            prop.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
            prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            prop.put("acks", "all");    //发送到所有的ISR队列中
            return prop;
        }
    }

     2.自定义分区

      同样在使用自定义分区的时候,需要写实现类和在producer中配置引用

      我们在这个示例中,根据key来分区,key在序列化的时候用的是IntegerSerializer,在ProducerRecord中我们没有指定partition

      自定义分区器

    public class CustomPartition implements Partitioner{
    
        @Override
        public void configure(Map<String, ?> configs) {
            // TODO Auto-generated method stub
            
        }
    
        @SuppressWarnings({ "null", "unused" })
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            
            int partitionNum = cluster.partitionsForTopic(topic).size();
            int partition = (Integer)key%partitionNum;
            return key == null? 0:partition;
        }
    
        @Override
        public void close() {
            // TODO Auto-generated method stub
            
        }
    }

      生产者

    public class ProducerDemo {
        
        private static final Logger LOG = LoggerFactory.getLogger(ProducerDemo.class);
            
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            //1.加载配置信息
            Properties prop = loadProperties();
            
            //2.创建生产者
            KafkaProducer<Integer,String> producer = new KafkaProducer<>(prop);
            
            //3.发送内容
            String sendContent = "hello_kafka";
            IntStream.range(0, 10).forEach(i ->{
                try {
                    ProducerRecord<Integer,String> record = new ProducerRecord<>("test1",i,sendContent+"_"+i);
                    Future<RecordMetadata> future = producer.send(record);
                    RecordMetadata recordMetadata = future.get();
                    LOG.info("发送的数据是 :{},offset:是{},partition是:{}",sendContent,recordMetadata.offset(),recordMetadata.partition());
                } catch (Exception e) {
                    e.printStackTrace();
                }
                
            });                
            producer.close();    //回调拦截器中的close方法
            
        }
            
        //配置文件的设置
        public static Properties loadProperties() {
            Properties prop = new Properties();
            prop.put("bootstrap.servers", "192.168.100.144:9092,192.168.100.145:9092,192.168.100.146:9092");
            prop.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
            prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            prop.put("partitioner.class", "com.zpb.partitioner.CustomPartition");
            prop.put("acks", "all");
            return prop;
        }
    }

     

  • 相关阅读:
    JavaScript 深入之从原型到原型链
    js重写内置的call、apply、bind
    js中flat方法的实现原理
    【我的物联网成长记6】由浅入深了解NB-IoT【华为云技术分享】
    GO富集分析示例【华为云技术分享】
    华为“方舟编译器”到底是啥?一文看懂TA如何让手机性能再突破【华为云技术分享】
    无码系列-7-代码的语文修养_上篇【华为云技术分享】
    机器学习笔记(一)----基本概念【华为云技术分享】
    性能达到原生 MySQL 七倍,华为云 Taurus 技术解读【华为云技术分享】
    【立即报名】人脸情绪识别案例分享【华为云技术分享】
  • 原文地址:https://www.cnblogs.com/MrRightZhao/p/11345846.html
Copyright © 2011-2022 走看看