zoukankan      html  css  js  c++  java
  • kafka 自定义分区器

    package cn.xiaojf.kafka.producer;
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.utils.Utils;
    
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * 自定义分区方式
     */
    public class CustomPartitioner implements Partitioner {
        private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap();
    
        public CustomPartitioner() {
        }
    
        public void configure(Map<String, ?> configs) {
        }
    
        /**
         * 自定义分区规则
         * @param topic
         * @param key
         * @param keyBytes
         * @param value
         * @param valueBytes
         * @param cluster
         * @return
         */
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            if(keyBytes == null) {
                int nextValue = this.nextValue(topic);
                List availablePartitions = cluster.availablePartitionsForTopic(topic);
                if(availablePartitions.size() > 0) {
                    int part = Utils.toPositive(nextValue) % availablePartitions.size();
                    return ((PartitionInfo)availablePartitions.get(part)).partition();
                } else {
                    return Utils.toPositive(nextValue) % numPartitions;
                }
            } else {
                return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            }
        }
    
        private int nextValue(String topic) {
            AtomicInteger counter = (AtomicInteger)this.topicCounterMap.get(topic);
            if(null == counter) {
                counter = new AtomicInteger((new Random()).nextInt());
                AtomicInteger currentCounter = (AtomicInteger)this.topicCounterMap.putIfAbsent(topic, counter);
                if(currentCounter != null) {
                    counter = currentCounter;
                }
            }
    
            return counter.getAndIncrement();
        }
    
        public void close() {
        }
    }
    package cn.xiaojf.kafka.producer;
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.common.utils.Utils;
    
    import java.util.*;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * 消息生产者
     * @author xiaojf 2017/3/22 14:27
     */
    public class MsgProducer extends Thread {
    
        private final KafkaProducer<String, String> producer;
        private final String topic;
        private final boolean isAsync;
    
        public MsgProducer(String topic, boolean isAsync) {
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.59.130:9092");//broker 集群地址
            properties.put(ProducerConfig.CLIENT_ID_CONFIG, "MsgProducer");//自定义客户端id
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//key 序列号方式
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");//value 序列号方式
            properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CustomPartitioner.class.getCanonicalName());//自定义分区函数
    
    //        properties.load("properties配置文件");
    
            this.producer = new KafkaProducer<String, String>(properties);
            this.topic = topic;
            this.isAsync = isAsync;
        }
    
        @Override
        public void run() {
            int msgNo = 0;
    
            while (true) {
                String msg = "Msg: " + msgNo;
                String key = msgNo + "";
                if (isAsync) {//异步
                    producer.send(new ProducerRecord<String, String>(this.topic,msg));
    //                producer.send(new ProducerRecord<String, String>(this.topic, key, msg));
                } else {//同步
                    producer.send(new ProducerRecord<String, String>(this.topic, key, msg),
                            new MsgProducerCallback(System.currentTimeMillis(), key, msg));
                }
            }
        }
    
        /**
         * 消息发送后的回调函数
         */
        class MsgProducerCallback implements Callback {
    
            private final long startTime;
            private final String key;
            private final String msg;
    
            public MsgProducerCallback(long startTime, String key, String msg) {
                this.startTime = startTime;
                this.key = key;
                this.msg = msg;
            }
    
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                long elapsedTime = System.currentTimeMillis() - startTime;
                if (recordMetadata != null) {
                    System.out.println(msg + " be sended to partition no : " + recordMetadata.partition());
                }
            }
        }
    
        public static void main(String args[]) {
            new MsgProducer("my-replicated-topic",true).start();//开始发送消息
        }
    }
  • 相关阅读:
    MVC3 模板页页预留Section
    LINQ表达式总结笔记
    分布式事务管理器(MSDTC)的事务处理异常的排错
    ADO.NET的事务BeginTransaction demo
    TransactionScope类使用场景和方法介绍
    Linq中使用Left Join
    FullText Search in ASP.NET using Lucene.NET
    EF的BeginTransaction 用法
    mvc4 @Html.Partial,@Html.RenderPartial
    Android监听EditText内容变化
  • 原文地址:https://www.cnblogs.com/xiaojf/p/6602691.html
Copyright © 2011-2022 走看看