  • Producing to a specified Kafka partition

    To send a message, Kafka first wraps it in a ProducerRecord:

    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
            if (topic == null)
                throw new IllegalArgumentException("Topic cannot be null");
            if (timestamp != null && timestamp < 0)
                throw new IllegalArgumentException("Invalid timestamp " + timestamp);
            this.topic = topic;
            this.partition = partition;
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    

    The two fields that matter here are partition and key.
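
    To make the two routing fields concrete, here is a minimal stand-in class with the same constructor contract (a sketch for illustration, not the real Kafka class):

```java
// Minimal stand-in for ProducerRecord<String, String>; mirrors the
// constructor's validation rules shown above. Not the real Kafka class.
class RecordSketch {
    final String topic;
    final Integer partition; // null => let the partitioner decide
    final Long timestamp;
    final String key;        // null => no key available for routing
    final String value;

    RecordSketch(String topic, Integer partition, Long timestamp, String key, String value) {
        if (topic == null)
            throw new IllegalArgumentException("Topic cannot be null");
        if (timestamp != null && timestamp < 0)
            throw new IllegalArgumentException("Invalid timestamp " + timestamp);
        this.topic = topic;
        this.partition = partition;
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
    }
}

class RecordSketchDemo {
    public static void main(String[] args) {
        // Explicit partition: routing ignores the key and uses partition 2.
        RecordSketch pinned = new RecordSketch("orders", 2, null, "k1", "v1");
        // No partition: routing will fall back to the key ("k1").
        RecordSketch keyed = new RecordSketch("orders", null, null, "k1", "v1");
        System.out.println(pinned.partition + " " + (keyed.partition == null)); // 2 true
    }
}
```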

    Calling send does not actually transmit the message; the producer only appends it to an in-memory buffer. Before the message is queued, the producer computes which partition it should go to:

    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        // ... omitted ...
        int partition = partition(record, serializedKey, serializedValue, metadata.fetch()); // computes which partition the message will be placed in
        // ... omitted ...
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
        // ... omitted ...
    }
    

    Let's walk through the partition method:

    private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {
            Integer partition = record.partition(); // the partition field from the ProducerRecord
    
            if (partition != null) {
                List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
                int lastPartition = partitions.size() - 1;
                // they have given us a partition, use it
                if (partition < 0 || partition > lastPartition) {
                    throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));
                }
                return partition; // a partition was specified, so the message is sent to exactly that partition
            }
    
            // otherwise the configured partitioner picks the partition based on the record's key
            return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue,
                cluster);
        }
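
    The branch above boils down to: an explicit partition is range-checked and used as-is, and only a record with no partition falls through to the partitioner. A simplified plain-Java model of that dispatch (using Arrays.hashCode as a stand-in for the real partitioner's hash):

```java
import java.util.Arrays;

class PartitionDispatchSketch {
    // Simplified model of the producer's partition(...) dispatch: honor an
    // explicit partition after a bounds check, otherwise hash the key.
    static int choosePartition(Integer explicit, byte[] keyBytes, int numPartitions) {
        if (explicit != null) {
            int lastPartition = numPartitions - 1;
            if (explicit < 0 || explicit > lastPartition)
                throw new IllegalArgumentException(String.format(
                    "Invalid partition given with record: %d is not in the range [0...%d].",
                    explicit, lastPartition));
            return explicit; // the explicit partition wins; the key is ignored
        }
        // Stand-in for partitioner.partition(): non-negative hash mod partition count.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(choosePartition(3, "k".getBytes(), 6));           // 3
        System.out.println(choosePartition(null, "order-42".getBytes(), 6)); // some value in [0, 5]
    }
}
```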
    

    You can plug in a custom partitioner by setting the "partitioner.class" configuration property; a custom partitioner must implement the Partitioner interface:

    public interface Partitioner extends Configurable {
    
        /**
         * Compute the partition for the given record.
         *
         * @param topic The topic name
         * @param key The key to partition on (or null if no key)
         * @param keyBytes The serialized key to partition on (or null if no key)
         * @param value The value to partition on or null
         * @param valueBytes The serialized value to partition on or null
         * @param cluster The current cluster metadata
         */
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
    
        /**
         * This is called when partitioner is closed.
         */
        public void close();
    
    }
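
    Wiring a custom implementation in is just producer configuration. In the sketch below, com.example.OrderKeyPartitioner is a hypothetical class name standing in for your own implementation of the interface:

```java
class PartitionerConfigSketch {
    public static void main(String[] args) {
        java.util.Properties props = new java.util.Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // "partitioner.class" tells the producer which Partitioner to instantiate;
        // com.example.OrderKeyPartitioner is a hypothetical implementation.
        props.put("partitioner.class", "com.example.OrderKeyPartitioner");
        // A KafkaProducer built from these props would route every record
        // through OrderKeyPartitioner.partition(...).
        System.out.println(props.getProperty("partitioner.class"));
    }
}
```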
    

    If "partitioner.class" is not set, the default, DefaultPartitioner, is used. Let's look at how DefaultPartitioner assigns partitions:

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); // partition list, fetched when metadata is refreshed (every 30s by default)
            int numPartitions = partitions.size();
            if (keyBytes == null) { // no key on the ProducerRecord: start from a random value and round-robin
                int nextValue = counter.getAndIncrement(); // counter is initialized to a random value and incremented on each call
                List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
                if (availablePartitions.size() > 0) {
                    int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();
                    return availablePartitions.get(part).partition();
                } else {
                    // no partitions are available, give a non-available partition
                    return DefaultPartitioner.toPositive(nextValue) % numPartitions;
                }
            } else {
                // hash the keyBytes to choose a partition
                return DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            }
        }
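
    The same two paths can be sketched in self-contained Java. murmur2 is internal to the Kafka client, so Arrays.hashCode stands in for it here; the structure (random-start round-robin for null keys, sticky hashing for keyed records) is what matters:

```java
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

class DefaultPartitionerSketch {
    // Starts at a random value so producers don't all hit partition 0 first.
    private final AtomicInteger counter = new AtomicInteger(new Random().nextInt());

    // Kafka's toPositive: clear the sign bit. Unlike Math.abs, this is
    // well-defined for Integer.MIN_VALUE (which Math.abs leaves negative).
    static int toPositive(int n) { return n & 0x7fffffff; }

    int partition(byte[] keyBytes, int numPartitions) {
        if (keyBytes == null) {
            // No key: spread messages round-robin across partitions.
            return toPositive(counter.getAndIncrement()) % numPartitions;
        }
        // Keyed: the same key always lands on the same partition (stand-in hash).
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }

    public static void main(String[] args) {
        DefaultPartitionerSketch p = new DefaultPartitionerSketch();
        // Keyed routing is deterministic:
        System.out.println(p.partition("user-1".getBytes(), 4)
                == p.partition("user-1".getBytes(), 4)); // true
        System.out.println(toPositive(Integer.MIN_VALUE)); // 0
    }
}
```

    Note that keyed routing uses the full partition list, while the null-key path prefers availablePartitionsForTopic: consistency of key-to-partition mapping is sacrificed if it would mean sending to a partition with no available leader.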
    
  • Original article: https://www.cnblogs.com/set-cookie/p/9126307.html