zoukankan      html  css  js  c++  java
  • rocketmq consumer 负载均衡

      DefaultMQPushConsumer的负载均衡过程不需要使用者操心,客户端程序会自动处理,每个

    1、DefaultMQPushConsumer启动后,会马上触发一个deRebalance动作;

          1.1、DefaultMQPushConsumerImpl.start()

     1     public synchronized void start() throws MQClientException {
     2         switch (this.serviceState) {
     3             case CREATE_JUST:
     4                 log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
     5                     this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
     6                 this.serviceState = ServiceState.START_FAILED;
     7 
     8                 this.checkConfig();
     9 
    10                 this.copySubscription();
    11 
    12                 if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
    13                     this.defaultMQPushConsumer.changeInstanceNameToPID();
    14                 }
    15 
    16                 this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
    17 
    18                 this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
    19                 this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
    20                 this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
    21                 this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
    22 
    23                 this.pullAPIWrapper = new PullAPIWrapper(
    24                     mQClientFactory,
    25                     this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
    26                 this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
    27 
    28                 if (this.defaultMQPushConsumer.getOffsetStore() != null) {
    29                     this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
    30                 } else {
    31                     switch (this.defaultMQPushConsumer.getMessageModel()) {
    32                         case BROADCASTING:
    33                             this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
    34                             break;
    35                         case CLUSTERING:
    36                             this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
    37                             break;
    38                         default:
    39                             break;
    40                     }
    41                     this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
    42                 }
    43                 this.offsetStore.load();
    44 
    45                 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    46                     this.consumeOrderly = true;
    47                     this.consumeMessageService =
    48                         new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
    49                 } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    50                     this.consumeOrderly = false;
    51                     this.consumeMessageService =
    52                         new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
    53                 }
    54 
    55                 this.consumeMessageService.start();
    56 
    57                 boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
    58                 if (!registerOK) {
    59                     this.serviceState = ServiceState.CREATE_JUST;
    60                     this.consumeMessageService.shutdown();
    61                     throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
    62                         + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
    63                         null);
    64                 }
    65 
    66                 mQClientFactory.start();
    67                 log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
    68                 this.serviceState = ServiceState.RUNNING;
    69                 break;
    70             case RUNNING:
    71             case START_FAILED:
    72             case SHUTDOWN_ALREADY:
    73                 throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
    74                     + this.serviceState
    75                     + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
    76                     null);
    77             default:
    78                 break;
    79         }

          1.2、MQClientInstance.start()

     1     public void start() throws MQClientException {
     2 
     3         synchronized (this) {
     4             switch (this.serviceState) {
     5                 case CREATE_JUST:
     6                     this.serviceState = ServiceState.START_FAILED;
     7                     // If not specified,looking address from name server
     8                     if (null == this.clientConfig.getNamesrvAddr()) {
     9                         this.mQClientAPIImpl.fetchNameServerAddr();
    10                     }
    11                     // Start request-response channel
    12                     this.mQClientAPIImpl.start();
    13                     // Start various schedule tasks
    14                     this.startScheduledTask();
    15                     // Start pull service
    16                     this.pullMessageService.start();
    17                     // Start rebalance service
    18                     this.rebalanceService.start();
    19                     // Start push service
    20                     this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
    21                     log.info("the client factory [{}] start OK", this.clientId);
    22                     this.serviceState = ServiceState.RUNNING;
    23                     break;
    24                 case RUNNING:
    25                     break;
    26                 case SHUTDOWN_ALREADY:
    27                     break;
    28                 case START_FAILED:
    29                     throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
    30                 default:
    31                     break;
    32             }
    33         }
    34     }

          1.3、org.apache.rocketmq.common.ServiceThread.start()

                  RebalanceService.run()

     1     @Override
     2     public void run() {
     3         log.info(this.getServiceName() + " service started");
     4 
     5         while (!this.isStopped()) {
     6             this.waitForRunning(waitInterval);
     7             this.mqClientFactory.doRebalance();
     8         }
     9 
    10         log.info(this.getServiceName() + " service end");
    11     }

      1.4、MQClientInstance.doRebalance()‘

     1     public void doRebalance() {
     2         for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
     3             MQConsumerInner impl = entry.getValue();
     4             if (impl != null) {
     5                 try {
     6                     impl.doRebalance();
     7                 } catch (Throwable e) {
     8                     log.error("doRebalance exception", e);
     9                 }
    10             }
    11         }
    12     }

    2、而且在同一个ConsumerGroup里加入新的DefaultMQPushConsumer时,

    各个Consumer都会被触发doRebalance动作

    ClientRemotingProcessor.processRequest(ChannelHandlerContext, RemotingCommand)

     1     @Override
     2     public RemotingCommand processRequest(ChannelHandlerContext ctx,
     3         RemotingCommand request) throws RemotingCommandException {
     4         switch (request.getCode()) {
     5             case RequestCode.CHECK_TRANSACTION_STATE:
     6                 return this.checkTransactionState(ctx, request);
     7             case RequestCode.NOTIFY_CONSUMER_IDS_CHANGED:
     8                 return this.notifyConsumerIdsChanged(ctx, request);
     9             case RequestCode.RESET_CONSUMER_CLIENT_OFFSET:
    10                 return this.resetOffset(ctx, request);
    11             case RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT:
    12                 return this.getConsumeStatus(ctx, request);
    13 
    14             case RequestCode.GET_CONSUMER_RUNNING_INFO:
    15                 return this.getConsumerRunningInfo(ctx, request);
    16 
    17             case RequestCode.CONSUME_MESSAGE_DIRECTLY:
    18                 return this.consumeMessageDirectly(ctx, request);
    19             default:
    20                 break;
    21         }
    22         return null;
    23     }

     consumer负载均衡策略接口AllocateMessageQueueStrategy

     1 /**
     2  * Strategy Algorithm for message allocating between consumers
     3  */
     4 public interface AllocateMessageQueueStrategy {
     5 
     6     /**
     7      * Allocating by consumer id
     8      *
     9      * @param consumerGroup current consumer group
    10      * @param currentCID current consumer id
    11      * @param mqAll message queue set in current topic
    12      * @param cidAll consumer set in current consumer group
    13      * @return The allocate result of given strategy
    14      */
    15     List<MessageQueue> allocate(
    16         final String consumerGroup,
    17         final String currentCID,
    18         final List<MessageQueue> mqAll,
    19         final List<String> cidAll
    20     );
    21 
    22     /**
    23      * Algorithm name
    24      *
    25      * @return The strategy name
    26      */
    27     String getName();
    28 }
    View Code

    具体的负载均衡有六种 ,

    默认使用AllocateMessageQueueAveragely,负载均衡的结果与Topic的Message Queue数量,以及

    ConsumerGroup里的Consumer的数量有关。负载均衡的分配粒度只到Message Queue,把Topic下的所有

    Message Queue分配到不同Consumer中,所以Message Queue和Consumer的数量关系,或者整除关系

    影响负载均衡结果

    3、以下以AllocateMessageQueueAveragely为例讲解

     3.1 RebalanceImpl.doRebalance(boolean)

     1     public void doRebalance(final boolean isOrder) {
     2         Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
     3         if (subTable != null) {
     4             for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
     5                 final String topic = entry.getKey();
     6                 try {
     7                     this.rebalanceByTopic(topic, isOrder);
     8                 } catch (Throwable e) {
     9                     if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    10                         log.warn("rebalanceByTopic Exception", e);
    11                     }
    12                 }
    13             }
    14         }
    15 
    16         this.truncateMessageQueueNotMyTopic();
    17     }

      3.2、RebalanceImpl.rebalanceByTopic(String, boolean)

     1     private void rebalanceByTopic(final String topic, final boolean isOrder) {
     2         switch (messageModel) {
     3             case BROADCASTING: {
     4                 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
     5                 if (mqSet != null) {
     6                     boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
     7                     if (changed) {
     8                         this.messageQueueChanged(topic, mqSet, mqSet);
     9                         log.info("messageQueueChanged {} {} {} {}",
    10                             consumerGroup,
    11                             topic,
    12                             mqSet,
    13                             mqSet);
    14                     }
    15                 } else {
    16                     log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
    17                 }
    18                 break;
    19             }
    20             case CLUSTERING: {
                //获取该Topic下所有的MessageQueue,包括不同broker下的
    21 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
               //查询该consumerGroup,topic下consumerIdList
    22 List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); 23 if (null == mqSet) { 24 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { 25 log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); 26 } 27 } 28 29 if (null == cidAll) { 30 log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic); 31 } 32 33 if (mqSet != null && cidAll != null) { 34 List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); 35 mqAll.addAll(mqSet); 36 37 Collections.sort(mqAll); 38 Collections.sort(cidAll); 39 40 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; 41 42 List<MessageQueue> allocateResult = null; 43 try { 44 allocateResult = strategy.allocate( 45 this.consumerGroup, 46 this.mQClientFactory.getClientId(), 47 mqAll, 48 cidAll); 49 } catch (Throwable e) { 50 log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), 51 e); 52 return; 53 } 54 55 Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); 56 if (allocateResult != null) { 57 allocateResultSet.addAll(allocateResult); 58 } 59 60 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); 61 if (changed) { 62 log.info( 63 "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}", 64 strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(), 65 allocateResultSet.size(), allocateResultSet); 66 this.messageQueueChanged(topic, mqSet, allocateResultSet); 67 } 68 } 69 break; 70 } 71 default: 72 break; 73 } 74 }

      3.3、AllocateMessageQueueStrategy.allocate(String, String, List<MessageQueue>, List<String>)

     1 /**
     2  * Average Hashing queue algorithm
     3  */
     4 public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
     5     private final InternalLogger log = ClientLogger.getLog();
     6 
     7     @Override
     8     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
     9         List<String> cidAll) {
    10         if (currentCID == null || currentCID.length() < 1) {
    11             throw new IllegalArgumentException("currentCID is empty");
    12         }
    13         if (mqAll == null || mqAll.isEmpty()) {
    14             throw new IllegalArgumentException("mqAll is null or mqAll empty");
    15         }
    16         if (cidAll == null || cidAll.isEmpty()) {
    17             throw new IllegalArgumentException("cidAll is null or cidAll empty");
    18         }
    19 
    20         List<MessageQueue> result = new ArrayList<MessageQueue>();
    21         if (!cidAll.contains(currentCID)) {
    22             log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
    23                 consumerGroup,
    24                 currentCID,
    25                 cidAll);
    26             return result;
    27         }
    28 
    29         int index = cidAll.indexOf(currentCID);
    30         int mod = mqAll.size() % cidAll.size();
    31         int averageSize =
    32             mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
    33                 + 1 : mqAll.size() / cidAll.size());
    34         int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
    35         int range = Math.min(averageSize, mqAll.size() - startIndex);
    36         for (int i = 0; i < range; i++) {
    37             result.add(mqAll.get((startIndex + i) % mqAll.size()));
    38         }
    39         return result;
    40     }
    41 
    42     @Override
    43     public String getName() {
    44         return "AVG";
    45     }
    46 }

    分配算法

    平均分配策略(默认)(AllocateMessageQueueAveragely)
    环形分配策略(AllocateMessageQueueAveragelyByCircle)
    手动配置分配策略(AllocateMessageQueueByConfig)
    机房分配策略(AllocateMessageQueueByMachineRoom)
    一致性哈希分配策略(AllocateMessageQueueConsistentHash)
    靠近机房策略(AllocateMachineRoomNearby)

    平均分配、环形分配如下

    普通消费方式

    Message Queue

    ConsumerId
    消息队列[0] Consumer[0]
    消息队列[1] Consumer[0]
    消息队列[2] Consumer[0]
    消息队列[3] Consumer[1]
    消息队列[4] Consumer[1]
    消息队列[5] Consumer[1]
    消息队列[6] Consumer[2]
    消息队列[7] Consumer[2]


    - 环形消费方式

    Message Queue  ConsumerId
    消息队列[0]  Consumer[0]
    消息队列[1]  Consumer[1]
    消息队列[2]  Consumer[2]
    消息队列[3]  Consumer[0]
    消息队列[4]  Consumer[1]
    消息队列[5]  Consumer[2]
    消息队列[6]  Consumer[0]
    消息队列[7]  Consumer[1]

    机房分配策略

     1     @Override
     2     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
     3         List<String> cidAll) {
     4         List<MessageQueue> result = new ArrayList<MessageQueue>();
     5         int currentIndex = cidAll.indexOf(currentCID);
     6         if (currentIndex < 0) {
     7             return result;
     8         }
     9         List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
    10         for (MessageQueue mq : mqAll) {
    11             String[] temp = mq.getBrokerName().split("@");
    12             if (temp.length == 2 && consumeridcs.contains(temp[0])) {
    13                 premqAll.add(mq);
    14             }
    15         }
    16 
    17         int mod = premqAll.size() / cidAll.size();
    18         int rem = premqAll.size() % cidAll.size();
    19         int startIndex = mod * currentIndex;
    20         int endIndex = startIndex + mod;
    21         for (int i = startIndex; i < endIndex; i++) {
    22             result.add(mqAll.get(i));
    23         }
    24         if (rem > currentIndex) {
    25             result.add(premqAll.get(currentIndex + mod * cidAll.size()));
    26         }
    27         return result;
    28     }

    第4-7行, 计算当前消费者在消费者集合中的下标(index), 如果下标 < 0 , 则直接返回

    第8-14行, 根据brokerName解析出所有有效机房信息(其实是有效mq), 结果存储在premqAll中

    第17行, 计算消息整除的平均结果mod

    第18行, 计算消息是否能够被平均消费rem,(即消息平均消费后还剩多少消息队列(remaing))

    第19行, 计算当前消费者开始消费的下标(startIndex)

    第20行, 计算当前消费者结束消费的下标(endIndex)

    第21-26行, 将消息的消费分为两部分, 第一部分 – (cidAllSize * mod) , 第二部分 – (premqAll - cidAllSize * mod) ;

    从第一部分中查询startIndex ~ endIndex之间所有的消息, 从第二部分中查询 currentIndex + mod * cidAll.size() , 最后返回查询的结果result

    可以通过下面的例子进一步了解,假设有三个消费者, 八个消息队列

    Message Queue Consumer

    Consumer
    消息队列[0] Consumer[0]
    消息队列[1] Consumer[0]
    消息队列[2] Consumer[1]
    消息队列[3] Consumer[1]
    消息队列[4] Consumer[2]
    消息队列[5] Consumer[2]
    消息队列[6] Consumer[0]
    消息队列[7] Consumer[1]

    靠近机房算法

     1 

    /**
    * An allocate strategy proxy for based on machine room nearside priority. An actual allocate strategy can be
    * specified.
    *
    * If any consumer is alive in a machine room, the message queue of the broker which is deployed in the same machine
    * should only be allocated to those. Otherwise, those message queues can be shared along all consumers since there are
    * no alive consumer to monopolize them.
    */

        @Override
     2     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
     3         List<String> cidAll) {
     4         if (currentCID == null || currentCID.length() < 1) {
     5             throw new IllegalArgumentException("currentCID is empty");
     6         }
     7         if (mqAll == null || mqAll.isEmpty()) {
     8             throw new IllegalArgumentException("mqAll is null or mqAll empty");
     9         }
    10         if (cidAll == null || cidAll.isEmpty()) {
    11             throw new IllegalArgumentException("cidAll is null or cidAll empty");
    12         }
    13 
    14         List<MessageQueue> result = new ArrayList<MessageQueue>();
    15         if (!cidAll.contains(currentCID)) {
    16             log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
    17                 consumerGroup,
    18                 currentCID,
    19                 cidAll);
    20             return result;
    21         }
    22 
    23         //group mq by machine room
    24         Map<String/*machine room */, List<MessageQueue>> mr2Mq = new TreeMap<String, List<MessageQueue>>();
    25         for (MessageQueue mq : mqAll) {
    26             String brokerMachineRoom = machineRoomResolver.brokerDeployIn(mq);
    27             if (StringUtils.isNoneEmpty(brokerMachineRoom)) {
    28                 if (mr2Mq.get(brokerMachineRoom) == null) {
    29                     mr2Mq.put(brokerMachineRoom, new ArrayList<MessageQueue>());
    30                 }
    31                 mr2Mq.get(brokerMachineRoom).add(mq);
    32             } else {
    33                 throw new IllegalArgumentException("Machine room is null for mq " + mq);
    34             }
    35         }
    36 
    37         //group consumer by machine room
    38         Map<String/*machine room */, List<String/*clientId*/>> mr2c = new TreeMap<String, List<String>>();
    39         for (String cid : cidAll) {
    40             String consumerMachineRoom = machineRoomResolver.consumerDeployIn(cid);
    41             if (StringUtils.isNoneEmpty(consumerMachineRoom)) {
    42                 if (mr2c.get(consumerMachineRoom) == null) {
    43                     mr2c.put(consumerMachineRoom, new ArrayList<String>());
    44                 }
    45                 mr2c.get(consumerMachineRoom).add(cid);
    46             } else {
    47                 throw new IllegalArgumentException("Machine room is null for consumer id " + cid);
    48             }
    49         }
    50 
    51         List<MessageQueue> allocateResults = new ArrayList<MessageQueue>();
    52 
    53         //1.allocate the mq that deploy in the same machine room with the current consumer
    54         String currentMachineRoom = machineRoomResolver.consumerDeployIn(currentCID);
    55         List<MessageQueue> mqInThisMachineRoom = mr2Mq.remove(currentMachineRoom);
    56         List<String> consumerInThisMachineRoom = mr2c.get(currentMachineRoom);
    57         if (mqInThisMachineRoom != null && !mqInThisMachineRoom.isEmpty()) {
    58             allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mqInThisMachineRoom, consumerInThisMachineRoom));
    59         }
    60 
    61         //2.allocate the rest mq to each machine room if there are no consumer alive in that machine room
    62         for (String machineRoom : mr2Mq.keySet()) {
    63             if (!mr2c.containsKey(machineRoom)) { // no alive consumer in the corresponding machine room, so all consumers share these queues
    64                 allocateResults.addAll(allocateMessageQueueStrategy.allocate(consumerGroup, currentCID, mr2Mq.get(machineRoom), cidAll));
    65             }
    66         }
    67 
    68         return allocateResults;
    69     }
  • 相关阅读:
    SharedPreferences(转)
    android操作XML的几种方式(转)
    adb 卸载APP命令和杀死APP命令
    Android判断App是否在前台运行(转)
    Java中的Timer和TimerTask在Android中的用法(转)
    Android: 启动另外的APP及传递参数(转)
    BroadcastReceiver应用详解(转)
    Android---让你的APK程序开机自动运行(转)
    adb shell 命令详解(转)
    C#串口介绍以及简单串口通信程序设计实现
  • 原文地址:https://www.cnblogs.com/toUpdating/p/9989477.html
Copyright © 2011-2022 走看看