zoukankan      html  css  js  c++  java
  • RocketMQ 总结 (二) -- 消息发送

    源码入口在DefaultMQProducerImpl.sendKernelImpl

    第一步是选择要发送的messageQueue

    这里又分为两步

      1 获取该topic当时的发布信息,也就是该topic都落在哪些broker

      DefaultMQProducerImpl # tryToFindTopicPublishInfo

    /**
     * Looks up the publish info for {@code topic}, refreshing it through
     * {@code mQClientFactory.updateTopicRouteInfoFromNameServer} when the cached entry is missing
     * or not usable.
     *
     * @param topic topic whose publish info is wanted
     * @return the entry held in {@code topicPublishInfoTable} after any refresh attempt
     */
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo cached = this.topicPublishInfoTable.get(topic);

        final boolean needsRefresh = cached == null || !cached.ok();
        if (needsRefresh) {
            // Make sure an entry exists for the topic (no-op when one is already present),
            // then refresh the route and re-read the table.
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            cached = this.topicPublishInfoTable.get(topic);
        }

        // Either condition alone is enough to accept the entry as-is.
        final boolean usable = cached.isHaveTopicRouterInfo() || cached.ok();
        if (!usable) {
            // Second attempt through the three-argument overload.
            // NOTE(review): presumably this falls back to the default topic's route - confirm.
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            cached = this.topicPublishInfoTable.get(topic);
        }
        return cached;
    }

     2 选择MessageQueue

    /**
     * Chooses the queue for a send by delegating to {@code mqFaultStrategy}.
     *
     * @param tpInfo publish info of the target topic
     * @param lastBrokerName broker name handed through to the strategy unchanged
     *        (NOTE(review): presumably the broker used by the previous attempt - confirm with the caller)
     * @return the queue picked by the strategy
     */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
            return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
        }
    MQFaultStrategy # selectOneMessageQueue
    /**
     * Picks the message queue for the next send attempt.
     *
     * <p>When {@code sendLatencyFaultEnable} is off, selection is delegated to
     * {@code tpInfo.selectOneMessageQueue(lastBrokerName)}. When it is on:
     * <ol>
     *   <li>queues are walked starting at the topic's send counter, and the first one whose broker
     *       {@code latencyFaultTolerance} reports as available is returned;</li>
     *   <li>otherwise a broker obtained from {@code latencyFaultTolerance.pickOneAtLeast()} is
     *       tried, provided {@code tpInfo.getQueueIdByBroker} reports a positive count for it;</li>
     *   <li>otherwise, or on any exception, {@code tpInfo.selectOneMessageQueue()} is returned.</li>
     * </ol>
     *
     * @param tpInfo publish info of the target topic
     * @param lastBrokerName only consulted when latency fault tolerance is disabled
     * @return the selected queue
     */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        if (!this.sendLatencyFaultEnable) {
            return tpInfo.selectOneMessageQueue(lastBrokerName);
        }

        try {
            // Walk the queues starting from the send counter (increment, then modulo).
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                // Math.abs(Integer.MIN_VALUE) is still negative, so clamp the position.
                if (pos < 0) {
                    pos = 0;
                }
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    return mq;
                }
            }

            // No queue sits on an available broker: fall back to a broker chosen by the
            // fault-tolerance component. It may have none to offer, so check for null before
            // the name is used for any lookup or removal.
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            if (notBestBroker != null) {
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    // NOTE(review): this rewrites the broker/queue id on the object returned by
                    // tpInfo.selectOneMessageQueue(); if that object is shared with the topic's
                    // queue list, the list is mutated - confirm and consider returning a copy.
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    return mq;
                }
                // The topic reports no usable queue count for this broker, so stop tracking it.
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }

    有一个细节,就是MessageQueue的信息是放在了 SendMessageRequestHeader 里

    // Build the request header for this send. From the selected MessageQueue only the queue id
    // is copied in here; topic, flag and properties are taken from the message, and the producer
    // group / default-topic settings from defaultMQProducer.
    SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    requestHeader.setTopic(msg.getTopic());
                    requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                    requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                    requestHeader.setQueueId(mq.getQueueId());
                    requestHeader.setSysFlag(sysFlag);
                    // Born timestamp is the client's wall-clock time at header construction.
                    requestHeader.setBornTimestamp(System.currentTimeMillis());
                    requestHeader.setFlag(msg.getFlag());
                    // Message properties are flattened into a single string for the header.
                    requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                    requestHeader.setReconsumeTimes(0);
                    requestHeader.setUnitMode(this.isUnitMode());
                    // Batch flag is derived from the runtime type of the message.
                    requestHeader.setBatch(msg instanceof MessageBatch);

    真正进行网络发送是在 

    // Hand the message to the client API implementation for the actual send. The target broker
    // is given by brokerAddr and mq.getBrokerName(); requestHeader carries the per-message fields.
    // NOTE(review): "timeout - costTimeAsync" looks like the time budget left after earlier
    // work in this call - confirm against the surrounding method.
    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                                brokerAddr,
                                mq.getBrokerName(),
                                tmpMessage,
                                requestHeader,
                                timeout - costTimeAsync,
                                communicationMode,
                                sendCallback,
                                topicPublishInfo,
                                this.mQClientFactory,
                                this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                                context,
                                this);
  • 相关阅读:
    P2121 拆地毯
    ※P1194 买礼物
    P1991 无线通讯网
    P2872 [USACO07DEC]Building Roads S
    python+pycharm+selenium+谷歌浏览器驱动 自动化环境部署(一)
    from bs4 import BeautifulSoup 引入需要安装的文件和步骤
    小菜鸟的python3.8.5+RobotFramework+RIDE的安装之路
    IntelliJ IDEA 之 工具箱(Tool Box)
    RobotFrameWork Web自动化测试环境搭建
    Python pip 安装与使用
  • 原文地址:https://www.cnblogs.com/juniorMa/p/14958423.html
Copyright © 2011-2022 走看看