  • Differences between the Java and Scala versions of Kafka's default partitioner

    Scala

    import kafka.utils._
    
    class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
      private val random = new java.util.Random
      
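      // The Scala version partitions on the hashCode of the key object itself.
      def partition(key: Any, numPartitions: Int): Int = {
        // Utils.abs keeps the hash non-negative before taking the modulus
        Utils.abs(key.hashCode) % numPartitions
      }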
    }
    

      

    Java

    public class DefaultPartitioner implements Partitioner {
    
        private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
    
        public void configure(Map<String, ?> configs) {}
    
        /**
         * Compute the partition for the given record.
         *
         * @param topic The topic name
         * @param key The key to partition on (or null if no key)
         * @param keyBytes serialized key to partition on (or null if no key)
         * @param value The value to partition on or null
         * @param valueBytes serialized value to partition on or null
         * @param cluster The current cluster metadata
         */
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            if (keyBytes == null) {
                int nextValue = nextValue(topic);
                List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
                if (availablePartitions.size() > 0) {
                    int part = Utils.toPositive(nextValue) % availablePartitions.size();
                    return availablePartitions.get(part).partition();
                } else {
                    // no partitions are available, give a non-available partition
                    return Utils.toPositive(nextValue) % numPartitions;
                }
            } else {
                // hash the keyBytes to choose a partition
                return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            }
        }
    
        // nextValue(String topic) and close() are not shown in this excerpt
    }
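
To make the contrast between the two listings easier to see, here is a minimal, dependency-free sketch of the three selection rules they use. It is not Kafka's code: the class name `PartitionerSketch` and its method names are hypothetical, `Arrays.hashCode` is only a stand-in for `Utils.murmur2`, the handling of `Integer.MIN_VALUE` in the hashCode path is an assumption about how `Utils.abs` behaves, and the round-robin method ignores the available/unavailable partition distinction that the Java listing makes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

class PartitionerSketch {

    // Clears the sign bit so the result is never negative,
    // which is the idea behind Utils.toPositive in the Java listing.
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    // Scala-style rule: hash the key object itself.
    // Assumption: Integer.MIN_VALUE is mapped to 0, because Math.abs
    // would otherwise return a negative number for it.
    static int hashCodePartition(Object key, int numPartitions) {
        int h = key.hashCode(); // throws NullPointerException for a null key
        int abs = (h == Integer.MIN_VALUE) ? 0 : Math.abs(h);
        return abs % numPartitions;
    }

    // Java-style rule for records with a key: hash the serialized bytes.
    // Arrays.hashCode is a stand-in here; the real listing uses murmur2.
    static int keyedPartition(byte[] keyBytes, int numPartitions) {
        return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
    }

    // Java-style rule for records without a key: round-robin driven by
    // a per-topic counter (the caller owns the counter in this sketch).
    static int unkeyedPartition(AtomicInteger counter, int numPartitions) {
        return toPositive(counter.getAndIncrement()) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] keyBytes = "order-42".getBytes(StandardCharsets.UTF_8);
        AtomicInteger counter = new AtomicInteger(0);
        System.out.println("hashCode rule:  " + hashCodePartition("order-42", 4));
        System.out.println("key bytes rule: " + keyedPartition(keyBytes, 4));
        System.out.println("round-robin:    " + unkeyedPartition(counter, 4)
                + ", " + unkeyedPartition(counter, 4));
    }
}
```

The practical consequence is that the Scala rule depends on how the key object implements `hashCode` and fails on a null key, while the Java rule depends only on the serialized key bytes and spreads keyless records across partitions.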
    

      


  • Original article: https://www.cnblogs.com/dongxiao-yang/p/11400050.html