zoukankan      html  css  js  c++  java
  • RocketMq总结(五) -- 消息队列负载均衡和再分配

      RocketMQ 消息队列重新分布 由RebalanceService 来实现的 。一个 MQClientInstance持有一个RebalanceService现实,并随 MQClientlnstance 的启动而启动
    public class MQClientInstance {
        private final static long LOCK_TIMEOUT_MILLIS = 3000;
        private final InternalLogger log = ClientLogger.getLog();
        private final ClientConfig clientConfig;
        private final int instanceIndex;
        private final String clientId;
        private final long bootTimestamp = System.currentTimeMillis();
        private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
        private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
        private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
        private final NettyClientConfig nettyClientConfig;
        private final MQClientAPIImpl mQClientAPIImpl;
        private final MQAdminImpl mQAdminImpl;
        private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
        private final Lock lockNamesrv = new ReentrantLock();
        private final Lock lockHeartbeat = new ReentrantLock();
        private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
            new ConcurrentHashMap<String, HashMap<Long, String>>();
        private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
            new ConcurrentHashMap<String, HashMap<String, Integer>>();
        private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "MQClientFactoryScheduledThread");
            }
        });
        private final ClientRemotingProcessor clientRemotingProcessor;
        private final PullMessageService pullMessageService;
        private final RebalanceService rebalanceService;
        private final DefaultMQProducer defaultMQProducer;
        private final ConsumerStatsManager consumerStatsManager;
      RebalanceService # run
    @Override
        public void run() {
            log.info(this.getServiceName() + " service started");
    
            while (!this.isStopped()) {
                this.waitForRunning(waitInterval);
                this.mqClientFactory.doRebalance();
            }
    
            log.info(this.getServiceName() + " service end");
        }

      RebalanceService 线程默 20s 执行 mqClientFactory.doRebalance()方法,精髓都在20s

    public void doRebalance() {
            for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {
                    try {
                        impl.doRebalance();
                    } catch (Throwable e) {
                        log.error("doRebalance exception", e);
                    }
                }
            }
        }

      最终调用的是 Rebalancelmpl#rebalanceByTopic

    case CLUSTERING: {
                    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                    List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                    if (null == mqSet) {
                        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                            log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                        }
                    }
    
                    if (null == cidAll) {
                        log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                    }
    
                    if (mqSet != null && cidAll != null) {
                        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                        mqAll.addAll(mqSet);
    
                        Collections.sort(mqAll);
                        Collections.sort(cidAll);
    
                        AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
    
                        List<MessageQueue> allocateResult = null;
                        try {
                            allocateResult = strategy.allocate(
                                this.consumerGroup,
                                this.mQClientFactory.getClientId(),
                                mqAll,
                                cidAll);
                        } catch (Throwable e) {
                            log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                                e);
                            return;
                        }
    
                        Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                        if (allocateResult != null) {
                            allocateResultSet.addAll(allocateResult);
                        }
    
                        boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                        if (changed) {
                            log.info(
                                "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                                strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                                allocateResultSet.size(), allocateResultSet);
                            this.messageQueueChanged(topic, mqSet, allocateResultSet);
                        }
                    }
                    break;
    Step1 :从主题订阅信息缓存表中获取主题的队列信息; 发送请求从 Broker中 该消费组内 当前所有 的消费者客户端 ID,主题 topic 的队列可能分布在多个 Broker上,那 请求发往哪个 Broker呢? RocketMQ 从主题的路由信息表中随机选择一个 Broker。 Broker什么 存在消费组内所有消费者的信息呢?我们不妨回忆一下消费者在启动的时候会向MQClientlnstance中注册消费者,然后 MQClientlnstance 会向所有的 Broker 送心跳包,心跳包中包含 MQClientlnstance 消费者信息。如果 mqSet, cidAll 任意一个为空则忽略本次消息队列负载。
    Step2 :首先对 cidAll,mqAll 排序,这个很重要,同 个消费组内看到的视图保持一致,
    确保同 个消费队列不会被多个消费者分配 RocketMQ 消息队列分配算法接口
     
    Rocketmq默认的分配算法是 AllocateMessageQueueAveragely
     

    Step3:根据分配到的MessageQueue更新本消费者客户端的消息

      Rebalancelmpl#updateProcessQueueTableInRebalance
      
    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
            final boolean isOrder) {
            boolean changed = false;
    
            Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry<MessageQueue, ProcessQueue> next = it.next();
                MessageQueue mq = next.getKey();
                ProcessQueue pq = next.getValue();
    
                if (mq.getTopic().equals(topic)) {
                    if (!mqSet.contains(mq)) {
                        pq.setDropped(true);
                        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                            it.remove();
                            changed = true;
                            log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                        }
                    }
    Step3 : ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable ,当前消费者负载的消息队列缓存表,如果缓存表中的 MessageQueue 不包含在 mqSet 中,说明经过本次消息队列负载后,该 mq 被分配给其他消费者,故需要暂停该消息队列消息的消费,方法是将 ProccessQueue 的状态设置为 dropped=true,该 ProcessQueue 中的消息将不会再被消费,调用 removeUnnecessaryMessageQueue 方法判断是否将 MessageQueue,ProccessQueue 缓存
    表中移除。removeUnnecessaryMessageQueue 在Rebalancelmple 定义为抽象方法。 removeUnnecessaryMessageQueue 方法主要持久化待移除 MessageQueu 消息消费进度。 

      

    List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
            for (MessageQueue mq : mqSet) {
                if (!this.processQueueTable.containsKey(mq)) {
                    if (isOrder && !this.lock(mq)) {
                        log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                        continue;
                    }
    
                    this.removeDirtyOffset(mq);
                    ProcessQueue pq = new ProcessQueue();
                    long nextOffset = this.computePullFromWhere(mq);
                    if (nextOffset >= 0) {
                        ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
                        if (pre != null) {
                            log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
                        } else {
                            log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
                            PullRequest pullRequest = new PullRequest();
                            pullRequest.setConsumerGroup(consumerGroup);
                            pullRequest.setNextOffset(nextOffset);
                            pullRequest.setMessageQueue(mq);
                            pullRequest.setProcessQueue(pq);
                            pullRequestList.add(pullRequest);
                            changed = true;
                        }
                    } else {
                        log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                    }
                }
    Step4 :遍历本次负载分配到的队列集合,如果 processQueueTable 中没有包含该消息
    队列,表明这是本次新增加的消息队列, 首先从内存中移除该消息队列的消费进度,然后
    从磁盘中读取该消息队列的消费进度,创建 PullRequest 对象。 这里有一个关键,如果读取到的消费进度小于0,则需要校对消费进度。 RocketMQ 提供 CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM TIMESTAMP 方式,
    在创建消费者时可以通过调用 DefaultMQPushConsumer#setConsumeFromWhere 方法设置
    PullRequest的 nextOffset 计算逻辑位于 RebalancePushlmpl#computePullFromWhere

      用两个问题作为总结

      问题1 : PullRequest 对象在什么时候创建并加入到 pullRequestQueue 中以便唤PullMessageService 线程。

      RebalanceService 线程每隔 20s 对消费者订阅的主题进行一次队列重新分配, 每一次分配都会获取主题的所有队列、从 Broke 服务器实时查询当前该主题该消费组内消费者列表,对新分配的消息队列会创建对应的 PullRequest 对象。 在一个 JVM进 程中,同一个消费组同 一个队列只会存在一个 PullRequest对象。

      问题2:集群内多个消费者是如何负载主题下的多个消费队列 ,并且如果有新的消费者加入时,消息队列又会如何重新分布。
      由于每次进行队列重新负载时会从 Broker 实时查询出当前消费组内所有消费者,并且
    对消息队列、消费者列表进行排序,这样新加入的消费者就会在队列重新分布时分配到消
    费队列从而消费消息。
  • 相关阅读:
    matlab矩阵中如何去掉重复的行;如何找到相同的行,并找到其位置
    Coursera 机器学习 第9章(下) Recommender Systems 学习笔记
    机器学习基石笔记1——在何时可以使用机器学习(1)
    Coursera 机器学习 第9章(上) Anomaly Detection 学习笔记
    matlab安装过程的被要求的配置程序
    jdk环境变量配置
    Coursera 机器学习 第8章(下) Dimensionality Reduction 学习笔记
    Coursera 机器学习 第8章(上) Unsupervised Learning 学习笔记
    Coursera 机器学习 第7章 Support Vector Machines 学习笔记
    linux服务器---squid限制
  • 原文地址:https://www.cnblogs.com/juniorMa/p/15127356.html
Copyright © 2011-2022 走看看