zoukankan      html  css  js  c++  java
  • kafka-producer partitioner.class的使用

    partitioner.class的说明

     

    在API客户端中封装好的partition( )方法会为消息选择一个分区编号。为了保证消息负载均衡到每个分区,可以通过使用默认方式或者

    手动配置这个参数的方式使消息发布到topic中。

    1)如果默认不设置,就会通过计数器自增轮询的方式依次将消息分配到不同的分区上;

    2)如果设置此值,就可以通过自定义设置,进行自主计算,此文档主要是代码。

    JAVA代码配置了partitioner.class参数

    public class ProducerPartitioner implements Partitioner {
    	
    	private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<String, AtomicInteger>();
    	
    	public ProducerPartitioner(){
    		
    	}
    	@Override
    	public void configure(Map<String, ?> arg0) {
    		// TODO Auto-generated method stub
    	}
    	@Override
    	public void close() {
    		// TODO Auto-generated method stub
    	}
    	@Override
    	public int partition(String topic, Object key, byte[] keyBytes, Object arg3, byte[] arg4, Cluster cluster) {
    		List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            if(keyBytes == null) {
                int nextValue = this.nextValue(topic);
                List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
                if(availablePartitions.size() > 0) {
                    int part = Utils.abs(nextValue) % availablePartitions.size();
                    return ((PartitionInfo)availablePartitions.get(part)).partition();
                } else {
                    return Utils.abs(nextValue) % numPartitions;
                }
            } else {
                return Utils.abs(Utils.murmur2(keyBytes)) % numPartitions;
            }
    	}
    	   
    	private int nextValue(String topic) {
            AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
            if(null == counter) {
                counter = new AtomicInteger((new Random()).nextInt());
                AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
                if(currentCounter != null) {
                    counter = currentCounter;
                }
            }
            return counter.getAndIncrement();
        }
    
    }
    
  • 相关阅读:
    linux的vim按了ctrl+s之后假死的解决办法
    linux下的终端模拟器urxvt的配置
    vim下正则表达式的非贪婪匹配
    linux中的一个看图的软件
    解决windows的控制台显示utf8乱码的问题
    [PHP][位转换积累]之异或运算的简单加密应用
    [PHP][REDIS]phpredis 'RedisException' with message 'read error on connection'
    [PHP][位转换积累]之与运算截取二进制流的值
    [PHP][位转换积累]之pack和unpack
    [正则表达式]PCRE反向分组引用
  • 原文地址:https://www.cnblogs.com/boanxin/p/9145273.html
Copyright © 2011-2022 走看看