In RocketMQ, message-queue redistribution (rebalancing) is implemented by RebalanceService. Each MQClientInstance holds one RebalanceService instance, which is started when the MQClientInstance starts.
public class MQClientInstance {
    private final static long LOCK_TIMEOUT_MILLIS = 3000;
    private final InternalLogger log = ClientLogger.getLog();
    private final ClientConfig clientConfig;
    private final int instanceIndex;
    private final String clientId;
    private final long bootTimestamp = System.currentTimeMillis();
    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
    private final NettyClientConfig nettyClientConfig;
    private final MQClientAPIImpl mQClientAPIImpl;
    private final MQAdminImpl mQAdminImpl;
    private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
    private final Lock lockNamesrv = new ReentrantLock();
    private final Lock lockHeartbeat = new ReentrantLock();
    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
        new ConcurrentHashMap<String, HashMap<Long, String>>();
    private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
        new ConcurrentHashMap<String, HashMap<String, Integer>>();
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "MQClientFactoryScheduledThread");
        }
    });
    private final ClientRemotingProcessor clientRemotingProcessor;
    private final PullMessageService pullMessageService;
    private final RebalanceService rebalanceService;
    private final DefaultMQProducer defaultMQProducer;
    private final ConsumerStatsManager consumerStatsManager;
    // ... (remaining members omitted)
}
RebalanceService#run
@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }

    log.info(this.getServiceName() + " service end");
}
By default the RebalanceService thread calls mqClientFactory.doRebalance() once every 20 s; that 20-second interval is the key point.
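The wait-then-rebalance loop above can be illustrated with a small stand-alone sketch. It is a simplified model, not RocketMQ's ServiceThread: the class RebalanceLoopSketch and its methods are hypothetical, and it uses a plain Java monitor where RocketMQ uses its own latch. It also shows an early wakeup(), i.e. a round that runs before the full interval has elapsed. In RocketMQ itself the 20 s default is read from the `rocketmq.client.rebalance.waitInterval` system property (milliseconds, default 20000).

```java
/**
 * Simplified sketch of a RebalanceService-style loop (hypothetical, not RocketMQ's ServiceThread).
 * It waits up to waitIntervalMillis, can be woken early via wakeup(), then runs the rebalance action.
 */
public class RebalanceLoopSketch implements Runnable {
    private final long waitIntervalMillis;
    private final Runnable rebalanceAction;   // stands in for mqClientFactory.doRebalance()
    private final Object monitor = new Object();
    private boolean notified = false;         // guarded by monitor
    private volatile boolean stopped = false;

    public RebalanceLoopSketch(long waitIntervalMillis, Runnable rebalanceAction) {
        this.waitIntervalMillis = waitIntervalMillis;
        this.rebalanceAction = rebalanceAction;
    }

    // Block until the interval elapses or wakeup() has been called, whichever comes first.
    private void waitForRunning(long intervalMillis) throws InterruptedException {
        synchronized (monitor) {
            if (!notified) {
                monitor.wait(intervalMillis);  // a spurious wakeup only means an early round
            }
            notified = false;
        }
    }

    // Request a round now instead of waiting for the full interval.
    public void wakeup() {
        synchronized (monitor) {
            notified = true;
            monitor.notifyAll();
        }
    }

    public void shutdown() {
        stopped = true;
        wakeup();
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                waitForRunning(waitIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (!stopped) {
                rebalanceAction.run();
            }
        }
    }
}
```

In a real client the interval would be the 20 s default; the early wakeup simply lets a round happen without waiting for it.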
public void doRebalance() {
    for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue();
        if (impl != null) {
            try {
                impl.doRebalance();
            } catch (Throwable e) {
                log.error("doRebalance exception", e);
            }
        }
    }
}
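Each consumer's doRebalance ultimately calls RebalanceImpl#rebalanceByTopic for every topic it subscribes to. The clustering-mode branch is: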
case CLUSTERING: {
    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
    List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
    if (null == mqSet) {
        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
        }
    }

    if (null == cidAll) {
        log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
    }

    if (mqSet != null && cidAll != null) {
        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
        mqAll.addAll(mqSet);

        // Sort both lists so every consumer in the group computes from the same view
        Collections.sort(mqAll);
        Collections.sort(cidAll);

        AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

        List<MessageQueue> allocateResult = null;
        try {
            allocateResult = strategy.allocate(
                this.consumerGroup,
                this.mQClientFactory.getClientId(),
                mqAll,
                cidAll);
        } catch (Throwable e) {
            log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}",
                strategy.getName(), e);
            return;
        }

        Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
        if (allocateResult != null) {
            allocateResultSet.addAll(allocateResult);
        }

        boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
        if (changed) {
            log.info(
                "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(),
                cidAll.size(), allocateResultSet.size(), allocateResultSet);
            this.messageQueueChanged(topic, mqSet, allocateResultSet);
        }
    }
    break;
}
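Step 1: Get the topic's queue set from the topic subscription info cache (topicSubscribeInfoTable), then send a request to a Broker to fetch the client IDs of all consumers currently in this consumer group. A topic's queues may be spread across several Brokers, so which Broker receives the request? RocketMQ picks one Broker at random from the topic's route information. Why does a Broker know about every consumer in the group? Recall that when a consumer starts, it registers itself with its MQClientInstance, and the MQClientInstance then sends heartbeats to all Brokers; each heartbeat carries the consumer information of that MQClientInstance. If either mqSet or cidAll is null, this round of queue rebalancing is skipped.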
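Why any randomly chosen Broker can answer the membership query can be shown with a toy model. Everything here is hypothetical (ConsumerIdLookupSketch, BrokerSim and their methods are not RocketMQ classes): each client heartbeats to every broker, so all brokers end up with the same group membership, and a random pick returns the same list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;

/** Toy model (not RocketMQ code) of heartbeat-based consumer membership. */
public class ConsumerIdLookupSketch {

    /** Hypothetical broker registry: consumer group -> client IDs seen in heartbeats. */
    static final class BrokerSim {
        private final Map<String, Set<String>> groupToClientIds = new HashMap<>();

        void onHeartbeat(String clientId, String consumerGroup) {
            groupToClientIds.computeIfAbsent(consumerGroup, g -> new TreeSet<>()).add(clientId);
        }

        List<String> getConsumerIdList(String consumerGroup) {
            Set<String> ids = groupToClientIds.get(consumerGroup);
            return ids == null ? null : new ArrayList<>(ids);
        }
    }

    private final List<BrokerSim> brokersOfTopic;  // brokers listed in the topic's route info
    private final Random random;

    ConsumerIdLookupSketch(List<BrokerSim> brokersOfTopic, Random random) {
        this.brokersOfTopic = brokersOfTopic;
        this.random = random;
    }

    /** A client sends its heartbeat, including its consumer group, to every broker. */
    void heartbeatToAllBrokers(String clientId, String consumerGroup) {
        for (BrokerSim broker : brokersOfTopic) {
            broker.onHeartbeat(clientId, consumerGroup);
        }
    }

    /** Ask one randomly chosen broker; null means "no answer", and the rebalance round is skipped. */
    List<String> findConsumerIdList(String consumerGroup) {
        if (brokersOfTopic.isEmpty()) {
            return null;
        }
        BrokerSim chosen = brokersOfTopic.get(random.nextInt(brokersOfTopic.size()));
        return chosen.getConsumerIdList(consumerGroup);
    }
}
```

The null returns correspond to the "skip this round" rule above: with no broker or no membership data, there is nothing consistent to allocate against.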
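Step 2: First sort cidAll and mqAll. This is important: it guarantees that every consumer in the same group sees the same view, so the same message queue is never assigned to more than one consumer. The allocation itself is delegated to RocketMQ's queue allocation strategy interface, AllocateMessageQueueStrategy.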
RocketMQ's default allocation algorithm is AllocateMessageQueueAveragely.
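Steps 2 and the default strategy can be combined in one sketch: sort, then give each consumer a contiguous block of queues, with the first `mod` consumers getting one extra. This mirrors the averaging idea as I understand AllocateMessageQueueAveragely, but it is a simplified stand-in: AllocateAveragelySketch is hypothetical, and plain strings replace RocketMQ's MessageQueue (whose ordering compares topic, broker name and queue ID, not raw strings).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Sketch of average allocation over sorted views; queue and consumer IDs are plain strings. */
public class AllocateAveragelySketch {

    static List<String> allocate(String currentCid, List<String> mqAllIn, List<String> cidAllIn) {
        // Sort copies so every consumer works from the same ordering, whatever order it received.
        List<String> mqAll = new ArrayList<>(mqAllIn);
        List<String> cidAll = new ArrayList<>(cidAllIn);
        Collections.sort(mqAll);
        Collections.sort(cidAll);

        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCid);
        if (index < 0 || mqAll.isEmpty()) {
            return result;  // unknown consumer, or nothing to assign
        }
        int mod = mqAll.size() % cidAll.size();
        // When the split is uneven, the first `mod` consumers each take one extra queue.
        int averageSize = mqAll.size() <= cidAll.size()
            ? 1
            : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        // range can be <= 0 when there are more consumers than queues: that consumer gets nothing.
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
}
```

Because every consumer sorts before computing, each one can run this independently and the blocks still come out disjoint and complete. With 8 queues q0..q7 and consumers c1, c2, each gets 4 queues; after c3 joins, the next round gives c1 q0..q2, c2 q3..q5 and c3 q6..q7.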
Step 3: Update this consumer client's message-queue cache according to the allocated MessageQueues, in RebalanceImpl#updateProcessQueueTableInRebalance:
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
    final boolean isOrder) {
    boolean changed = false;

    // Part 1: drop queues of this topic that are no longer assigned to this consumer
    Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<MessageQueue, ProcessQueue> next = it.next();
        MessageQueue mq = next.getKey();
        ProcessQueue pq = next.getValue();

        if (mq.getTopic().equals(topic)) {
            if (!mqSet.contains(mq)) {
                pq.setDropped(true);
                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                    it.remove();
                    changed = true;
                    log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                }
            }
            // ... (other branches omitted)
        }
    }
    // ... (method continues below)
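In Step 3, ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable is the cache of message queues currently assigned to this consumer. If a MessageQueue in this cache is not in mqSet, the queue has been assigned to another consumer in this round, so consumption from it must stop. This is done by setting the ProcessQueue's state to dropped=true; messages in that ProcessQueue will no longer be consumed. removeUnnecessaryMessageQueue is then called to decide whether the MessageQueue and its ProcessQueue should be removed from the cache. removeUnnecessaryMessageQueue is declared abstract in RebalanceImpl; its main job is to persist the consumption offset of the MessageQueue being removed.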
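The drop-then-maybe-remove logic of Step 3 can be sketched with simplified types. DropUnassignedSketch, ProcessQueueSketch and RemovalHook are hypothetical; queues are strings of the form "Topic:queueId", and RemovalHook stands in for removeUnnecessaryMessageQueue (for instance, persisting the offset before agreeing to removal).

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/** Sketch of Step 3, part 1: mark unassigned queues dropped, then remove them if the hook agrees. */
public class DropUnassignedSketch {

    /** Simplified stand-in for ProcessQueue: only the dropped flag matters here. */
    static final class ProcessQueueSketch {
        volatile boolean dropped;
    }

    /** Hypothetical stand-in for removeUnnecessaryMessageQueue; returns true if removal may proceed. */
    interface RemovalHook {
        boolean removeUnnecessary(String mq, ProcessQueueSketch pq);
    }

    static String topicOf(String mq) {
        return mq.substring(0, mq.indexOf(':'));
    }

    static boolean dropUnassigned(String topic, Map<String, ProcessQueueSketch> processQueueTable,
                                  Set<String> allocated, RemovalHook hook) {
        boolean changed = false;
        Iterator<Map.Entry<String, ProcessQueueSketch>> it = processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, ProcessQueueSketch> entry = it.next();
            String mq = entry.getKey();
            if (!topicOf(mq).equals(topic) || allocated.contains(mq)) {
                continue;  // other topics, and queues still assigned to us, are left untouched
            }
            ProcessQueueSketch pq = entry.getValue();
            pq.dropped = true;  // stop consuming: the queue now belongs to another consumer
            if (hook.removeUnnecessary(mq, pq)) {
                it.remove();    // removed via the iterator, so iteration stays valid
                changed = true;
            }
        }
        return changed;
    }
}
```

Note the ordering: the queue is marked dropped even when the hook refuses removal, so it stops being consumed right away and can be cleaned up in a later round.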
    // Part 2: create a PullRequest for every newly assigned queue
    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
    for (MessageQueue mq : mqSet) {
        if (!this.processQueueTable.containsKey(mq)) {
            if (isOrder && !this.lock(mq)) {
                log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                continue;
            }

            this.removeDirtyOffset(mq);
            ProcessQueue pq = new ProcessQueue();
            long nextOffset = this.computePullFromWhere(mq);
            if (nextOffset >= 0) {
                ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                if (pre != null) {
                    log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                } else {
                    log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                    PullRequest pullRequest = new PullRequest();
                    pullRequest.setConsumerGroup(consumerGroup);
                    pullRequest.setNextOffset(nextOffset);
                    pullRequest.setMessageQueue(mq);
                    pullRequest.setProcessQueue(pq);
                    pullRequestList.add(pullRequest);
                    changed = true;
                }
            } else {
                log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
            }
        }
    }
    // ... (pullRequestList is then handed to PullMessageService)
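Step 4: Iterate over the queues assigned in this round. If processQueueTable does not contain a queue, the queue is newly assigned to this consumer: first remove its consumption offset from memory, then read its persisted consumption offset again from the offset store, and create a PullRequest for it. The key point here: if the offset read back is less than 0 (typically -1 when no offset has been stored for the queue yet), the starting offset has to be determined another way. RocketMQ provides three strategies for this, CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET and CONSUME_FROM_TIMESTAMP, which you choose when creating the consumer by calling DefaultMQPushConsumer#setConsumeFromWhere.

The nextOffset of a PullRequest is computed in RebalancePushImpl#computePullFromWhere.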
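The decision made in computePullFromWhere can be sketched as follows. This is a simplified model, not RebalancePushImpl's code: ComputePullFromWhereSketch and its Strategy enum are hypothetical (the enum only mirrors the option names above), broker lookups such as the max offset and timestamp search are passed in as parameters, and RocketMQ's special cases (for example retry topics) are omitted.

```java
import java.util.function.LongUnaryOperator;

/** Simplified sketch of choosing a start offset for a newly assigned queue. */
public class ComputePullFromWhereSketch {

    /** Mirrors the ConsumeFromWhere option names from the text; this enum itself is hypothetical. */
    enum Strategy { CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM_TIMESTAMP }

    /**
     * @param storedOffset            offset read back from the offset store (-1: nothing stored yet)
     * @param maxOffset               hypothetical input: the queue's current max offset on the broker
     * @param consumeTimestampMillis  hypothetical input: start time used by CONSUME_FROM_TIMESTAMP
     * @param searchOffsetByTimestamp hypothetical input: maps a timestamp to a queue offset
     * @return the offset to start pulling from, or -1 if none could be determined
     */
    static long computePullFromWhere(long storedOffset, Strategy strategy, long maxOffset,
                                     long consumeTimestampMillis,
                                     LongUnaryOperator searchOffsetByTimestamp) {
        if (storedOffset >= 0) {
            return storedOffset;  // an existing stored offset always wins over the strategy
        }
        if (storedOffset != -1) {
            return -1;            // treated as a read error: the caller skips this queue for now
        }
        switch (strategy) {
            case CONSUME_FROM_LAST_OFFSET:
                return maxOffset;
            case CONSUME_FROM_FIRST_OFFSET:
                return 0L;
            case CONSUME_FROM_TIMESTAMP:
                return searchOffsetByTimestamp.applyAsLong(consumeTimestampMillis);
            default:
                return -1;
        }
    }
}
```

The first branch is the practical takeaway: the ConsumeFromWhere strategy only matters for queues that have no stored offset, so changing it on a group that has already consumed usually has no visible effect.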
To summarize, two questions:
Question 1: When is a PullRequest object created and added to pullRequestQueue so that the PullMessageService thread is woken up?
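The RebalanceService thread reassigns the queues of the consumer's subscribed topics every 20 s. In each round it gets all queues of the topic, queries a Broker in real time for the current list of consumers in this group, and creates a PullRequest for each newly assigned queue. Within one JVM process, there is only ever one PullRequest object for a given consumer group and queue.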
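The "one PullRequest per group and queue" property follows from Step 4 only creating requests for queues missing from processQueueTable, guarded by putIfAbsent. A minimal sketch with simplified types: PullRequestRegistrySketch, PullRequestSketch and addNewQueues are hypothetical names, a plain Object stands in for ProcessQueue, and the offset computation is passed in as a function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.ToLongFunction;

/** Sketch: repeated rebalance rounds only create PullRequests for queues not already tracked. */
public class PullRequestRegistrySketch {

    static final class PullRequestSketch {
        final String consumerGroup;
        final String messageQueue;
        final long nextOffset;

        PullRequestSketch(String consumerGroup, String messageQueue, long nextOffset) {
            this.consumerGroup = consumerGroup;
            this.messageQueue = messageQueue;
            this.nextOffset = nextOffset;
        }
    }

    private final String consumerGroup;
    // Keyed by queue; the Object value stands in for ProcessQueue.
    private final ConcurrentMap<String, Object> processQueueTable = new ConcurrentHashMap<>();

    PullRequestRegistrySketch(String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }

    List<PullRequestSketch> addNewQueues(Set<String> allocated, ToLongFunction<String> computePullFromWhere) {
        List<PullRequestSketch> created = new ArrayList<>();
        for (String mq : allocated) {
            if (processQueueTable.containsKey(mq)) {
                continue;  // already tracked: it already has its PullRequest
            }
            long nextOffset = computePullFromWhere.applyAsLong(mq);
            if (nextOffset < 0) {
                continue;  // no usable start offset; the queue is retried in a later round
            }
            // putIfAbsent returns null only for the caller that actually inserted the entry.
            if (processQueueTable.putIfAbsent(mq, new Object()) == null) {
                created.add(new PullRequestSketch(consumerGroup, mq, nextOffset));
            }
        }
        return created;
    }
}
```

Running a second round with an overlapping assignment yields requests only for the queues that are genuinely new.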
Question 2: How do multiple consumers in a cluster share the queues of a topic, and how are the queues redistributed when a new consumer joins?
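Because every rebalance round queries a Broker in real time for all consumers currently in the group, and sorts both the queue list and the consumer list, a newly joined consumer is assigned queues in the next redistribution and starts consuming messages from them.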