  • Message Queues (4): RocketMQ Message Sending, Part 2

    Overview

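      This post continues the analysis of message sending from the previous one. As noted there, a broker may become unavailable before a message is sent. RocketMQ handles this mainly with two strategies: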

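    • Retrying the send
    • The broker fault-latency mechanism

      The latter means that after a send to some broker fails, that broker is temporarily excluded from message queue selection and is only tried again after a certain point in time. The longer the send took, the longer the broker is avoided (much like a cache eviction policy, where the less an entry is used, the sooner it is evicted). The broker fault-latency mechanism is described below.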

    LatencyFaultToleranceImpl

    We pick up from the previous post's code for selecting a message queue:
    
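        /**
         * Broker fault-latency mechanism.
         * Pick a queue and check whether its broker is currently unavailable;
         * fault records that are no longer valid are removed.
         */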
        public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
            if (this.sendLatencyFaultEnable) {
                try {
                    // Within the same thread, index is incremented by 1 on each call
                    int index = tpInfo.getSendWhichQueue().getAndIncrement();
                    for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                        // Take the remainder so that the same thread sends to different message queues
                        int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                        if (pos < 0)
                            pos = 0;
                        MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                        // Check whether this broker is currently available
                        if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                            if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                                return mq;
                        }
                    }
                    // No queue on an available broker was found
                    final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                    int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                    if (writeQueueNums > 0) {
                        final MessageQueue mq = tpInfo.selectOneMessageQueue();
                        if (notBestBroker != null) {
                            mq.setBrokerName(notBestBroker);
                            mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                        }
                        return mq;
                    } else {
                        latencyFaultTolerance.remove(notBestBroker);
                    }
                } catch (Exception e) {
                    log.error("Error occurred when selecting message queue", e);
                }
    
                return tpInfo.selectOneMessageQueue();
            }
    
            return tpInfo.selectOneMessageQueue(lastBrokerName);
        }
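The selection loop above can be reduced to a small standalone sketch. Everything in it is hypothetical: `QueueSelectionSketch`, its nested `Queue` class, and the set of avoided brokers merely stand in for `TopicPublishInfo`, `MessageQueue`, and `LatencyFaultTolerance`. It only illustrates round-robin selection that skips brokers currently marked as faulty. It uses `Math.floorMod` in place of the `Math.abs(...) % n` plus `pos < 0` guard, and it omits the `lastBrokerName` check and the `pickOneAtLeast` fallback.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

class QueueSelectionSketch {
    // Hypothetical stand-in for MessageQueue: a broker name plus a queue id.
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public String toString() {
            return brokerName + "#" + queueId;
        }
    }

    // Shared counter that moves the starting position forward on every call.
    private final AtomicInteger sendWhichQueue = new AtomicInteger();

    // Walk the queue list round-robin and return the first queue whose broker is not avoided.
    // Returns null when every broker is avoided (an assumption of this sketch).
    Queue select(List<Queue> queues, Set<String> avoidedBrokers) {
        int index = sendWhichQueue.getAndIncrement();
        for (int i = 0; i < queues.size(); i++) {
            int pos = Math.floorMod(index++, queues.size());
            Queue mq = queues.get(pos);
            if (!avoidedBrokers.contains(mq.brokerName)) {
                return mq;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Queue> queues = Arrays.asList(
            new Queue("broker-a", 0), new Queue("broker-a", 1),
            new Queue("broker-b", 0), new Queue("broker-b", 1));
        QueueSelectionSketch selector = new QueueSelectionSketch();
        // broker-a is being avoided, so only broker-b queues can be returned.
        for (int i = 0; i < 4; i++) {
            System.out.println(selector.select(queues, Collections.singleton("broker-a")));
        }
    }
}
```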
    
    The classes involved in the broker fault-latency mechanism are shown below.
    

    [Figure: lftimpl.PNG]

    [Figure: core classes of the broker fault-latency mechanism (broker故障延时核心类.PNG)]

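    LatencyFaultToleranceImpl defines an inner class, FaultItem, which, as the name suggests, holds a record for a faulty broker. It stores currentLatency, the latency observed for the send, and startTimestamp, the timestamp at which the avoidance period ends and the broker may be used again.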
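A minimal sketch of what such an item might look like, assuming (as the field description suggests) that a broker counts as available once the current time has reached `startTimestamp`. The class name `FaultItemSketch`, the `update` method, and the explicit `now` parameter are illustrative choices that make the behavior easy to check; they are not RocketMQ's actual code.

```java
class FaultItemSketch {
    private final String name;             // broker name
    private volatile long currentLatency;  // latency of the most recent send, in ms
    private volatile long startTimestamp;  // epoch ms at which the broker may be used again

    FaultItemSketch(String name) {
        this.name = name;
    }

    // Record a send that took currentLatency ms and avoid the broker for notAvailableDuration ms.
    void update(long now, long currentLatency, long notAvailableDuration) {
        this.currentLatency = currentLatency;
        this.startTimestamp = now + notAvailableDuration;
    }

    // The broker is usable again once "now" has reached startTimestamp.
    boolean isAvailable(long now) {
        return now >= startTimestamp;
    }

    String getName() {
        return name;
    }

    long getCurrentLatency() {
        return currentLatency;
    }

    public static void main(String[] args) {
        FaultItemSketch item = new FaultItemSketch("broker-a");
        long now = System.currentTimeMillis();
        item.update(now, 600L, 30_000L);
        System.out.println(item.isAvailable(now));           // false: inside the avoidance window
        System.out.println(item.isAvailable(now + 30_000L)); // true: the window has elapsed
    }
}
```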
    
        // LatencyFaultToleranceImpl#updateFaultItem 
    
        /**
         *
         * @param name broker name
         * @param currentLatency latency of the current send
         * @param notAvailableDuration how long the broker should be avoided
         */
        @Override
        public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
            FaultItem old = this.faultItemTable.get(name);
            if (null == old) {
                final FaultItem faultItem = new FaultItem(name);
                faultItem.setCurrentLatency(currentLatency);
                // As shown here: next available time = current time + avoidance duration
                faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    
                old = this.faultItemTable.putIfAbsent(name, faultItem);
                if (old != null) {
                    old.setCurrentLatency(currentLatency);
                    old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
                }
            } else {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        }
    
    
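        /**
         * Try to pick a reasonably good broker from among the avoided ones; returns null if none is found.
         * Which one counts as better? Presumably the one with the shorter avoidance time.
         */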
        @Override
        public String pickOneAtLeast() {
            final Enumeration<FaultItem> elements = this.faultItemTable.elements();
            List<FaultItem> tmpList = new LinkedList<FaultItem>();
            while (elements.hasMoreElements()) {
                final FaultItem faultItem = elements.nextElement();
                tmpList.add(faultItem);
            }
    
            if (!tmpList.isEmpty()) {
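                // Shuffle, then sort. Why shuffle first instead of sorting directly? A likely reason:
                // Collections.sort is stable, so shuffling first randomizes the order of items that compare as equal.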
                Collections.shuffle(tmpList);
                // The sort order comes from FaultItem#compareTo, which compares the items' time-related fields
                Collections.sort(tmpList);
    
                final int half = tmpList.size() / 2;
                if (half <= 0) {
                    return tmpList.get(0).getName();
                } else {
                    // whichItemWorst advances on every pick, so if the send to the picked broker fails again,
                    // the next call to pickOneAtLeast selects a different broker
                    final int i = this.whichItemWorst.getAndIncrement() % half;
                    return tmpList.get(i).getName();
                }
            }
    
            return null;
        }
    
    
        // MQFaultStrategy#updateFaultItem
        // currentLatency is the send duration (how long the send took)
        public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
            if (this.sendLatencyFaultEnable) {
                long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
                this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
            }
        }
    
        // Compute how long the broker should be avoided
        private long computeNotAvailableDuration(final long currentLatency) {
            for (int i = latencyMax.length - 1; i >= 0; i--) {
                if (currentLatency >= latencyMax[i])
                    return this.notAvailableDuration[i];
            }
    
            return 0;
        }
    
    
        private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
        private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
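
To make the mapping concrete, here is a small worked example that copies the two tables and the lookup loop from the snippet above into a standalone class. The class name `NotAvailableDurationSketch` and the sample latencies are made up for illustration.

```java
class NotAvailableDurationSketch {
    // Same tables as in the snippet above: latency thresholds (ms) and the matching avoidance durations (ms).
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Scan from the largest threshold down and return the duration of the first threshold reached.
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    public static void main(String[] args) {
        long[] samples = {30L, 80L, 600L, 1500L, 30000L};
        for (long latency : samples) {
            System.out.printf("latency %d ms -> avoid for %d ms%n",
                latency, computeNotAvailableDuration(latency));
        }
    }
}
```

With these tables, a send that takes less than 550 ms leads to no avoidance at all, 600 ms maps to 30 s, and 1500 ms maps to 60 s. When `isolation` is true, the latency is treated as 30000 ms, which falls into the last bucket and yields the maximum avoidance of 600000 ms (10 minutes).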
    
    

    The core message-sending method

      Suppose we are sending asynchronously:

        public static void main(
            String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
    
            DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
            producer.start();
            producer.setRetryTimesWhenSendAsyncFailed(0);
    
            for (int i = 0; i < 10000000; i++) {
                try {
                    final int index = i;
                    Message msg = new Message("Jodie_topic_1023",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.send(msg, new SendCallback() {
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                        }
    
                        @Override
                        public void onException(Throwable e) {
                            System.out.printf("%-10d Exception %s %n", index, e);
                            e.printStackTrace();
                        }
                    });
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            producer.shutdown();
        }
    
    
    DefaultMQProducerImpl#sendKernelImpl is the core method for sending a message. Let's take a look:
    
        /**
         *
         * @param msg the message to send
         * @param mq  the selected message queue
         * @param communicationMode communication mode
         * @param sendCallback  callback for asynchronous sends
         * @param topicPublishInfo publish info for the topic
         * @param timeout timeout
         * @return
         * @throws MQClientException
         * @throws RemotingException
         * @throws MQBrokerException
         * @throws InterruptedException
         */
        private SendResult sendKernelImpl(final Message msg,
            final MessageQueue mq,
            final CommunicationMode communicationMode,
            final SendCallback sendCallback,
            final TopicPublishInfo topicPublishInfo,
            final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            // No.1: look up the broker address
            String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
            if (null == brokerAddr) {
                tryToFindTopicPublishInfo(mq.getTopic());
                brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
            }
    
            SendMessageContext context = null;
            if (brokerAddr != null) {
                // Switch to the broker's VIP channel address if that option is enabled
                brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
    
                byte[] prevBody = msg.getBody();
                try {
                    //for MessageBatch,ID has been set in the generating process
                    // Assign the message a globally unique ID; if the body exceeds the threshold (4 KB by default),
                    // compress it with zip and set the compressed flag
                    if (!(msg instanceof MessageBatch)) {
                        MessageClientIDSetter.setUniqID(msg);
                    }
    
                    int sysFlag = 0;
                    if (this.tryToCompressMessage(msg)) {
                        sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    }
    
                    final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                        sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                    }
    
                    // If hooks are registered, run them
                    if (hasCheckForbiddenHook()) {
                        CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                        checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                        checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                        checkForbiddenContext.setCommunicationMode(communicationMode);
                        checkForbiddenContext.setBrokerAddr(brokerAddr);
                        checkForbiddenContext.setMessage(msg);
                        checkForbiddenContext.setMq(mq);
                        checkForbiddenContext.setUnitMode(this.isUnitMode());
                        this.executeCheckForbiddenHook(checkForbiddenContext);
                    }
    
                    if (this.hasSendMessageHook()) {
                        context = new SendMessageContext();
                        context.setProducer(this);
                        context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                        context.setCommunicationMode(communicationMode);
                        context.setBornHost(this.defaultMQProducer.getClientIP());
                        context.setBrokerAddr(brokerAddr);
                        context.setMessage(msg);
                        context.setMq(mq);
                        String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                        if (isTrans != null && isTrans.equals("true")) {
                            context.setMsgType(MessageType.Trans_Msg_Half);
                        }
    
                        if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                            context.setMsgType(MessageType.Delay_Msg);
                        }
                        this.executeSendMessageHookBefore(context);
                    }
    
                    //==================== Build the request header (begin) ==================
                    
                    SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    requestHeader.setTopic(msg.getTopic());
                    requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                    requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                    requestHeader.setQueueId(mq.getQueueId());
                    requestHeader.setSysFlag(sysFlag);
                    requestHeader.setBornTimestamp(System.currentTimeMillis());
                    requestHeader.setFlag(msg.getFlag());
                    requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                    requestHeader.setReconsumeTimes(0);
                    requestHeader.setUnitMode(this.isUnitMode());
                    requestHeader.setBatch(msg instanceof MessageBatch);
                    if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                        if (reconsumeTimes != null) {
                            requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                        }
    
                        String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                        if (maxReconsumeTimes != null) {
                            requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                        }
                    }
    
                    SendResult sendResult = null;
    
                    //==================== Build the request header (end) ==================
    
                    // (Important) Core send logic: send the request according to the communication mode
                    switch (communicationMode) {
                        case ASYNC:
                            sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                                brokerAddr,
                                mq.getBrokerName(),
                                msg,
                                requestHeader,
                                timeout,
                                communicationMode,
                                sendCallback,
                                topicPublishInfo,
                                this.mQClientFactory,
                                this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                                context,
                                this);
                            break;
                        case ONEWAY:
                        case SYNC:
                            sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                                brokerAddr,
                                mq.getBrokerName(),
                                msg,
                                requestHeader,
                                timeout,
                                communicationMode,
                                context,
                                this);
                            break;
                        default:
                            assert false;
                            break;
                    }
    
                    if (this.hasSendMessageHook()) {
                        context.setSendResult(sendResult);
                        this.executeSendMessageHookAfter(context);
                    }
    
                    return sendResult;
                    // ....
                    // ....
                    // (exception handling omitted)
    
        }           
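
The way `sysFlag` accumulates independent properties with `|=` is an ordinary bit-flag pattern. The sketch below shows it in isolation. The class `SysFlagSketch` is hypothetical, and the two constant values are assumptions chosen for illustration; consult `MessageSysFlag` for the real definitions.

```java
class SysFlagSketch {
    // Illustrative bit values only; treat them as assumptions and check MessageSysFlag for the real ones.
    static final int COMPRESSED_FLAG = 0x1;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;

    // Combine independent properties of a message into a single int.
    static int buildSysFlag(boolean compressed, boolean transactionPrepared) {
        int sysFlag = 0;
        if (compressed) {
            sysFlag |= COMPRESSED_FLAG;
        }
        if (transactionPrepared) {
            sysFlag |= TRANSACTION_PREPARED_TYPE;
        }
        return sysFlag;
    }

    // A receiver can test each bit independently of the others.
    static boolean isCompressed(int sysFlag) {
        return (sysFlag & COMPRESSED_FLAG) != 0;
    }

    static boolean isTransactionPrepared(int sysFlag) {
        return (sysFlag & TRANSACTION_PREPARED_TYPE) != 0;
    }

    public static void main(String[] args) {
        int flag = buildSysFlag(true, true);
        System.out.println(Integer.toBinaryString(flag)); // 101
        System.out.println(isCompressed(flag));           // true
        System.out.println(isTransactionPrepared(buildSysFlag(true, false))); // false
    }
}
```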
    
    

    Low-level sending

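      On the client side, the entry point for sending a message is MQClientAPIImpl#sendMessage. MQClientAPIImpl holds a RemotingClient field; RemotingClient is an interface whose implementation class is NettyRemotingClient, and that is the object that actually performs the send. The request code is RequestCode.SEND_MESSAGE, and the class that handles this command is org.apache.rocketmq.broker.processor.SendMessageProcessor. Its entry method is SendMessageProcessor#sendMessage.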

        //SendMessageProcessor#sendMessage 
    
        private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
            final RemotingCommand request,
            final SendMessageContext sendMessageContext,
            final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
    
            final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
            final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
    
            response.setOpaque(request.getOpaque());
    
            response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
            response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
    
            log.debug("receive SendMessage request command, {}", request);
    
            final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
            if (this.brokerController.getMessageStore().now() < startTimstamp) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
                return response;
            }
    
            response.setCode(-1);
            // No.1: validate the request first
            super.msgCheck(ctx, requestHeader, response);
            if (response.getCode() != -1) {
                return response;
            }
    
            final byte[] body = request.getBody();
    
            int queueIdInt = requestHeader.getQueueId();
            TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
    
            if (queueIdInt < 0) {
                queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
            }
    
            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
            msgInner.setTopic(requestHeader.getTopic());
            msgInner.setQueueId(queueIdInt);
            // If the message has been retried more than the maximum allowed number of times, it goes to the dead-letter queue (DLQ)
            if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
                return response;
            }
    
            msgInner.setBody(body);
            msgInner.setFlag(requestHeader.getFlag());
            MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
            msgInner.setPropertiesString(requestHeader.getProperties());
            msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
            msgInner.setBornHost(ctx.channel().remoteAddress());
            msgInner.setStoreHost(this.getStoreHost());
            msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    
            if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (traFlag != null) {
                    response.setCode(ResponseCode.NO_PERMISSION);
                    response.setRemark(
                        "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
                    return response;
                }
            }
    
            // (Important) Store the message; this is analyzed in a later post
            PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    
            return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);
    
        }
    
    
        // msgCheck, invoked above via super.msgCheck
    
        protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
            final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
            // Check write permission: reject if the broker is not writable and the topic is an ordered topic
            if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
                && this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                    + "] sending message is forbidden");
                return response;
            }
            // Check whether messages may be sent to this topic
            if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
                String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
                log.warn(errorMsg);
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark(errorMsg);
                return response;
            }
    
            // Look up the topic's configuration in the broker's TopicConfigManager; if it is missing, try to create it
            TopicConfig topicConfig =
                this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
            if (null == topicConfig) {
                int topicSysFlag = 0;
                if (requestHeader.isUnitMode()) {
                    if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
                    } else {
                        topicSysFlag = TopicSysFlag.buildSysFlag(true, false);
                    }
                }
    
                log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
                topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
                    requestHeader.getTopic(),
                    requestHeader.getDefaultTopic(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
    
                if (null == topicConfig) {
                    if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        topicConfig =
                            this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                                requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
                                topicSysFlag);
                    }
                }
    
                if (null == topicConfig) {
                    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
                    response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
                        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
                    return response;
                }
            }
            // Validate the queue ID; if it is out of range, return an error code
            int queueIdInt = requestHeader.getQueueId();
            int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
            if (queueIdInt >= idValid) {
                String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
                    queueIdInt,
                    topicConfig.toString(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
    
                log.warn(errorInfo);
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark(errorInfo);
    
                return response;
            }
            return response;
        }
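
The two queue-related rules in the broker code above (the upper bound checked in msgCheck, and the random fallback in sendMessage when the requested queue ID is negative) can be isolated as follows. This is a sketch under stated assumptions: `QueueIdCheckSketch` and both method names are hypothetical, and only the arithmetic is taken from the snippets.

```java
import java.util.Random;

class QueueIdCheckSketch {
    // msgCheck rejects a queue ID that is not below max(writeQueueNums, readQueueNums).
    static boolean isQueueIdValid(int queueId, int writeQueueNums, int readQueueNums) {
        return queueId < Math.max(writeQueueNums, readQueueNums);
    }

    // sendMessage replaces a negative queue ID with a pseudo-random write queue.
    static int resolveQueueId(int requestedQueueId, int writeQueueNums, Random random) {
        if (requestedQueueId >= 0) {
            return requestedQueueId;
        }
        return Math.abs(random.nextInt() % 99999999) % writeQueueNums;
    }

    public static void main(String[] args) {
        System.out.println(isQueueIdValid(3, 4, 4)); // true
        System.out.println(isQueueIdValid(4, 4, 4)); // false
        // A negative request is mapped into the range [0, writeQueueNums).
        System.out.println(resolveQueueId(-1, 4, new Random()));
    }
}
```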
    
    
    

    Summary

      The send logic can be summarized as follows:

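    1. Create a DefaultMQProducer and call its start method.
    2. A call to send is carried out by the DefaultMQProducerImpl held inside DefaultMQProducer, which sends through the MQ client instance; the broker fault-latency mechanism comes into play along the way. The call then reaches the send method of MQClientAPIImpl, which uses RemotingClient to send the command. On the broker side the message is checked and finally reaches DefaultMessageStore#putMessage, where it is stored.

      One more observation: BrokerController and NamesrvController hold all kinds of important information, so any module that holds a reference to the controller, the sending module included, can easily obtain what it needs.

      As for class design, functionality is defined through interfaces, with extensions encapsulated in abstract classes. Callers hold the interface type rather than the concrete implementation, which makes later extension easier.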

    References

    • 《RocketMQ技术内幕》
  • Original article: https://www.cnblogs.com/Benjious/p/11653817.html