zoukankan      html  css  js  c++  java
  • kafka producer 序列化& 反序列化 & 分区分配计算

    消息的序列化在 Interceptor 之后,分配分区之前执行。

    KafkaProducer在调用send方法发送消息至broker的过程中,首先是经过拦截器Inteceptors处理,然后是经过序列化Serializer处理,之后就到了Partitions阶段,即分区分配计算阶段。

    ProducerRecord 包括

    private final String topic;//所要发送的topic
    private final Integer partition;//指定的partition序号
    private final Headers headers;//一组键值对,与RabbitMQ中的headers类似
    private final K key;//消息的key
    private final V value;//消息的value,即消息体
    private final Long timestamp;//消息的时间戳

    在KafkaProducer的源码中,计算分区时调用的是下面的partition()方法:

    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }

    默认的分区提供者是 org.apache.kafka.clients.producer.DefaultPartitioner,其partition()方法实现如下:

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    没有指定 key,以一种随机的方式转发。如果key不为null则使用称之为murmur的Hash算法(非加密型Hash函数,具备高运算性能及低碰撞率)来计算分区分配。

    可自定义分区函数:

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            if (null == keyBytes || keyBytes.length<1) {
                return atomicInteger.getAndIncrement() % numPartitions;
            }
            //借用String的hashCode的计算方式
            int hash = 0;
            for (byte b : keyBytes) {
                hash = 31 * hash + b;
            }
            return hash % numPartitions;
        }

     

  • 相关阅读:
    Lua环境
    WebKit
    Net线程间通信的异步机制
    Cucumber入门1 传统流程下的使用
    Windows Server 2008中安装IIS7.0
    WebCore
    百度云计算平台Python环境试用
    认识ASP.NET MVC的5种AuthorizationFilter
    浅谈java中常见的排序
    go语言中goroutine的使用
  • 原文地址:https://www.cnblogs.com/hansc-blog/p/9289815.html
Copyright © 2011-2022 走看看