zoukankan      html  css  js  c++  java
  • RocketMq总结(五) -- 消息队列负载均衡和再分配

      RocketMQ 消息队列重新分布 由RebalanceService 来实现的 。一个 MQClientInstance持有一个RebalanceService现实,并随 MQClientlnstance 的启动而启动
    public class MQClientInstance {
        private final static long LOCK_TIMEOUT_MILLIS = 3000;
        private final InternalLogger log = ClientLogger.getLog();
        private final ClientConfig clientConfig;
        private final int instanceIndex;
        private final String clientId;
        private final long bootTimestamp = System.currentTimeMillis();
        private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
        private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
        private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
        private final NettyClientConfig nettyClientConfig;
        private final MQClientAPIImpl mQClientAPIImpl;
        private final MQAdminImpl mQAdminImpl;
        private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
        private final Lock lockNamesrv = new ReentrantLock();
        private final Lock lockHeartbeat = new ReentrantLock();
        private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
            new ConcurrentHashMap<String, HashMap<Long, String>>();
        private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
            new ConcurrentHashMap<String, HashMap<String, Integer>>();
        private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "MQClientFactoryScheduledThread");
            }
        });
        private final ClientRemotingProcessor clientRemotingProcessor;
        private final PullMessageService pullMessageService;
        private final RebalanceService rebalanceService;
        private final DefaultMQProducer defaultMQProducer;
        private final ConsumerStatsManager consumerStatsManager;
      RebalanceService # run
    @Override
        public void run() {
            log.info(this.getServiceName() + " service started");
    
            while (!this.isStopped()) {
                this.waitForRunning(waitInterval);
                this.mqClientFactory.doRebalance();
            }
    
            log.info(this.getServiceName() + " service end");
        }

      RebalanceService 线程默 20s 执行 mqClientFactory.doRebalance()方法,精髓都在20s

    public void doRebalance() {
            for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {
                    try {
                        impl.doRebalance();
                    } catch (Throwable e) {
                        log.error("doRebalance exception", e);
                    }
                }
            }
        }

      最终调用的是 Rebalancelmpl#rebalanceByTopic

    case CLUSTERING: {
                    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                    List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                    if (null == mqSet) {
                        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                            log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                        }
                    }
    
                    if (null == cidAll) {
                        log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                    }
    
                    if (mqSet != null && cidAll != null) {
                        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                        mqAll.addAll(mqSet);
    
                        Collections.sort(mqAll);
                        Collections.sort(cidAll);
    
                        AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
    
                        List<MessageQueue> allocateResult = null;
                        try {
                            allocateResult = strategy.allocate(
                                this.consumerGroup,
                                this.mQClientFactory.getClientId(),
                                mqAll,
                                cidAll);
                        } catch (Throwable e) {
                            log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                                e);
                            return;
                        }
    
                        Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                        if (allocateResult != null) {
                            allocateResultSet.addAll(allocateResult);
                        }
    
                        boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                        if (changed) {
                            log.info(
                                "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                                strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                                allocateResultSet.size(), allocateResultSet);
                            this.messageQueueChanged(topic, mqSet, allocateResultSet);
                        }
                    }
                    break;
    Step1 :从主题订阅信息缓存表中获取主题的队列信息; 发送请求从 Broker中 该消费组内 当前所有 的消费者客户端 ID,主题 topic 的队列可能分布在多个 Broker上,那 请求发往哪个 Broker呢? RocketMQ 从主题的路由信息表中随机选择一个 Broker。 Broker什么 存在消费组内所有消费者的信息呢?我们不妨回忆一下消费者在启动的时候会向MQClientlnstance中注册消费者,然后 MQClientlnstance 会向所有的 Broker 送心跳包,心跳包中包含 MQClientlnstance 消费者信息。如果 mqSet, cidAll 任意一个为空则忽略本次消息队列负载。
    Step2 :首先对 cidAll,mqAll 排序,这个很重要,同 个消费组内看到的视图保持一致,
    确保同 个消费队列不会被多个消费者分配 RocketMQ 消息队列分配算法接口
     
    Rocketmq默认的分配算法是 AllocateMessageQueueAveragely
     

    Step3:根据分配到的MessageQueue更新本消费者客户端的消息

      Rebalancelmpl#updateProcessQueueTableInRebalance
      
    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
            final boolean isOrder) {
            boolean changed = false;
    
            Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<MessageQueue, ProcessQueue> next = it.next();
                MessageQueue mq = next.getKey();
                ProcessQueue pq = next.getValue();
    
                if (mq.getTopic().equals(topic)) {
                    if (!mqSet.contains(mq)) {
                        pq.setDropped(true);
                        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                            it.remove();
                            changed = true;
                            log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                        }
                    }
    Step3 : ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable ,当前消费者负载的消息队列缓存表,如果缓存表中的 MessageQueue 不包含在 mqSet 中,说明经过本次消息队列负载后,该 mq 被分配给其他消费者,故需要暂停该消息队列消息的消费,方法是将 ProccessQueue 的状态设置为 dropped=true,该 ProcessQueue 中的消息将不会再被消费,调用 removeUnnecessaryMessageQueue 方法判断是否将 MessageQueue,ProccessQueue 缓存
    表中移除。removeUnnecessaryMessageQueue 在Rebalancelmple 定义为抽象方法。 removeUnnecessaryMessageQueue 方法主要持久化待移除 MessageQueu 消息消费进度。 

      

    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
            for (MessageQueue mq : mqSet) {
                if (!this.processQueueTable.containsKey(mq)) {
                    if (isOrder && !this.lock(mq)) {
                        log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                        continue;
                    }
    
                    this.removeDirtyOffset(mq);
                    ProcessQueue pq = new ProcessQueue();
                    long nextOffset = this.computePullFromWhere(mq);
                    if (nextOffset >= 0) {
                        ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                        if (pre != null) {
                            log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                        } else {
                            log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                            PullRequest pullRequest = new PullRequest();
                            pullRequest.setConsumerGroup(consumerGroup);
                            pullRequest.setNextOffset(nextOffset);
                            pullRequest.setMessageQueue(mq);
                            pullRequest.setProcessQueue(pq);
                            pullRequestList.add(pullRequest);
                            changed = true;
                        }
                    } else {
                        log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                    }
                }
    Step4 :遍历本次负载分配到的队列集合,如果 processQueueTable 中没有包含该消息
    队列,表明这是本次新增加的消息队列, 首先从内存中移除该消息队列的消费进度,然后
    从磁盘中读取该消息队列的消费进度,创建 PullRequest 对象。 这里有一个关键,如果读取到的消费进度小于0,则需要校对消费进度。 RocketMQ 提供 CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM TIMESTAMP 方式,
    在创建消费者时可以通过调用 DefaultMQPushConsumer#setConsumeFromWhere 方法设置
    PullRequest的 nextOffset 计算逻辑位于 RebalancePushlmpl#computePullFromWhere

      用两个问题作为总结

      问题1 : PullRequest 对象在什么时候创建并加入到 pullRequestQueue 中以便唤PullMessageService 线程。

      RebalanceService 线程每隔 20s 对消费者订阅的主题进行一次队列重新分配, 每一次分配都会获取主题的所有队列、从 Broke 服务器实时查询当前该主题该消费组内消费者列表,对新分配的消息队列会创建对应的 PullRequest 对象。 在一个 JVM进 程中,同一个消费组同 一个队列只会存在一个 PullRequest对象。

      问题2:集群内多个消费者是如何负载主题下的多个消费队列 ,并且如果有新的消费者加入时,消息队列又会如何重新分布。
      由于每次进行队列重新负载时会从 Broker 实时查询出当前消费组内所有消费者,并且
    对消息队列、消费者列表进行排序,这样新加入的消费者就会在队列重新分布时分配到消
    费队列从而消费消息。
  • 相关阅读:
    单线制DS18B20温度传感器LED数码管显示当前的温度值
    AD转换器的主要指标
    关于swiper动态更改,无法更新的悖论
    在360的兼容模式下关于innerHTML=“”,引发的问题
    比较两个字符串的相似度
    WebSocket使用
    事件绑定addEventListener
    插件开发优缺点
    插件开发宗旨
    学会用博客
  • 原文地址:https://www.cnblogs.com/juniorMa/p/15127356.html
Copyright © 2011-2022 走看看