zoukankan      html  css  js  c++  java
  • rocketmq消费负载均衡--push消费为例

    本文介绍了DefaultMQPushConsumerImpl消费者,客户端负载均衡相关知识点。本文从DefaultMQPushConsumerImpl启动过程到实现负载均衡,从源代码一步一步分析,共分为6个部分进行介绍,其中第6个部分 rebalanceByTopic 为负载均衡的核心逻辑模块,具体过程运用了图文进行阐述。
     
    介绍之前首先抛出几个问题:
    1. 要做负载均衡,首先要解决的一个问题是什么?
    2. 负载均衡是Client端处理还是Broker端处理?
    个人理解:
    1. 要做负载均衡,首先要做的就是信号收集。
    所谓信号收集,就是得知道每一个consumerGroup有哪些consumer,对应的topic是谁。信号收集分为Client端信号收集与Broker端信号收集两个部分。
    2. 负载均衡放在Client端处理。
    具体做法是:消费者客户端在启动时完善rebalanceImpl实例,同时拷贝订阅信息存放rebalanceImpl实例对象中,另外也是很重要的一个步骤 -- 通过心跳消息,不停的上报自己到所有Broker,注册RegisterConsumer,等待上述过程准备好之后在Client端不断执行的负载均衡服务线程从Broker端获取一份全局信息(该consumerGroup下所有的消费Client),然后分配这些全局信息,获取当前客户端分配到的消费队列。
     
    本文具体的内容:
    I. copySubscription
    Client端信号收集,拷贝订阅信息。
    在DefaultMQPushConsumerImpl.start()时,会将消费者的topic订阅关系设置到rebalanceImpl的SubscriptionInner的map中用于负载:
    private void copySubscription() throws MQClientException {
            try {
          //注:一个consumer对象可以订阅多个topic
                Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
                if (sub != null) {
                    for (final Map.Entry<String, String> entry : sub.entrySet()) {
                        final String topic = entry.getKey();
                        final String subString = entry.getValue();
                        SubscriptionData subscriptionData =
                                FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
                                    topic, subString);
                        this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                    }
                }
    
                if (null == this.messageListenerInner) {
                    this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
                }
    
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING:
                    break;
                case CLUSTERING:
                    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                    SubscriptionData subscriptionData =
                            FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
                                retryTopic, SubscriptionData.SUB_ALL);
                    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                    break;
                default:
                    break;
                }
            }
            catch (Exception e) {
                throw new MQClientException("subscription exception", e);
            }
        }
    FilterAPI.buildSubscriptionData接口将订阅关系转换为SubscriptionData 数据,其中subString包含订阅tag等信息。另外,如果该消费者的消费模式为集群消费,则会将retry的topic一并放到。
     
    II. 完善rebalanceImpl实例
    Client继续收集信息:
     this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
     this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
     this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer
                    .getAllocateMessageQueueStrategy());
     this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
    本文以DefaultMQPushConsumerImpl为例,因此this对象类型为DefaultMQPushConsumerImp。
     
    III. this.rebalanceService.start()
    开启负载均衡服务。this.rebalanceService是一个RebalanceService实例对象,它继承与ServiceThread,是一个线程类。 this.rebalanceService.start()执行时,也即执行RebalanceService线程体:
       @Override
        public void run() {
            log.info(this.getServiceName() + " service started");
    
            while (!this.isStoped()) {
                this.waitForRunning(WaitInterval);
                this.mqClientFactory.doRebalance();
            }
    
            log.info(this.getServiceName() + " service end");
        }
    IV. this.mqClientFactory.doRebalance
    客户端遍历消费组table,对该客户端上所有消费者独立进行负载均衡,分发消费队列:
     public void doRebalance() {
            for (String group : this.consumerTable.keySet()) {
                MQConsumerInner impl = this.consumerTable.get(group);
                if (impl != null) {
                    try {
                        impl.doRebalance();
                    } catch (Exception e) {
                        log.error("doRebalance exception", e);
                    }
                }
            }
        }
    V. MQConsumerInner.doRebalance
    由于本文以DefaultMQPushConsumerImpl消费过程为例,即DefaultMQPushConsumerImpl.doRebalance:
    @Override
        public void doRebalance() {
            if (this.rebalanceImpl != null) {
                this.rebalanceImpl.doRebalance();
            }
        }
    步骤II 中完善了rebalanceImpl实例,为调用rebalanceImpl.doRebalance()提供了初始数据。
    rebalanceImpl.doRebalance()过程如下:
    public void doRebalance() {
         // 前文copySubscription中初始化了SubscriptionInner
            Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
            if (subTable != null) {
                for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                    final String topic = entry.getKey();
                    try {
                        this.rebalanceByTopic(topic);
                    } catch (Exception e) {
                        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                            log.warn("rebalanceByTopic Exception", e);
                        }
                    }
                }
            }
    
            this.truncateMessageQueueNotMyTopic();
        }

    VI. rebalanceByTopic -- 核心步骤之一
    rebalanceByTopic方法中根据消费者的消费类型为BROADCASTING或CLUSTERING做不同的逻辑处理。CLUSTERING逻辑包括BROADCASTING逻辑,本部分只介绍集群消费负载均衡的逻辑。
    集群消费负载均衡逻辑主要代码如下(省略了log等代码):

    //1.从topicSubscribeInfoTable列表中获取与该topic相关的所有消息队列
    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
    //2. 从broker端获取消费该消费组的所有客户端clientId
    List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
     f (null == mqSet) { ... }
    if (null == cidAll) { ... }
    if (mqSet != null && cidAll != null) {
            List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
            mqAll.addAll(mqSet);
            Collections.sort(mqAll);
            Collections.sort(cidAll);
    
         // 3.创建DefaultMQPushConsumer对象时默认设置为AllocateMessageQueueAveragely
            AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
    
            List<MessageQueue> allocateResult = null;
            try {
              // 4.调用AllocateMessageQueueAveragely.allocate方法,获取当前client分配消费队列
                    allocateResult = strategy.allocate(
                    this.consumerGroup, 
                    this.mQClientFactory.getClientId(), 
                    mqAll,
                    cidAll);
                } catch (Throwable e) {
                            return;
                        }
             // 5. 将分配得到的allocateResult 中的队列放入allocateResultSet 集合
                Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                if (allocateResult != null) {
                    allocateResultSet.addAll(allocateResult);
                }
    、
             //6. 更新updateProcessQueue
                boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet);
                if (changed) {
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                }
    }
    注:BROADCASTING逻辑只包含上述的1、6。
    集群消费负载均衡逻辑中的1、2、4这三个点相关知识为其核心过程,各个点相关知识如下:
     
    第1点:从topicSubscribeInfoTable列表中获取与该topic相关的所有消息队列
    第2点: 从broker端获取消费该消费组的所有客户端clientId
    首先,消费者对象不断地向所有broker发送心跳包,上报自己,注册并更新订阅关系以及客户端ChannelInfoTable;之后,客户端在做消费负载均衡时获取那些消费客户端,对这些客户端进行负载均衡,分发消费的队列。具体过程如下图所示:

    第4点:调用AllocateMessageQueueAveragely.allocate方法,获取当前client分配消费队列
     
    注:上图中cId1、cId2、...、cIdN通过 getConsumerIdListByGroup 获取,它们在这个ConsumerGroup下所有在线客户端列表中。
    当前消费对进行负载均衡策略后获取对应的消息消费队列。具体的算法很简单,可以看源码。
     
     
     
  • 相关阅读:
    正在与拖延症病魔抗争中
    Words For Today [20110724]
    短期目标[Till 20110805]
    Words For Today [20110804]
    Words For Today [20110731]
    Words For Today [20110801]
    Words For Today [20110723]
    《定位》一书
    马云的最近的话柱着拐杖跳高
    创业的人格
  • 原文地址:https://www.cnblogs.com/chenjunjie12321/p/7913323.html
Copyright © 2011-2022 走看看