zoukankan      html  css  js  c++  java
  • partition生成规则

    partition生成规则

    • 不指定key

      /**
       * Serializes a record, picks its target partition, and hands it to the record accumulator.
       *
       * <p>Steps, in order: check that the producer is not closed, wait for topic metadata
       * (bounded by {@code maxBlockTimeMs}), serialize key and value, compute the partition,
       * validate the estimated record size, append to the accumulator (retrying once on a
       * freshly chosen partition when the accumulator reports {@code abortForNewBatch}),
       * register the partition with the transaction manager when transactional, and wake the
       * sender when a batch is full or newly created.
       *
       * @param record   the record to send
       * @param callback user callback; may be {@code null} (it is null-checked before being invoked
       *                 on an {@code ApiException})
       * @return the future from the accumulator append, or a {@code FutureFailure} wrapping the
       *         {@code ApiException} when one is raised during the send
       * @throws SerializationException when the key or value serializer throws a
       *         {@code ClassCastException}
       * @throws InterruptException when an {@code InterruptedException} is raised during the send
       * @throws KafkaException rethrown after recording the error and notifying interceptors
       */
      private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
              // Stays null until the partition is known, so error paths may report a null partition.
              TopicPartition tp = null;
      
              try {
                  this.throwIfProducerClosed();
                  long nowMs = this.time.milliseconds();
      
                  KafkaProducer.ClusterAndWaitTime clusterAndWaitTime;
                  try {
                      clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), nowMs, this.maxBlockTimeMs);
                  } catch (KafkaException var22) {
                      // Give a clearer message when the failure coincides with the metadata being closed.
                      if (this.metadata.isClosed()) {
                          throw new KafkaException("Producer closed while send in progress", var22);
                      }
      
                      throw var22;
                  }
      
                  // Advance the local clock by the metadata wait and shrink the remaining blocking budget.
                  nowMs += clusterAndWaitTime.waitedOnMetadataMs;
                  long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
                  Cluster cluster = clusterAndWaitTime.cluster;
      
                  byte[] serializedKey;
                  try {
                      serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
                  } catch (ClassCastException var21) {
                      // A serializer/type mismatch is reported as a SerializationException with the original cause.
                      throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var21);
                  }
      
                  byte[] serializedValue;
                  try {
                      serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
                  } catch (ClassCastException var20) {
                      throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var20);
                  }
      
                  // Partition selection happens after serialization because it takes the serialized bytes.
                  int partition = this.partition(record, serializedKey, serializedValue, cluster);
                  tp = new TopicPartition(record.topic(), partition);
                  this.setReadOnly(record.headers());
                  Header[] headers = record.headers().toArray();
                  int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
                  this.ensureValidRecordSize(serializedSize);
                  // Fall back to the (wait-adjusted) current time when the record carries no timestamp.
                  long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
                  if (this.log.isTraceEnabled()) {
                      this.log.trace("Attempting to append record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
                  }
      
                  Callback interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
                  if (this.transactionManager != null && this.transactionManager.isTransactional()) {
                      this.transactionManager.failIfNotReadyForSend();
                  }
      
                  // First attempt passes `true`; NOTE(review): presumably this lets the accumulator abort
                  // instead of creating a new batch, which is what abortForNewBatch reports - confirm.
                  RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
                  if (result.abortForNewBatch) {
                      // Notify the partitioner, recompute the partition, and append again with the flag off.
                      int prevPartition = partition;
                      this.partitioner.onNewBatch(record.topic(), cluster, partition);
                      partition = this.partition(record, serializedKey, serializedValue, cluster);
                      tp = new TopicPartition(record.topic(), partition);
                      if (this.log.isTraceEnabled()) {
                          this.log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", new Object[]{record.topic(), partition, prevPartition});
                      }
      
                      // The callback wrapper is rebuilt because it captures the (possibly changed) partition.
                      interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
                      result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
                  }
      
                  if (this.transactionManager != null && this.transactionManager.isTransactional()) {
                      this.transactionManager.maybeAddPartitionToTransaction(tp);
                  }
      
                  if (result.batchIsFull || result.newBatchCreated) {
                      this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                      this.sender.wakeup();
                  }
      
                  return result.future;
              } catch (ApiException var23) {
                  // ApiException is not rethrown: it is delivered through the callback and a failed future.
                  this.log.debug("Exception occurred during message send:", var23);
                  if (callback != null) {
                      callback.onCompletion((RecordMetadata)null, var23);
                  }
      
                  this.errors.record();
                  this.interceptors.onSendError(record, tp, var23);
                  return new KafkaProducer.FutureFailure(var23);
              } catch (InterruptedException var24) {
                  this.errors.record();
                  this.interceptors.onSendError(record, tp, var24);
                  throw new InterruptException(var24);
              } catch (KafkaException var25) {
                  this.errors.record();
                  this.interceptors.onSendError(record, tp, var25);
                  throw var25;
              } catch (Exception var26) {
                  // Any other exception: interceptors are notified, but no error is recorded here.
                  this.interceptors.onSendError(record, tp, var26);
                  throw var26;
              }
          }
      
      /**
       * Chooses the partition for a record.
       *
       * <p>Without serialized key bytes the choice is delegated to the sticky partition cache;
       * otherwise the partition is the non-negative murmur2 hash of the key bytes modulo
       * {@code numPartitions}.
       *
       * @param topic         topic name
       * @param key           record key (not used by this method)
       * @param keyBytes      serialized key, or {@code null} when the record has no key
       * @param value         record value (not used by this method)
       * @param valueBytes    serialized value (not used by this method)
       * @param cluster       cluster metadata passed to the sticky partition cache
       * @param numPartitions divisor applied to the key hash
       * @return the selected partition number
       */
      public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
              if (keyBytes == null) {
                  // No key: reuse whatever partition the sticky cache currently holds for this topic.
                  return this.stickyPartitionCache.partition(topic, cluster);
              }
              int keyHash = Utils.murmur2(keyBytes);
              return Utils.toPositive(keyHash) % numPartitions;
          }
      
      /**
       * Returns the partition cached for {@code topic}, selecting one via
       * {@code nextPartition(topic, cluster, -1)} when nothing is cached yet.
       *
       * @param topic   topic name used as the cache key
       * @param cluster cluster metadata forwarded to {@code nextPartition}
       * @return the cached or newly selected partition number
       */
      public int partition(String topic, Cluster cluster) {
              Integer cached = (Integer)this.indexCache.get(topic);
              if (cached != null) {
                  return cached;
              }
              // -1 is passed as the previous partition when no entry exists yet.
              return this.nextPartition(topic, cluster, -1);
          }
      
      /**
       * Selects a new partition for {@code topic} and stores it in the index cache.
       *
       * <p>If the cache already holds a partition different from {@code prevPartition}, that cached
       * value is returned unchanged. Otherwise a partition is drawn at random: from all partitions
       * when none are available, the single available one when there is exactly one, or a random
       * available partition different from the cached one when there are several.
       *
       * @param topic         topic name used as the cache key
       * @param cluster       source of the topic's partition lists
       * @param prevPartition the partition the caller was using before this call
       * @return the partition found in the cache after the update attempt
       */
      public int nextPartition(String topic, Cluster cluster, int prevPartition) {
              List<PartitionInfo> allPartitions = cluster.partitionsForTopic(topic);
              Integer cached = (Integer)this.indexCache.get(topic);

              // The cache no longer matches the caller's partition: hand back the current cache entry.
              if (cached != null && cached != prevPartition) {
                  return (Integer)this.indexCache.get(topic);
              }

              List<PartitionInfo> available = cluster.availablePartitionsForTopic(topic);
              int availableCount = available.size();
              Integer chosen;
              if (availableCount < 1) {
                  // Nothing available: pick uniformly among every partition of the topic.
                  int draw = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                  chosen = draw % allPartitions.size();
              } else if (availableCount == 1) {
                  chosen = available.get(0).partition();
              } else {
                  // Keep drawing until the pick differs from the cached partition (if any).
                  do {
                      int draw = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                      chosen = available.get(draw % availableCount).partition();
                  } while (chosen.equals(cached));
              }

              // Conditional writes: an existing entry is only replaced when it still equals prevPartition.
              if (cached == null) {
                  this.indexCache.putIfAbsent(topic, chosen);
              } else {
                  this.indexCache.replace(topic, prevPartition, chosen);
              }

              // Re-read so the returned value is whatever actually ended up in the cache.
              return (Integer)this.indexCache.get(topic);
          }
      
      

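    由上可知,如果没有指定key,会从该topic当前可用的partition列表中随机选一个(选择只依赖topic和cluster,与headers、value无关);但是一旦选定(也就是indexCache中已缓存该topic的partition),后续就直接沿用原来的,直到需要新建batch时(onNewBatch 传入 prevPartition 调用 nextPartition)才会重新选择。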

    • 指定key

      /**
       * Chooses the partition for a record.
       *
       * <p>With serialized key bytes present, the partition is the non-negative murmur2 hash of
       * those bytes modulo {@code numPartitions}; without them the sticky partition cache decides.
       *
       * @param topic         topic name
       * @param key           record key (not used by this method)
       * @param keyBytes      serialized key, or {@code null} when the record has no key
       * @param value         record value (not used by this method)
       * @param valueBytes    serialized value (not used by this method)
       * @param cluster       cluster metadata passed to the sticky partition cache
       * @param numPartitions divisor applied to the key hash
       * @return the selected partition number
       */
      public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
              if (keyBytes == null) {
                  return this.stickyPartitionCache.partition(topic, cluster);
              }
              // Same key bytes always hash to the same value, so the result depends only on key and numPartitions.
              int keyHash = Utils.murmur2(keyBytes);
              return Utils.toPositive(keyHash) % numPartitions;
          }
      /**
       * Computes a 32-bit MurmurHash2 of the given bytes using a fixed seed.
       *
       * <p>The input is consumed four bytes at a time, assembled least-significant byte first;
       * the remaining 0-3 tail bytes are folded in afterwards, followed by a final avalanche step.
       * The result may be negative.
       *
       * @param data bytes to hash; must not be {@code null}
       * @return the 32-bit hash of {@code data}
       */
      public static int murmur2(byte[] data) {
              final int length = data.length;
              final int seed = 0x9747b28c;
              // Mixing constants of the algorithm. The decompiled listing had "int r = true",
              // which does not compile; the shift actually used in the loop is 24.
              final int m = 0x5bd1e995;
              final int r = 24;

              int h = seed ^ length;
              final int length4 = length / 4;

              for (int i = 0; i < length4; i++) {
                  final int i4 = i * 4;
                  int k = (data[i4] & 0xff)
                          + ((data[i4 + 1] & 0xff) << 8)
                          + ((data[i4 + 2] & 0xff) << 16)
                          + ((data[i4 + 3] & 0xff) << 24);
                  k *= m;
                  k ^= k >>> r;
                  k *= m;
                  h *= m;
                  h ^= k;
              }

              // Index of the first byte not covered by a full 4-byte group.
              final int tail = length & ~3;
              switch (length % 4) {
                  case 3:
                      h ^= (data[tail + 2] & 0xff) << 16;
                      // fall through
                  case 2:
                      h ^= (data[tail + 1] & 0xff) << 8;
                      // fall through
                  case 1:
                      h ^= data[tail] & 0xff;
                      h *= m;
                      break;
                  default:
                      break;
              }

              // Final avalanche; applied for every input length.
              h ^= h >>> 13;
              h *= m;
              h ^= h >>> 15;
              return h;
          }
      
      /**
       * Maps any {@code int} to a non-negative value by clearing its sign bit.
       *
       * <p>This is not an absolute value: non-negative inputs are returned unchanged, while
       * negative inputs map to {@code number + 2^31} (so {@code Integer.MIN_VALUE} becomes 0).
       *
       * @param number any int
       * @return {@code number} with the top bit cleared
       */
      public static int toPositive(int number) {
              final int signBitCleared = 0x7fffffff;
              return number & signBitCleared;
          }
      
    • 指定partition,则直接使用指定的partition。但是要尽量不要把所有的数据(特别是数据量很大的情况)都放入同一个partition,不然可能导致kafka死掉。

    作者:Ants_double

    出处:https://www.cnblogs.com/ants_double/

    本文版权归作者和博客园所有,欢迎转载。转载请在留言板处留言给我,且在文章标明原文链接,谢谢!

    如果您觉得本篇博文对您有所收获,觉得我还算用心,请点击右下角的 [大拇指],谢谢!

  • 相关阅读:
    单位根反演
    安卓第十三天笔记-服务(Service)
    安卓第十二天笔记-广播
    安卓第十一天笔记-Intent与inter-filter配置
    安卓第十天笔记-fragment
    安卓第九天笔记-Activity
    安卓第八天笔记--网络编程二
    安卓第七天笔记--网络编程一
    安卓第六天笔记--ListView
    安卓第五天笔记-对话框
  • 原文地址:https://www.cnblogs.com/ants_double/p/15786102.html
Copyright © 2011-2022 走看看