  • Broker selection while RocketMQ sends messages

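    DefaultMQProducerImpl has a method, sendDefaultImpl, through which every message send passes. Where does it get its routing information from?
    Right after startup the producer has no topic information, so it has to fetch it from the registry (the name server). Once fetched, the routes are cached in MQClientInstance's topicRouteTable and brokerAddrTable.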
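    The "ask the name server once, then serve from memory" pattern can be sketched as below. This is a minimal illustration under stated assumptions: NameServerClient is a hypothetical interface standing in for the real remoting call, and a route is simplified to a list of broker names. The real MQClientInstance stores richer route data in topicRouteTable and broker addresses in brokerAddrTable, and, as described further down, refreshes them by periodic polling.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical name-server lookup; stands in for the real remoting call.
interface NameServerClient {
    List<String> fetchBrokerNames(String topic);
}

// Sketch of a per-topic route cache: the first lookup for a topic goes to the
// name server, later lookups are answered from memory.
class RouteCache {
    private final ConcurrentMap<String, List<String>> topicRouteTable = new ConcurrentHashMap<>();
    private final NameServerClient nameServer;

    RouteCache(NameServerClient nameServer) {
        this.nameServer = nameServer;
    }

    List<String> route(String topic) {
        // computeIfAbsent only calls the name server while the topic is absent.
        return topicRouteTable.computeIfAbsent(topic, nameServer::fetchBrokerNames);
    }

    // A periodic refresher would call this to overwrite the cached route.
    void refresh(String topic) {
        topicRouteTable.put(topic, nameServer.fetchBrokerNames(topic));
    }
}
```

    Only the first route() call for a topic reaches the name server; refresh() is where a timer-driven update would plug in.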


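    In this method, in synchronous mode, if an attempt does not succeed the producer keeps retrying up to the configured number of times, going through the selectOneMessageQueue logic again on every attempt: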
                for (; times < timesTotal; times++) {
                    // Remember the broker used in the previous attempt so the selector can avoid it
                    String lastBrokerName = null == mq ? null : mq.getBrokerName();
                    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                    if (mqSelected != null) {
                        mq = mqSelected;
                        brokersSent[times] = mq.getBrokerName();
                        try {
                            beginTimestampPrev = System.currentTimeMillis();
                            // The timeout budget is measured from the first attempt, so it covers all retries
                            long costTime = beginTimestampPrev - beginTimestampFirst;
                            if (timeout < costTime) {
                                callTimeout = true;
                                break;
                            }
    
                            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                            endTimestamp = System.currentTimeMillis();
                            // The send returned normally: record its latency with isolation = false
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                            switch (communicationMode) {
                                case ASYNC:
                                    return null;
                                case ONEWAY:
                                    return null;
                                case SYNC:
                                    // Retry on another broker only if retryAnotherBrokerWhenNotStoreOK is enabled
                                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                        if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                            continue;
                                        }
                                    }
    
                                    return sendResult;
                                default:
                                    break;
                            }
                        } catch (RemotingException e) {
                            endTimestamp = System.currentTimeMillis();
                            // Network-level failure: isolation = true marks the broker as faulty
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
                            exception = e;
                            continue;
                        }
                        // ... further catch clauses omitted in this excerpt
                    } else {
                        break;
                    }
                }
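    Stripped of timing, logging and the specific exception types, the loop boils down to: pick a queue while avoiding the last broker, try to send, and give up once the attempt budget (one initial attempt plus the retries) is spent. The sketch below is only an illustration of that shape: QueueSelector and Sender are hypothetical interfaces, a failed send is modelled as an exception, and the updateFaultItem bookkeeping is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for selectOneMessageQueue and sendKernelImpl.
interface QueueSelector {
    String selectBroker(String lastBrokerName); // null when nothing can be selected
}

interface Sender {
    String send(String brokerName) throws Exception; // returns a send result on success
}

class RetryingProducer {
    // Brokers tried, in order; plays the role of the brokersSent array.
    final List<String> brokersSent = new ArrayList<>();

    String sendSync(QueueSelector selector, Sender sender, int retryTimes) throws Exception {
        int timesTotal = 1 + retryTimes; // one initial attempt plus the retries
        String lastBroker = null;
        Exception lastError = null;
        for (int times = 0; times < timesTotal; times++) {
            String broker = selector.selectBroker(lastBroker);
            if (broker == null) {
                break; // no queue could be selected
            }
            lastBroker = broker;
            brokersSent.add(broker);
            try {
                return sender.send(broker);
            } catch (Exception e) {
                lastError = e; // remember the failure; the next attempt avoids this broker
            }
        }
        throw lastError != null ? lastError : new IllegalStateException("no broker selected");
    }
}
```

    With a selector that alternates between two brokers and a broker-a that always fails, the second attempt lands on broker-b and succeeds.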
    

    The selectOneMessageQueue called here actually delegates to the selectOneMessageQueue of the producer's internal MQFaultStrategy object.

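    Personally I don't consider this latency-estimation feature especially important, and RocketMQ does not use this logic by default, but that doesn't stop us from studying it. Below is MQFaultStrategy's selectOneMessageQueue: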

        public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
            if (this.sendLatencyFaultEnable) {
                try {
                    int index = tpInfo.getSendWhichQueue().getAndIncrement();
                    for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                        int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                        if (pos < 0)
                            pos = 0;
                        MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                        if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                            if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                                return mq;
                        }
                    }
    
                    final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                    int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                    if (writeQueueNums > 0) {
                        final MessageQueue mq = tpInfo.selectOneMessageQueue();
                        if (notBestBroker != null) {
                            mq.setBrokerName(notBestBroker);
                            mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                        }
                        return mq;
                    } else {
                        latencyFaultTolerance.remove(notBestBroker);
                    }
                } catch (Exception e) {
                    log.error("Error occurred when selecting message queue", e);
                }
    
                return tpInfo.selectOneMessageQueue();
            }
    
            return tpInfo.selectOneMessageQueue(lastBrokerName);
        }
    

      

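    If sendLatencyFaultEnable is false (which is also the default), each call advances a shared counter over the topic's whole queue list by one and picks the message queue at that position (put simply, each broker hosts a set of queues for the topic, and how many is set by that broker's configuration), skipping queues that live on the broker that failed last time (lastBrokerName).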
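    The default path can be sketched as follows. This is a simplified model of TopicPublishInfo.selectOneMessageQueue(lastBrokerName), not the RocketMQ class itself: MsgQueue and QueueRoundRobin are hypothetical names, and Math.floorMod replaces the original Math.abs plus the pos < 0 guard as a way to keep the index non-negative after the counter overflows.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified stand-in for MessageQueue: a broker name plus a queue id.
final class MsgQueue {
    final String brokerName;
    final int queueId;

    MsgQueue(String brokerName, int queueId) {
        this.brokerName = brokerName;
        this.queueId = queueId;
    }

    @Override
    public String toString() {
        return brokerName + "#" + queueId;
    }
}

class QueueRoundRobin {
    private final List<MsgQueue> queues;
    private final AtomicInteger sendWhichQueue = new AtomicInteger(0);

    QueueRoundRobin(List<MsgQueue> queues) {
        this.queues = queues;
    }

    // Plain round-robin: the counter advances by one on every call.
    MsgQueue selectOne() {
        return queues.get(Math.floorMod(sendWhichQueue.getAndIncrement(), queues.size()));
    }

    // Round-robin that skips queues on the broker that failed last time.
    MsgQueue selectOne(String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOne();
        }
        int index = sendWhichQueue.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            MsgQueue q = queues.get(Math.floorMod(index + i, queues.size()));
            if (!q.brokerName.equals(lastBrokerName)) {
                return q;
            }
        }
        return selectOne(); // every queue is on lastBrokerName: fall back to plain round-robin
    }
}
```

    A first attempt calls selectOne(null); a retry passes the broker that just failed, so the scan walks past that broker's queues.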

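    The catch is that with only two brokers this handles most problems, but with many brokers we would like RocketMQ to reason along a time dimension and estimate when a broker will become usable again. This matters especially for RocketMQ because when brokers change, the producer is not notified immediately; it only learns about the change through asynchronous polling. Likewise, liveness detection between the name server and the brokers is also asynchronous and polling-based.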

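    Turning on sendLatencyFaultEnable means that before sending, the producer estimates whether the broker is available, and if it is, that queue is returned immediately. The relevant line in the code above is: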

                            if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))

    I think this is a mistake: the condition should be !mq.getBrokerName().equals(lastBrokerName), so that a retry moves away from the broker that just failed.
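    To see what difference the condition makes, the sketch below runs the first pass of the latency-aware selection with both variants on the same queue list. It is an illustration under assumptions: LastBrokerFilter is a hypothetical class, queues are encoded as "brokerName#queueId" strings, and latencyFaultTolerance.isAvailable is reduced to a Predicate. With equals, if broker-a is still considered available, a retry after its failure lands on broker-a again; if broker-a has been marked unavailable, the first pass finds nothing and drops to the pickOneAtLeast fallback. The negated test returns a queue on broker-b in both cases.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class LastBrokerFilter {
    // Queues are written as "brokerName#queueId" to keep the sketch short.
    static String brokerOf(String queue) {
        return queue.substring(0, queue.indexOf('#'));
    }

    // First pass of the latency-aware selection; sameBroker = true reproduces the
    // condition as written (equals), false the suggested fix (!equals).
    static String firstPass(List<String> queues, int index, String lastBrokerName,
                            Predicate<String> isAvailable, boolean sameBroker) {
        for (int i = 0; i < queues.size(); i++) {
            String q = queues.get(Math.floorMod(index + i, queues.size()));
            String broker = brokerOf(q);
            if (!isAvailable.test(broker)) {
                continue;
            }
            boolean matchesLast = broker.equals(lastBrokerName);
            if (lastBrokerName == null || (sameBroker ? matchesLast : !matchesLast)) {
                return q;
            }
        }
        return null; // nothing suitable: the real code falls back to pickOneAtLeast()
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("broker-a#0", "broker-a#1", "broker-b#0", "broker-b#1");
        Predicate<String> brokerADown = b -> !b.equals("broker-a");
        // Retry after broker-a failed, scanning from index 0.
        System.out.println(firstPass(queues, 0, "broker-a", b -> true, true));
        System.out.println(firstPass(queues, 0, "broker-a", brokerADown, true));
        System.out.println(firstPass(queues, 0, "broker-a", brokerADown, false));
    }
}
```

    Running main prints broker-a#0, null and broker-b#0: the as-written condition either sticks to the failed broker or gives up on the first pass, while the negated one moves on.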

    Whether a broker is available is decided by latencyFaultTolerance.isAvailable. Where does that information come from?

    In fact, in sendDefaultImpl, whether or not the send succeeds, the producer calls updateFaultItem on its internal MQFaultStrategy, and that is where latencyFaultTolerance gets updated.

    Below are some of the important fields and methods of MQFaultStrategy:

        private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
        private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
    
    
        public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
            if (this.sendLatencyFaultEnable) {
                long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
                this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
            }
        }
    
        private long computeNotAvailableDuration(final long currentLatency) {
            for (int i = latencyMax.length - 1; i >= 0; i--) {
                if (currentLatency >= latencyMax[i])
                    return this.notAvailableDuration[i];
            }
    
            return 0;
        }
    

      

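    During sendDefaultImpl, isolation is false only when sendKernelImpl returns normally, i.e. the send succeeded. In that case the duration returned by computeNotAvailableDuration is usually 0 (any latency below 550 ms maps to 0); the longer the send took, the higher the matching index in latencyMax, and the longer the duration taken from notAvailableDuration.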

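    If the send fails with an exception, isolation is true and computeNotAvailableDuration is called with 30000. Since 30000 is at least 15000 (the last entry of latencyMax), the broker is considered unavailable for notAvailableDuration[6] = 600000 ms, i.e. 10 minutes.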
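    These numbers are easy to check by running the lookup table on its own. The class below copies latencyMax, notAvailableDuration and computeNotAvailableDuration from the MQFaultStrategy excerpt above into a hypothetical BackoffTable class and prints a few sample latencies, including the 30000 that the isolation path passes in.

```java
// Standalone copy of the MQFaultStrategy lookup table shown above.
class BackoffTable {
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Scan from the largest threshold down; the first threshold the latency
    // reaches decides how long the broker is treated as unavailable.
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        long[] samples = {20L, 300L, 600L, 2500L, 5000L, 30000L};
        for (long latency : samples) {
            System.out.println(latency + " ms -> " + computeNotAvailableDuration(latency) + " ms");
        }
    }
}
```

    For the samples 20, 300, 600, 2500, 5000 and 30000 ms, the durations are 0, 0, 30000, 120000, 180000 and 600000 ms respectively.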

    Next, LatencyFaultToleranceImpl's updateFaultItem:

        @Override
        public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
            FaultItem old = this.faultItemTable.get(name);
            if (null == old) {
                final FaultItem faultItem = new FaultItem(name);
                faultItem.setCurrentLatency(currentLatency);
                faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    
                old = this.faultItemTable.putIfAbsent(name, faultItem);
                if (old != null) {
                    old.setCurrentLatency(currentLatency);
                    old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
                }
            } else {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        }
    

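    This builds a FaultItem, which, as the name suggests, is an entry for a faulty broker: name is the broker name, currentLatency is how long the last send took from start to finish, and startTimestamp is the estimated timestamp at which the broker becomes available again. If an item already exists for that broker, it is updated in place instead.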
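    The bookkeeping is small enough to sketch in isolation. The version below keeps the same idea (one entry per broker name, holding the last latency and the time at which the broker counts as available again) but, as a design choice of this sketch, replaces the get/putIfAbsent sequence with ConcurrentHashMap.compute and takes the clock as a LongSupplier so it can be tested. FaultTable and Entry are hypothetical names, not RocketMQ classes.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

class FaultTable {
    static final class Entry {
        final String name;
        volatile long currentLatency;
        volatile long startTimestamp; // earliest time the broker counts as available again

        Entry(String name) {
            this.name = name;
        }
    }

    private final ConcurrentHashMap<String, Entry> faultItemTable = new ConcurrentHashMap<>();
    private final LongSupplier clock;

    FaultTable(LongSupplier clock) {
        this.clock = clock;
    }

    // Create the entry on first sight, otherwise overwrite it; compute() runs
    // atomically per key, so no putIfAbsent race handling is needed.
    void updateFaultItem(String name, long currentLatency, long notAvailableDuration) {
        faultItemTable.compute(name, (key, old) -> {
            Entry e = (old != null) ? old : new Entry(key);
            e.currentLatency = currentLatency;
            e.startTimestamp = clock.getAsLong() + notAvailableDuration;
            return e;
        });
    }

    // A broker with no recorded entry is treated as available.
    boolean isAvailable(String name) {
        Entry e = faultItemTable.get(name);
        return e == null || clock.getAsLong() - e.startTimestamp >= 0;
    }
}
```

    After an isolated failure recorded with a 600000 ms duration, isAvailable stays false until the clock has advanced by that amount.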

    Next, the important methods of FaultItem:

          @Override
            public int compareTo(final FaultItem other) {
                if (this.isAvailable() != other.isAvailable()) {
                    if (this.isAvailable())
                        return -1;
    
                    if (other.isAvailable())
                        return 1;
                }
    
                if (this.currentLatency < other.currentLatency)
                    return -1;
                else if (this.currentLatency > other.currentLatency) {
                    return 1;
                }
    
                if (this.startTimestamp < other.startTimestamp)
                    return -1;
                else if (this.startTimestamp > other.startTimestamp) {
                    return 1;
                }
    
                return 0;
            }
    
            public boolean isAvailable() {
                return (System.currentTimeMillis() - startTimestamp) >= 0;
            }
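    The ordering in compareTo can also be written with Comparator combinators, which makes its three keys explicit: available brokers first, then lower last latency, then earlier recovery timestamp. This is an equivalent re-expression for illustration, using a hypothetical FaultOrder.Item class whose availability is computed against a fixed now value rather than the FaultItem code itself.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class FaultOrder {
    static final class Item {
        final String name;
        final long currentLatency;
        final long startTimestamp;

        Item(String name, long currentLatency, long startTimestamp) {
            this.name = name;
            this.currentLatency = currentLatency;
            this.startTimestamp = startTimestamp;
        }

        boolean isAvailable(long now) {
            return now - startTimestamp >= 0;
        }
    }

    // Boolean keys sort false before true, so negating isAvailable puts available items first;
    // ties are broken by latency, then by startTimestamp.
    static Comparator<Item> order(long now) {
        return Comparator.comparing((Item it) -> !it.isAvailable(now))
                .thenComparingLong(it -> it.currentLatency)
                .thenComparingLong(it -> it.startTimestamp);
    }

    static List<String> sortedNames(List<Item> items, long now) {
        List<Item> copy = new ArrayList<>(items);
        copy.sort(order(now));
        List<String> names = new ArrayList<>();
        for (Item it : copy) {
            names.add(it.name);
        }
        return names;
    }
}
```

    Fixing now once per sort also keeps the comparator consistent for the whole sort, whereas FaultItem.isAvailable reads System.currentTimeMillis() on every comparison.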
    

      

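    Back in MQFaultStrategy's selectOneMessageQueue: putting the code above together, if an available broker is found it is returned directly; if none is found, pickOneAtLeast is called to pick a reasonably good one: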

    public String pickOneAtLeast() {
            final Enumeration<FaultItem> elements = this.faultItemTable.elements();
            List<FaultItem> tmpList = new LinkedList<FaultItem>();
            while (elements.hasMoreElements()) {
                final FaultItem faultItem = elements.nextElement();
                tmpList.add(faultItem);
            }
    
            if (!tmpList.isEmpty()) {
                Collections.shuffle(tmpList);
    
                Collections.sort(tmpList);
    
                final int half = tmpList.size() / 2;
                if (half <= 0) {
                    return tmpList.get(0).getName();
                } else {
                    final int i = this.whichItemWorst.getAndIncrement() % half;
                    return tmpList.get(i).getName();
                }
            }
    
            return null;
        }
    

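    FaultItem already supports ordering from good to bad, so after sorting, a broker name is chosen from the better first half. The choice within that half is round-robin via the whichItemWorst counter rather than truly random; the shuffle before the (stable) sort only randomizes the order of items that compare as equal.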
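    A standalone sketch of that selection step, assuming the broker names are already sorted best-first (for example with the ordering above). BetterHalfPicker is a hypothetical class; it mirrors the half computation and the whichItemWorst counter, and uses Math.floorMod instead of % so the index stays non-negative if the counter overflows.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class BetterHalfPicker {
    private final AtomicInteger whichItemWorst = new AtomicInteger(0);

    // sortedBestFirst: broker names ordered from best to worst.
    String pick(List<String> sortedBestFirst) {
        if (sortedBestFirst.isEmpty()) {
            return null;
        }
        int half = sortedBestFirst.size() / 2;
        if (half <= 0) {
            return sortedBestFirst.get(0); // a single item: return it
        }
        // Round-robin over the better half only.
        int i = Math.floorMod(whichItemWorst.getAndIncrement(), half);
        return sortedBestFirst.get(i);
    }
}
```

    With three brokers, half is 1, so the best one is always returned; only with four or more brokers does the counter actually alternate between candidates.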

  • Original post: https://www.cnblogs.com/notlate/p/11615878.html