zoukankan      html  css  js  c++  java
  • kafka producer 序列化& 反序列化 & 分区分配计算

    消息的序列化在 Interceptor 之后,分配分区之前执行。

    KafkaProducer在调用send方法发送消息至broker的过程中,首先是经过拦截器Inteceptors处理,然后是经过序列化Serializer处理,之后就到了Partitions阶段,即分区分配计算阶段。

    ProducerRecord 包括

    private final String topic;//所要发送的topic
    private final Integer partition;//指定的partition序号
    private final Headers headers;//一组键值对,与RabbitMQ中的headers类似
    private final K key;//消息的key
    private final V value;//消息的value,即消息体
    private final Long timestamp;//消息的时间戳

    在KafkaProducer的源码中,计算分区时调用的是下面的partition()方法:

    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        return partition != null ?
                partition :
                partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
    }

    默认的分区提供者是 org.apache.kafka.clients.producer.DefaultPartitioner,其partition()方法实现如下:

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    没有指定 key,以一种随机的方式转发。如果key不为null则使用称之为murmur的Hash算法(非加密型Hash函数,具备高运算性能及低碰撞率)来计算分区分配。

    可自定义分区函数:

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            if (null == keyBytes || keyBytes.length<1) {
                return atomicInteger.getAndIncrement() % numPartitions;
            }
            //借用String的hashCode的计算方式
            int hash = 0;
            for (byte b : keyBytes) {
                hash = 31 * hash + b;
            }
            return hash % numPartitions;
        }

     

  • 相关阅读:
    一些要学习的文章
    Android 应用检查更新并下载
    Android中如何下载文件并显示下载进度
    android IntentService和ResultReceiver的异步处理
    android 优秀图表库之MPAndroidChart
    android 仿QQ气泡聊天界面
    android canvas中rotate()和translate()两个方法详解
    android 透明弹出搜索框
    【转】ANDROID自定义视图——onLayout源码 流程 思路详解
    【转】ANDROID自定义视图——onMeasure,MeasureSpec源码 流程 思路详解
  • 原文地址:https://www.cnblogs.com/hansc-blog/p/9289815.html
Copyright © 2011-2022 走看看