zoukankan      html  css  js  c++  java
  • RocketMQ(4.8.0)——消费者的Rebalance机制

    RocketMQ(4.8.0)——消费者的Rebalance机制

      客户端是通过 Rebalance 服务做到高可靠的。当发生 Broker 掉线、消费者实例掉线、Topic 扩容等各种突发情况时,消费者组中的消费者实例是怎么重平衡,以支持全部队列的正常消费的呢?

      RebalancePullImplRebalancePushImpl 两个重平衡实现类,分别 DefaltMQPullConsumerDefaultMQPushConsumer 使用。

      RebalanceImpl 的核心属性和方法如下:

    1、RebalanceImpl 核心属性:

      ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable:记录 MessageQueue 和 ProcessQueue 的关系。MessageQueue 可以简单地理解为 ConsumeQueue 的客户端实现;ProcessQueue 是保存 Pull 消息的本地容器。

      ConcurrentMap<String, Set<MessageQueue>> topicSubscribeInfoTable:Topic 路由信息。保存 Topic 和 MessageQueue 的关系。

      ConcurrentMap<String /*topic */,SubcriptionData> subcriptionInner:真正的订阅关系,保存当前消费者组订阅了哪些 Topic 的哪些 Tag。

      AllocateMessageQueueStrategy allocateMessageQueueStrategy:MessageQueue 消费分配策略的实现。

      MQClientInstance mQClientFactory:client实例对象。

    2、RebalanceImpl 核心方法:

      boolean lock(final MessageQueue mq):为 MessageQueue 加锁。

      void doRebalance(final boolean isOrder): 执行 Rebalance 操作。

      void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,final Set<MessageQueue> mqDivided):通知 Message 发生变化,这个方法在 Push 和 Pull 两个类重写。

      boolean removeUnnecessaryMessageQueue(final MessageQueue mq, final ProcessQueue pg):去掉不再需要的 MessageQueue。

      void dispathPullRequest(final List<PullRequest> pullRequestList):执行消息拉取请求。

      boolean updateProcessQueueTableInRebalance(final String topic, final Set <MessageQueue> mqSet, final boolean isOrder):在 Rebalance 中更新 processQueue 。

      RebalanceImpl、RebalancePushImpl、RebalancePullImpl 是 Rebalance 的核心实现,主要逻辑都在 RebalanceImpl 中,因为 Pull 消费者和 Push 消费者对 Rebalance 的需求不同,在各自的实现种重写了部分方法,以满足自身需求。

      如果有一个消费者实例下线了,Broker 和其他消费者是怎么做 Rebalance 的呢?下图展示了整个 Rebalance 的过程。

      消费者实例在接收到 Broker 通知后怎么执行 Rebalance的呢?这个操作是通过 MQClientInstance.rebalanceImmediately()来实现的,代码路径:D: ocketmq-masterclientsrcmainjavaorgapache ocketmqclientimplfactoryMQClientInstance.java,代码如下:

    1     public void rebalanceImmediately() {
    2         this.rebalanceService.wakeup();
    3     }
    rebalanceImmediately()

      这种设计是 RocketMQ 中典型的锁方式,执行 wakeup 命令后,this.rebalanceService.wakeup()就会暂停,再执行 this.mqClientFactory.doRebalance(),代码路径:D: ocketmq-masterclientsrcmainjavaorgapache ocketmqclientimplconsumerRebalanceService.java,代码如下:

     1     @Override
     2     public void run() {
     3         log.info(this.getServiceName() + " service started");
     4 
     5         while (!this.isStopped()) {
     6             this.waitForRunning(waitInterval);
     7             this.mqClientFactory.doRebalance();
     8         }
     9 
    10         log.info(this.getServiceName() + " service end");
    11     }

      下面介绍一下 doRebalance() 方法的实现逻辑,主要有以下几个步骤:

      第一步:查找当前 clientID 对应的全部的消息者组,全部执行一次 Rebalance。虽然消费者实现分为 Pull消费 和 Push 消费两种默认实现,调用的是不同实现类中的 Rebalance 方法,但是实现逻辑都差不多,这里以 Push 消费者为例。

      第二步:判断 Rebalance 开关,如果没有被暂停,则调用 RebalancePushImpl.rebalance()方法。

      第三步:在RebalancePushImpl.rebalance()方法中,获取当前消费者全部订阅关系中的 Topic,循环对每个 Topic 进行 Rebalance,待全部的 Rebalance 都执行完后,将不属于当前消费者的队列删除。

      第四步:Topic 队列重新分配。这里也就是客户端 Rebalance 的逻辑核心之处。根据是集群消费还是广播消费分别执行 MessageQueue 重新分配的逻辑。

    集群消费例子:

      (1) 获取当前 Topic 的全部 MessageQueue(代码是 mqSet)和该 Topic 的所有消费者的 clietId(代码是 cidAll)。只有当两者都不为空时,才执行 Rebalance,代码路径:D: ocketmq-masterclientsrcmainjavaorgapache ocketmqclientimplconsumerRebalanceService.java,代码如下:

     1     private void rebalanceByTopic(final String topic, final boolean isOrder) {
     2         switch (messageModel) {
     3             case BROADCASTING: {
     4                 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
     5                 if (mqSet != null) {
     6                     boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
     7                     if (changed) {
     8                         this.messageQueueChanged(topic, mqSet, mqSet);
     9                         log.info("messageQueueChanged {} {} {} {}",
    10                             consumerGroup,
    11                             topic,
    12                             mqSet,
    13                             mqSet);
    14                     }
    15                 } else {
    16                     log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
    17                 }
    18                 break;
    19             }
    20             case CLUSTERING: {
    21                 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
    22                 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
    23                 if (null == mqSet) {
    24                     if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    25                         log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
    26                     }
    27                 }
    28 
    29                 if (null == cidAll) {
    30                     log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
    31                 }
    32 
    33                 if (mqSet != null && cidAll != null) {
    34                     List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
    35                     mqAll.addAll(mqSet);
    36 
    37                     Collections.sort(mqAll);
    38                     Collections.sort(cidAll);
    39 
    40                     AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
    41 
    42                     List<MessageQueue> allocateResult = null;
    43                     try {
    44                         allocateResult = strategy.allocate(
    45                             this.consumerGroup,
    46                             this.mQClientFactory.getClientId(),
    47                             mqAll,
    48                             cidAll);
    49                     } catch (Throwable e) {
    50                         log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
    51                             e);
    52                         return;
    53                     }
    54 
    55                     Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
    56                     if (allocateResult != null) {
    57                         allocateResultSet.addAll(allocateResult);
    58                     }
    59 
    60                     boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
    61                     if (changed) {
    62                         log.info(
    63                             "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
    64                             strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
    65                             allocateResultSet.size(), allocateResultSet);
    66                         this.messageQueueChanged(topic, mqSet, allocateResultSet);
    67                     }
    68                 }
    69                 break;
    70             }
    71             default:
    72                 break;
    73         }
    74     }
    rebalanceByTopic()

      (2) 将全部的 MessageQueue(代码中是 mqAll)和消费者客户端(cidAll)进行排序。由于不是所有消费者的客户端都彼此通信,所以将 mqAll 和 cidAll 排序的目的在于,保证所有消费者客户端在做 Rebalance 的时候,看到的 MessageQueue 列表和消费者客户端都是一样的视图,做 Rebalance 时才不会分配错。

      (3) 按照当前设置的队列分配策略执行 Queue 分配。队列分配策略接口,代码路径: D: ocketmq-masterclientsrcmainjavaorgapache ocketmqclientconsumerAllocateMessageQueueStrategy.java,代码如下:

     1 /*
     2  * Licensed to the Apache Software Foundation (ASF) under one or more
     3  * contributor license agreements.  See the NOTICE file distributed with
     4  * this work for additional information regarding copyright ownership.
     5  * The ASF licenses this file to You under the Apache License, Version 2.0
     6  * (the "License"); you may not use this file except in compliance with
     7  * the License.  You may obtain a copy of the License at
     8  *
     9  *     http://www.apache.org/licenses/LICENSE-2.0
    10  *
    11  * Unless required by applicable law or agreed to in writing, software
    12  * distributed under the License is distributed on an "AS IS" BASIS,
    13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14  * See the License for the specific language governing permissions and
    15  * limitations under the License.
    16  */
    17 package org.apache.rocketmq.client.consumer;
    18 
    19 import java.util.List;
    20 import org.apache.rocketmq.common.message.MessageQueue;
    21 
    22 /**
    23  * Strategy Algorithm for message allocating between consumers
    24  */
    25 public interface AllocateMessageQueueStrategy {
    26 
    27     /**
    28      * Allocating by consumer id
    29      *
    30      * @param consumerGroup current consumer group
    31      * @param currentCID current consumer id
    32      * @param mqAll message queue set in current topic
    33      * @param cidAll consumer set in current consumer group
    34      * @return The allocate result of given strategy
    35      */
    36     List<MessageQueue> allocate(
    37         final String consumerGroup,
    38         final String currentCID,
    39         final List<MessageQueue> mqAll,
    40         final List<String> cidAll
    41     );
    42 
    43     /**
    44      * Algorithm name
    45      *
    46      * @return The strategy name
    47      */
    48     String getName();
    49 }
    AllocateMessageQueueStrategy.java

      该接口中,有两个方法 allocate() 和 getName(),分别说明如下:

      allocate():执行队列分配操作,该方法必须满足全部队列都能分配到消费者。

      getName():获取当前分配算法的名字。

      目前队列分配策略有以下5种实现方法:

      • 平均分配策略(默认)(AllocateMessageQueueAveragely)
      • 环形分配策略(AllocateMessageQueueAveragelyByCircle)
      • 手动配置分配策略(AllocateMessageQueueByConfig)
      • 机房分配策略(AllocateMessageQueueByMachineRoom)
      • 一致性哈希分配策略(AllocateMessageQueueConsistentHash)

      (4) 动态更新 ProcessQueue。在队列重新分配后,当前消费者消费的队列可能不会发生变化,也可能发生变化,不管是增加了新的队列需要消费,还是减少了队列,都需要执行 updateProcessQueueTableInRebalance() 方法来更新 ProcessQueue。如果有 MessageQueue 不再分配给当前的消费者消费,则设置 ProcessQueue.setDroppend(true),表示放弃当前 MessageQueue 的 Pull 消息。updateProcessQueueTableInRebalance()方法代码路径:D: ocketmq-masterclientsrcmainjavaorgapache ocketmqclientimplconsumerRebalanceImpl.java,代码如下:

     1 private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
     2         final boolean isOrder) {
     3         boolean changed = false;
     4 
     5         Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
     6         while (it.hasNext()) {
     7             Entry<MessageQueue, ProcessQueue> next = it.next();
     8             MessageQueue mq = next.getKey();
     9             ProcessQueue pq = next.getValue();
    10 
    11             if (mq.getTopic().equals(topic)) {
    12                 if (!mqSet.contains(mq)) {
    13                     pq.setDropped(true);
    14                     if (this.removeUnnecessaryMessageQueue(mq, pq)) {
    15                         it.remove();
    16                         changed = true;
    17                         log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
    18                     }
    19                 } else if (pq.isPullExpired()) {
    20                     switch (this.consumeType()) {
    21                         case CONSUME_ACTIVELY:
    22                             break;
    23                         case CONSUME_PASSIVELY:
    24                             pq.setDropped(true);
    25                             if (this.removeUnnecessaryMessageQueue(mq, pq)) {
    26                                 it.remove();
    27                                 changed = true;
    28                                 log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",
    29                                     consumerGroup, mq);
    30                             }
    31                             break;
    32                         default:
    33                             break;
    34                     }
    35                 }
    36             }
    37         }
    38 
    39         List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    40         for (MessageQueue mq : mqSet) {
    41             if (!this.processQueueTable.containsKey(mq)) {
    42                 if (isOrder && !this.lock(mq)) {
    43                     log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
    44                     continue;
    45                 }
    46 
    47                 this.removeDirtyOffset(mq);
    48                 ProcessQueue pq = new ProcessQueue();
    49                 long nextOffset = this.computePullFromWhere(mq);
    50                 if (nextOffset >= 0) {
    51                     ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
    52                     if (pre != null) {
    53                         log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
    54                     } else {
    55                         log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
    56                         PullRequest pullRequest = new PullRequest();
    57                         pullRequest.setConsumerGroup(consumerGroup);
    58                         pullRequest.setNextOffset(nextOffset);
    59                         pullRequest.setMessageQueue(mq);
    60                         pullRequest.setProcessQueue(pq);
    61                         pullRequestList.add(pullRequest);
    62                         changed = true;
    63                     }
    64                 } else {
    65                     log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
    66                 }
    67             }
    68         }
    69 
    70         this.dispatchPullRequest(pullRequestList);
    71 
    72         return changed;
    73     }
    updateProcessQueueTableInRebalance()

      如果在重新分配 MessageQueue 后,新增加了 MessageQueue,则添加一个对应的 ProcessQueue,查询 Queue 拉取位点,包装一个新的 PullRequest 来 Pull消息;同理,如果减少 MessageQueue,则将其对应的 ProcessQueue 删除。不管 MessageQueue 是新增还是减少,都会设置 changed 为 True,表示当前消费者消费的 MessageQueue 有变化。

      每个 MessageQueue 复用一个 PullRequest,这里也是唯一一个初始化 PullRequest 的地方。

     1         List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
     2         for (MessageQueue mq : mqSet) {
     3             if (!this.processQueueTable.containsKey(mq)) {
     4                 if (isOrder && !this.lock(mq)) {
     5                     log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
     6                     continue;
     7                 }
     8 
     9                 this.removeDirtyOffset(mq);
    10                 ProcessQueue pq = new ProcessQueue();
    11                 long nextOffset = this.computePullFromWhere(mq);
    12                 if (nextOffset >= 0) {
    13                     ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
    14                     if (pre != null) {
    15                         log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
    16                     } else {
    17                         log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
    18                         PullRequest pullRequest = new PullRequest();
    19                         pullRequest.setConsumerGroup(consumerGroup);
    20                         pullRequest.setNextOffset(nextOffset);
    21                         pullRequest.setMessageQueue(mq);
    22                         pullRequest.setProcessQueue(pq);
    23                         pullRequestList.add(pullRequest);
    24                         changed = true;
    25                     }
    26                 } else {
    27                     log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
    28                 }
    29             }
    30         }
    31 
    32         this.dispatchPullRequest(pullRequestList);
    PullRequest初始化

      最后,新增的 PullRequest 对象将被分配出去拉取 MessageQueue 中的消息。

      (5) 执行 messageQueueChanged() 方法。如果有 MessageQueue 订阅发生变化,则更新本地订阅关系版本,修改本地消费者限流的一些参数,然后发送心跳,通知所有 Broker,当前订阅关系发生了变化。
      至此,结束。

  • 相关阅读:
    Js--小笔记
    Android Gson解析
    java格式化数字、货币、金钱
    关于Edittext默认弹出软键盘为数字键
    生日星座自动匹配
    ANDROID STUDIO系列教程六--GRADLE多渠道打包
    框架,简化了代码的同时,也让我们慢慢变蠢
    Android开发实现高德地图定位
    onNewIntent调用时机
    EditText输入手机号自动带空格
  • 原文地址:https://www.cnblogs.com/zuoyang/p/14419967.html
Copyright © 2011-2022 走看看