zoukankan      html  css  js  c++  java
  • kafka2.5.0分区再均衡监听器java例子

    什么是分区再均衡:

    如果该topic的分区大于1,那么生产者生产的数据存放到哪个分区,完全取决于key值,比如key=A,那么存到分区0,key=B,那么存到分区1,如果key为null,那么负载均衡存储到每个分区!

    分区再均衡监听器代码:

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import java.util.Collection;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * 类说明:再均衡监听器
     */
    public class HandlerRebalance implements ConsumerRebalanceListener {
    
        /*模拟一个保存分区偏移量的数据库表*/
        public final static ConcurrentHashMap<TopicPartition,Long>
                partitionOffsetMap = new ConcurrentHashMap<TopicPartition,Long>();
    
        private final Map<TopicPartition, OffsetAndMetadata> currOffsets;
        private final KafkaConsumer<String,String> consumer;
        //private final Transaction  tr事务类的实例
    
        public HandlerRebalance(Map<TopicPartition, OffsetAndMetadata> currOffsets,
                                KafkaConsumer<String, String> consumer) {
            this.currOffsets = currOffsets;
            this.consumer = consumer;
        }
    
        //分区再均衡之前
        public void onPartitionsRevoked(
                Collection<TopicPartition> partitions) {
            final String id = Thread.currentThread().getId()+"";
            System.out.println(id+"-onPartitionsRevoked参数值为:"+partitions);
            System.out.println(id+"-服务器准备分区再均衡,提交偏移量。当前偏移量为:"
                    +currOffsets);
            //我们可以不使用consumer.commitSync(currOffsets);
            //提交偏移量到kafka,由我们自己维护*/
            //开始事务
            //偏移量写入数据库
            System.out.println("分区偏移量表中:"+partitionOffsetMap);
            for(TopicPartition topicPartition:partitions){
                partitionOffsetMap.put(topicPartition,
                        currOffsets.get(topicPartition).offset());
            }
            consumer.commitSync(currOffsets);
            //提交业务数和偏移量入库  tr.commit
        }
    
        //分区再均衡完成以后
        public void onPartitionsAssigned(
                Collection<TopicPartition> partitions) {
            final String id = Thread.currentThread().getId()+"";
            System.out.println(id+"-再均衡完成,onPartitionsAssigned参数值为:"+partitions);
            System.out.println("分区偏移量表中:"+partitionOffsetMap);
            for(TopicPartition topicPartition:partitions){
                System.out.println(id+"-topicPartition"+topicPartition);
                //模拟从数据库中取得上次的偏移量
                Long offset = partitionOffsetMap.get(topicPartition);
                if(offset==null) continue;
                //TODO 从特定偏移量处开始记录 (从指定分区中的指定偏移量开始消费)
                //TODO 这样就可以确保分区再均衡中的数据不错乱
                consumer.seek(topicPartition,partitionOffsetMap.get(topicPartition));
            }
    
        }
    }

    将该监听器注册到spring容器中:

    @Bean
    public void getKafkaConsumer(){
       KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
         //TODO 偏移量
         this.currOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
         //TODO 消费者订阅是加入再均衡监听器(HandlerRebalance)
         consumer.subscribe(Collections.singletonList(BusiConst.REBALANCE_TOPIC), new HandlerRebalance(currOffsets,consumer));
       return consumer;
    }

    end.

  • 相关阅读:
    离职or not 离职
    RelativeLayout总结
    MVC中小试了一下Jquery
    tricks about andor in python
    【回旋数字】c语言实现
    退役?
    HDU4546 比赛难度
    WEB页面导出为EXCEL文档的方法
    开始→运行→命令
    控制Repeater显示列数
  • 原文地址:https://www.cnblogs.com/zhuwenjoyce/p/13192795.html
Copyright © 2011-2022 走看看