zoukankan      html  css  js  c++  java
  • kafka-producer partitioner.class的使用

    partitioner.class的说明

     

    在API客户端中封装好的partition( )方法会为消息选择一个分区编号。为了保证消息负载均衡到每个分区,可以通过使用默认方式或者

    手动配置这个参数的方式使消息发布到topic中。

    1)如果默认不设置,就会通过计数器自增轮询的方式依次将消息分配到不同的分区上;

    2)如果设置此值,就可以通过自定义设置,进行自主计算,此文档主要是代码。

    JAVA代码配置了partitioner.class参数

    public class ProducerPartitioner implements Partitioner {
    	
    	private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<String, AtomicInteger>();
    	
    	public ProducerPartitioner(){
    		
    	}
    	@Override
    	public void configure(Map<String, ?> arg0) {
    		// TODO Auto-generated method stub
    	}
    	@Override
    	public void close() {
    		// TODO Auto-generated method stub
    	}
    	@Override
    	public int partition(String topic, Object key, byte[] keyBytes, Object arg3, byte[] arg4, Cluster cluster) {
    		List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            if(keyBytes == null) {
                int nextValue = this.nextValue(topic);
                List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
                if(availablePartitions.size() > 0) {
                    int part = Utils.abs(nextValue) % availablePartitions.size();
                    return ((PartitionInfo)availablePartitions.get(part)).partition();
                } else {
                    return Utils.abs(nextValue) % numPartitions;
                }
            } else {
                return Utils.abs(Utils.murmur2(keyBytes)) % numPartitions;
            }
    	}
    	   
    	private int nextValue(String topic) {
            AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
            if(null == counter) {
                counter = new AtomicInteger((new Random()).nextInt());
                AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
                if(currentCounter != null) {
                    counter = currentCounter;
                }
            }
            return counter.getAndIncrement();
        }
    
    }
    
  • 相关阅读:
    .NET、C#和ASP.NET,ASP.NET MVC 四者之间的区别
    Row_Number()显示行号
    iframe高度宽度自适应(转)
    Windows CMD命令大全(转)
    wndows系统命令总结
    删除ORACLE目录OCI.dll文件无法删除 (转)
    IE8兼容性调试及IE 8 css hack
    backbone学习笔记:视图(View)
    js中toFixed() 的使用(转)
    backbone学习笔记:集合(Collection)
  • 原文地址:https://www.cnblogs.com/boanxin/p/9145273.html
Copyright © 2011-2022 走看看