zoukankan      html  css  js  c++  java
  • rocketmq consumer 负载均衡

      DefaultMQPushConsumer的负载均衡过程不需要使用者操心,客户端程序会自动处理。

    1、每个DefaultMQPushConsumer启动后,会马上触发一个doRebalance动作;

          1.1、DefaultMQPushConsumerImpl.start()

     1     public synchronized void start() throws MQClientException {
     2         switch (this.serviceState) {
     3             case CREATE_JUST:
     4                 log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
     5                     this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
     6                 this.serviceState = ServiceState.START_FAILED;
     7 
     8                 this.checkConfig();
     9 
    10                 this.copySubscription();
    11 
    12                 if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
    13                     this.defaultMQPushConsumer.changeInstanceNameToPID();
    14                 }
    15 
    16                 this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
    17 
    18                 this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
    19                 this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
    20                 this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
    21                 this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
    22 
    23                 this.pullAPIWrapper = new PullAPIWrapper(
    24                     mQClientFactory,
    25                     this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
    26                 this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
    27 
    28                 if (this.defaultMQPushConsumer.getOffsetStore() != null) {
    29                     this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
    30                 } else {
    31                     switch (this.defaultMQPushConsumer.getMessageModel()) {
    32                         case BROADCASTING:
    33                             this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
    34                             break;
    35                         case CLUSTERING:
    36                             this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
    37                             break;
    38                         default:
    39                             break;
    40                     }
    41                     this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
    42                 }
    43                 this.offsetStore.load();
    44 
    45                 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    46                     this.consumeOrderly = true;
    47                     this.consumeMessageService =
    48                         new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
    49                 } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    50                     this.consumeOrderly = false;
    51                     this.consumeMessageService =
    52                         new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
    53                 }
    54 
    55                 this.consumeMessageService.start();
    56 
    57                 boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
    58                 if (!registerOK) {
    59                     this.serviceState = ServiceState.CREATE_JUST;
    60                     this.consumeMessageService.shutdown();
    61                     throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
    62                         + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
    63                         null);
    64                 }
    65 
    66                 mQClientFactory.start();
    67                 log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
    68                 this.serviceState = ServiceState.RUNNING;
    69                 break;
    70             case RUNNING:
    71             case START_FAILED:
    72             case SHUTDOWN_ALREADY:
    73                 throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
    74                     + this.serviceState
    75                     + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
    76                     null);
    77             default:
    78                 break;
    79         }

          1.2、MQClientInstance.start()

     /**
      * Starts this client instance; the whole transition runs while holding the lock on {@code this}.
      *
      * Only the CREATE_JUST state performs work. RUNNING and SHUTDOWN_ALREADY return without
      * doing anything, and START_FAILED is rejected with an exception.
      *
      * @throws MQClientException if the state is START_FAILED, i.e. an earlier start attempt
      *         entered the CREATE_JUST branch but never reached RUNNING
      */
     1     public void start() throws MQClientException {
     2 
     3         synchronized (this) {
     4             switch (this.serviceState) {
     5                 case CREATE_JUST:
                       // Mark as failed up front: if any step below throws, the state stays
                       // START_FAILED and later calls hit the START_FAILED case.
     6                     this.serviceState = ServiceState.START_FAILED;
     7                     // No name server address configured: fetch one through mQClientAPIImpl
     8                     if (null == this.clientConfig.getNamesrvAddr()) {
     9                         this.mQClientAPIImpl.fetchNameServerAddr();
    10                     }
    11                     // Start request-response channel
    12                     this.mQClientAPIImpl.start();
    13                     // Start the scheduled tasks
    14                     this.startScheduledTask();
    15                     // Start pull service
    16                     this.pullMessageService.start();
    17                     // Start rebalance service (its run() loop is what calls doRebalance())
    18                     this.rebalanceService.start();
    19                     // Start the internal producer held by this client instance
                       // NOTE(review): meaning of the 'false' argument is not visible here; confirm in DefaultMQProducerImpl.
    20                     this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
    21                     log.info("the client factory [{}] start OK", this.clientId);
    22                     this.serviceState = ServiceState.RUNNING;
    23                     break;
    24                 case RUNNING:
    25                     break;
    26                 case SHUTDOWN_ALREADY:
    27                     break;
    28                 case START_FAILED:
    29                     throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
    30                 default:
    31                     break;
    32             }
    33         }
    34     }

          1.3、org.apache.rocketmq.common.ServiceThread.start()

                  RebalanceService.run()

     /**
      * Main loop of the rebalance service thread.
      *
      * Logs a start message, then repeats until {@code isStopped()} returns true: call
      * {@code waitForRunning(waitInterval)} and afterwards ask the client factory to
      * rebalance. Logs an end message once the loop exits.
      */
     1     @Override
     2     public void run() {
     3         log.info(this.getServiceName() + " service started");
     4 
     5         while (!this.isStopped()) {
                   // The wait comes first, so every pass waits before it rebalances.
                   // NOTE(review): presumably returns after waitInterval or on an explicit wake-up; confirm in ServiceThread.
     6             this.waitForRunning(waitInterval);
     7             this.mqClientFactory.doRebalance();
     8         }
     9 
    10         log.info(this.getServiceName() + " service end");
    11     }

      1.4、MQClientInstance.doRebalance()

     /**
      * Calls {@code doRebalance()} on every non-null consumer registered in {@code consumerTable}.
      *
      * Each call is isolated: anything thrown by one consumer is logged and the loop moves on
      * to the next entry, so one failing consumer does not block the others.
      */
     1     public void doRebalance() {
     2         for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
     3             MQConsumerInner impl = entry.getValue();
     4             if (impl != null) {
     5                 try {
     6                     impl.doRebalance();
     7                 } catch (Throwable e) {
                       // Deliberately broad catch: log and continue with the remaining consumers.
     8                     log.error("doRebalance exception", e);
     9                 }
    10             }
    11         }
    12     }

    2、而且在同一个ConsumerGroup里加入新的DefaultMQPushConsumer时,各个Consumer都会被触发doRebalance动作。

    客户端处理该通知的入口是下面的 ClientRemotingProcessor.processRequest(ChannelHandlerContext, RemotingCommand),对应其中的 NOTIFY_CONSUMER_IDS_CHANGED 分支:

     /**
      * Dispatches an incoming request to the handler that matches its request code.
      *
      * @param ctx     channel context, passed through to the selected handler
      * @param request the incoming command; {@code request.getCode()} selects the handler
      * @return the selected handler's response, or {@code null} when the code matches none of the cases
      * @throws RemotingCommandException declared by this method; no throw site is visible in this listing
      */
     1     @Override
     2     public RemotingCommand processRequest(ChannelHandlerContext ctx,
     3         RemotingCommand request) throws RemotingCommandException {
     4         switch (request.getCode()) {
     5             case RequestCode.CHECK_TRANSACTION_STATE:
     6                 return this.checkTransactionState(ctx, request);
                   // The article cites this method for the consumer-list-changed notification.
     7             case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
     8                 return this.notifyConsumerIdsChanged(ctx, request);
     9             case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
    10                 return this.resetOffset(ctx, request);
    11             case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
    12                 return this.getConsumeStatus(ctx, request);
    13 
    14             case RequestCode.GET_CONSUMER_RUNNING_INFO:
    15                 return this.getConsumerRunningInfo(ctx, request);
    16 
    17             case RequestCode.CONSUME_MESSAGE_DIRECTLY:
    18                 return this.consumeMessageDirectly(ctx, request);
    19             default:
    20                 break;
    21         }
               // Unrecognized request code: no response object is produced.
    22         return null;
    23     }

     consumer负载均衡策略接口AllocateMessageQueueStrategy

     1 /**
     2  * Strategy for allocating the message queues of a topic among the consumers of a consumer group
     3  */
     4 public interface AllocateMessageQueueStrategy {
     5 
     6     /**
     7      * Computes the message queues allocated to the consumer identified by {@code currentCID}
     8      *
     9      * @param consumerGroup current consumer group
    10      * @param currentCID id of the consumer the allocation is computed for
    11      * @param mqAll all message queues of the current topic
    12      * @param cidAll ids of all consumers in the current consumer group
    13      * @return the message queues this strategy allocates to {@code currentCID}
    14      */
    15     List<MessageQueue> allocate(
    16         final String consumerGroup,
    17         final String currentCID,
    18         final List<MessageQueue> mqAll,
    19         final List<String> cidAll
    20     );
    21 
    22     /**
    23      * Name of the allocation algorithm
    24      *
    25      * @return The strategy name
    26      */
    27     String getName();
    28 }
    View Code

    具体的负载均衡策略有六种(见后文“分配算法”列表),

    默认使用AllocateMessageQueueAveragely,负载均衡的结果与Topic的Message Queue数量,以及

    ConsumerGroup里的Consumer的数量有关。负载均衡的分配粒度只到Message Queue,把Topic下的所有

    Message Queue分配到不同Consumer中,所以Message Queue和Consumer的数量关系,或者整除关系

    影响负载均衡结果

    3、以下以AllocateMessageQueueAveragely为例讲解

     3.1 RebalanceImpl.doRebalance(boolean)

     /**
      * Rebalances every topic found in the subscription table, one topic at a time.
      *
      * A failure for one topic does not stop the loop. It is logged at warn level, except for
      * topics whose name starts with {@code MixAll.RETRY_GROUP_TOPIC_PREFIX}, whose failures
      * are dropped silently.
      *
      * @param isOrder passed through unchanged to {@code rebalanceByTopic}
      */
     1     public void doRebalance(final boolean isOrder) {
     2         Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
     3         if (subTable != null) {
     4             for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
     5                 final String topic = entry.getKey();
     6                 try {
     7                     this.rebalanceByTopic(topic, isOrder);
     8                 } catch (Throwable e) {
     9                     if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    10                         log.warn("rebalanceByTopic Exception", e);
    11                     }
    12                 }
    13             }
    14         }
    15 
               // Runs even when the subscription table is null.
               // NOTE(review): presumably drops queues of topics that are no longer subscribed; confirm in truncateMessageQueueNotMyTopic.
    16         this.truncateMessageQueueNotMyTopic();
    17     }

      3.2、RebalanceImpl.rebalanceByTopic(String, boolean)

     /**
      * Recomputes which message queues of {@code topic} this consumer should process and
      * applies the result through {@code updateProcessQueueTableInRebalance}.
      *
      * BROADCASTING: the consumer takes every queue of the topic.
      * CLUSTERING: the queue list and the consumer id list are both sorted, then the configured
      * {@code AllocateMessageQueueStrategy} picks this client's share. If the strategy throws,
      * the error is logged and the method returns without touching the process queue table.
      *
      * @param topic   topic to rebalance
      * @param isOrder passed through unchanged to {@code updateProcessQueueTableInRebalance}
      */
     1     private void rebalanceByTopic(final String topic, final boolean isOrder) {
     2         switch (messageModel) {
     3             case BROADCASTING: {
     4                 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
     5                 if (mqSet != null) {
     6                     boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
     7                     if (changed) {
     8                         this.messageQueueChanged(topic, mqSet, mqSet);
     9                         log.info("messageQueueChanged {} {} {} {}",
    10                             consumerGroup,
    11                             topic,
    12                             mqSet,
    13                             mqSet);
    14                     }
    15                 } else {
    16                     log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
    17                 }
    18                 break;
    19             }
    20             case CLUSTERING: {
                   // All message queues of this topic, including those on different brokers
    21                 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                   // Ids of the consumers registered for this consumerGroup and topic
    22                 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
    23                 if (null == mqSet) {
    24                     if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    25                         log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
    26                     }
    27                 }
    28 
    29                 if (null == cidAll) {
    30                     log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
    31                 }
    32 
    33                 if (mqSet != null && cidAll != null) {
    34                     List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
    35                     mqAll.addAll(mqSet);
    36 
                       // Both lists are sorted before the strategy runs, so the strategy sees
                       // them in a deterministic order.
    37                     Collections.sort(mqAll);
    38                     Collections.sort(cidAll);
    39 
    40                     AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
    41 
    42                     List<MessageQueue> allocateResult = null;
    43                     try {
    44                         allocateResult = strategy.allocate(
    45                             this.consumerGroup,
    46                             this.mQClientFactory.getClientId(),
    47                             mqAll,
    48                             cidAll);
    49                     } catch (Throwable e) {
    50                         log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
    51                             e);
    52                         return;
    53                     }
    54 
    55                     Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
    56                     if (allocateResult != null) {
    57                         allocateResultSet.addAll(allocateResult);
    58                     }
    59 
    60                     boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
    61                     if (changed) {
    62                         log.info(
    63                             "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
    64                             strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
    65                             allocateResultSet.size(), allocateResultSet);
    66                         this.messageQueueChanged(topic, mqSet, allocateResultSet);
    67                     }
    68                 }
    69                 break;
    70             }
    71             default:
    72                 break;
    73         }
    74     }

      3.3、AllocateMessageQueueStrategy.allocate(String, String, List<MessageQueue>, List<String>)

     1 /**
     2  * Averaging strategy: each consumer gets one contiguous run of queues, taken by position in the queue list
     3  */
     4 public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
     5     private final InternalLogger log = ClientLogger.getLog();
     6 
           /**
            * Returns the contiguous slice of {@code mqAll} that belongs to {@code currentCID},
            * based on the position of {@code currentCID} in {@code cidAll}.
            *
            * When the queue count is not a multiple of the consumer count, the first
            * {@code mqAll.size() % cidAll.size()} consumers each receive one extra queue.
            * When there are more consumers than queues, consumers positioned past the last
            * queue receive an empty list.
            *
            * @return the allocated queues; an empty list if {@code currentCID} is not in {@code cidAll}
            * @throws IllegalArgumentException if {@code currentCID} is null or empty, or if
            *         {@code mqAll} or {@code cidAll} is null or empty
            */
     7     @Override
     8     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
     9         List<String> cidAll) {
    10         if (currentCID == null || currentCID.length() < 1) {
    11             throw new IllegalArgumentException("currentCID is empty");
    12         }
    13         if (mqAll == null || mqAll.isEmpty()) {
    14             throw new IllegalArgumentException("mqAll is null or mqAll empty");
    15         }
    16         if (cidAll == null || cidAll.isEmpty()) {
    17             throw new IllegalArgumentException("cidAll is null or cidAll empty");
    18         }
    19 
    20         List<MessageQueue> result = new ArrayList<MessageQueue>();
    21         if (!cidAll.contains(currentCID)) {
    22             log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
    23                 consumerGroup,
    24                 currentCID,
    25                 cidAll);
    26             return result;
    27         }
    28 
               // index: position of this consumer; mod: number of queues left over after an even split.
    29         int index = cidAll.indexOf(currentCID);
    30         int mod = mqAll.size() % cidAll.size();
               // Chunk size is 1 when queues <= consumers; otherwise the integer quotient,
               // plus one for the first 'mod' consumers.
    31         int averageSize =
    32             mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
    33                 + 1 : mqAll.size() / cidAll.size());
               // Consumers after the first 'mod' start 'mod' positions later to skip the extra queues handed out before them.
    34         int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
               // range goes to zero or below when startIndex is at or past the end of the list, so the loop adds nothing.
    35         int range = Math.min(averageSize, mqAll.size() - startIndex);
    36         for (int i = 0; i < range; i++) {
    37             result.add(mqAll.get((startIndex + i) % mqAll.size()));
    38         }
    39         return result;
    40     }
    41 
    42     @Override
    43     public String getName() {
    44         return "AVG";
    45     }
    46 }

    分配算法

    平均分配策略(默认)(AllocateMessageQueueAveragely)
    环形分配策略(AllocateMessageQueueAveragelyByCircle)
    手动配置分配策略(AllocateMessageQueueByConfig)
    机房分配策略(AllocateMessageQueueByMachineRoom)
    一致性哈希分配策略(AllocateMessageQueueConsistentHash)
    靠近机房策略(AllocateMachineRoomNearby)

    平均分配、环形分配的结果如下(以三个消费者、八个消息队列为例)

    普通消费方式

    Message Queue  ConsumerId
    消息队列[0] Consumer[0]
    消息队列[1] Consumer[0]
    消息队列[2] Consumer[0]
    消息队列[3] Consumer[1]
    消息队列[4] Consumer[1]
    消息队列[5] Consumer[1]
    消息队列[6] Consumer[2]
    消息队列[7] Consumer[2]


    - 环形消费方式

    Message Queue  ConsumerId
    消息队列[0]  Consumer[0]
    消息队列[1]  Consumer[1]
    消息队列[2]  Consumer[2]
    消息队列[3]  Consumer[0]
    消息队列[4]  Consumer[1]
    消息队列[5]  Consumer[2]
    消息队列[6]  Consumer[0]
    消息队列[7]  Consumer[1]

    机房分配策略

     /**
      * Machine-room strategy: only queues whose broker name has the form {@code <room>@<rest>}
      * with {@code <room>} contained in {@code consumeridcs} take part in the allocation.
      *
      * The filtered queues are split in two parts. The first {@code cidAll.size() * mod} queues
      * are handed out as contiguous chunks of {@code mod} queues per consumer. The remaining
      * {@code rem} queues go one each to the consumers whose index is below {@code rem}.
      *
      * @param consumerGroup not used by this implementation
      * @param currentCID    id of the consumer the allocation is computed for
      * @param mqAll         all message queues of the topic
      * @param cidAll        ids of all consumers in the group
      * @return the queues allocated to {@code currentCID}; empty if it is not in {@code cidAll}
      */
     1     @Override
     2     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
     3         List<String> cidAll) {
     4         List<MessageQueue> result = new ArrayList<MessageQueue>();
     5         int currentIndex = cidAll.indexOf(currentCID);
     6         if (currentIndex < 0) {
     7             return result;
     8         }
     9         List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
    10         for (MessageQueue mq : mqAll) {
    11             String[] temp = mq.getBrokerName().split("@");
    12             if (temp.length == 2 && consumeridcs.contains(temp[0])) {
    13                 premqAll.add(mq);
    14             }
    15         }
    16 
    17         int mod = premqAll.size() / cidAll.size();
    18         int rem = premqAll.size() % cidAll.size();
    19         int startIndex = mod * currentIndex;
    20         int endIndex = startIndex + mod;
    21         for (int i = startIndex; i < endIndex; i++) {
                   // Fix: read from the filtered list. startIndex/endIndex are derived from
                   // premqAll, so indexing mqAll here could return queues outside the allowed machine rooms.
    22             result.add(premqAll.get(i));
    23         }
    24         if (rem > currentIndex) {
    25             result.add(premqAll.get(currentIndex + mod * cidAll.size()));
    26         }
    27         return result;
    28     }

    第5-8行, 计算当前消费者在消费者集合中的下标(index), 如果下标 < 0 , 则直接返回

    第9-15行, 根据brokerName解析出所有有效机房信息(其实是有效mq), 结果存储在premqAll中

    第17行, 计算消息整除的平均结果mod

    第18行, 计算消息是否能够被平均消费rem,(即消息平均消费后还剩多少消息队列(remaining))

    第19行, 计算当前消费者开始消费的下标(startIndex)

    第20行, 计算当前消费者结束消费的下标(endIndex)

    第21-26行, 将消息的消费分为两部分, 第一部分 – (cidAllSize * mod) , 第二部分 – (premqAll - cidAllSize * mod) ;

    从第一部分中查询startIndex ~ endIndex之间所有的消息, 从第二部分中查询 currentIndex + mod * cidAll.size() , 最后返回查询的结果result

    可以通过下面的例子进一步了解,假设有三个消费者, 八个消息队列

    Message Queue  ConsumerId
    消息队列[0] Consumer[0]
    消息队列[1] Consumer[0]
    消息队列[2] Consumer[1]
    消息队列[3] Consumer[1]
    消息队列[4] Consumer[2]
    消息队列[5] Consumer[2]
    消息队列[6] Consumer[0]
    消息队列[7] Consumer[1]

    靠近机房算法

     1 

    /**
    * An allocation-strategy proxy that gives priority to the consumer's own machine room. The actual allocation
    * within each group of queues is delegated to the configured {@code allocateMessageQueueStrategy}.
    *
    * If at least one consumer is alive in a machine room, the message queues of the brokers deployed in that machine
    * room are allocated only among those consumers. Otherwise, those message queues are shared among all consumers,
    * since no alive consumer in that room can take them.
    *
    * The method below throws IllegalArgumentException when currentCID is null or empty, when mqAll or cidAll is
    * null or empty, or when the machine room resolved for any queue or consumer id is empty. It returns an empty
    * list when currentCID is not contained in cidAll.
    */

        @Override
     2     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
     3         List<String> cidAll) {
     4         if (currentCID == null || currentCID.length() < 1) {
     5             throw new IllegalArgumentException("currentCID is empty");
     6         }
     7         if (mqAll == null || mqAll.isEmpty()) {
     8             throw new IllegalArgumentException("mqAll is null or mqAll empty");
     9         }
    10         if (cidAll == null || cidAll.isEmpty()) {
    11             throw new IllegalArgumentException("cidAll is null or cidAll empty");
    12         }
    13 
    14         List<MessageQueue> result = new ArrayList<MessageQueue>();
    15         if (!cidAll.contains(currentCID)) {
    16             log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
    17                 consumerGroup,
    18                 currentCID,
    19                 cidAll);
    20             return result;
    21         }
    22 
    23         // Group the message queues by the machine room resolved for their broker
    24         Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
    25         for (MessageQueue mq : mqAll) {
    26             String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);
    27             if (StringUtils.isNoneEmpty(brokerMachineRoom)) {
    28                 if (mr2Mq.get(brokerMachineRoom) == null) {
    29                     mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());
    30                 }
    31                 mr2Mq.get(brokerMachineRoom).add(mq);
    32             } else {
    33                 throw new IllegalArgumentException("Machine room is null for mq " + mq);
    34             }
    35         }
    36 
    37         // Group the consumer ids by the machine room resolved for each consumer
    38         Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();
    39         for (String cid : cidAll) {
    40             String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);
    41             if (StringUtils.isNoneEmpty(consumerMachineRoom)) {
    42                 if (mr2c.get(consumerMachineRoom) == null) {
    43                     mr2c.put(consumerMachineRoom, new ArrayList<String>());
    44                 }
    45                 mr2c.get(consumerMachineRoom).add(cid);
    46             } else {
    47                 throw new IllegalArgumentException("Machine room is null for consumer id " + cid);
    48             }
    49         }
    50 
    51         List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();
    52 
    53         // 1. Allocate the queues deployed in the current consumer's machine room among the consumers of that room only
    54         String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);
               // remove(), not get(): the step-2 loop below then only sees the other machine rooms.
    55         List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);
    56         List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
    57         if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {
    58             allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
    59         }
    60 
    61         // 2. For every other machine room that has queues but no alive consumer, share its queues among all consumers
    62         for (String machineRoom : mr2Mq.keySet()) {
    63             if (!mr2c.containsKey(machineRoom)) { // no alive consumer in the corresponding machine room, so all consumers share these queues
    64                 allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll));
    65             }
    66         }
    67 
    68         return allocateResults;
    69     }
  • 相关阅读:
    Educational Codeforces Round 10 C. Foe Pairs 水题
    Educational Codeforces Round 10 B. z-sort 构造
    CDOJ 1048 Bob's vector 三分
    Educational Codeforces Round 10 A. Gabriel and Caterpillar 模拟
    第14届电子科大初赛民间盗版部分题目题解
    HDU 5654 xiaoxin and his watermelon candy 离线树状数组 区间不同数的个数
    HDU 5653 Bomber Man wants to bomb an Array. dp
    HDU 5652 India and China Origins 二分+并查集
    HDU 5651 xiaoxin juju needs help 数学
    HDU 5650 so easy 数学
  • 原文地址:https://www.cnblogs.com/toUpdating/p/9989477.html
Copyright © 2011-2022 走看看