partitioner.class的说明
在API客户端中封装好的partition( )方法会为消息选择一个分区编号。为了保证消息负载均衡到每个分区,可以通过使用默认方式或者
手动配置这个参数的方式使消息发布到topic中。
1)如果默认不设置,就会通过计数器自增轮询的方式依次将消息分配到不同的分区上;
2)如果设置此值,就可以通过自定义设置,进行自主计算,此文档主要是代码。
JAVA代码配置了partitioner.class参数
public class ProducerPartitioner implements Partitioner { private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<String, AtomicInteger>(); public ProducerPartitioner(){ } @Override public void configure(Map<String, ?> arg0) { // TODO Auto-generated method stub } @Override public void close() { // TODO Auto-generated method stub } @Override public int partition(String topic, Object key, byte[] keyBytes, Object arg3, byte[] arg4, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if(keyBytes == null) { int nextValue = this.nextValue(topic); List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if(availablePartitions.size() > 0) { int part = Utils.abs(nextValue) % availablePartitions.size(); return ((PartitionInfo)availablePartitions.get(part)).partition(); } else { return Utils.abs(nextValue) % numPartitions; } } else { return Utils.abs(Utils.murmur2(keyBytes)) % numPartitions; } } private int nextValue(String topic) { AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic); if(null == counter) { counter = new AtomicInteger((new Random()).nextInt()); AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter); if(currentCounter != null) { counter = currentCounter; } } return counter.getAndIncrement(); } }