zoukankan      html  css  js  c++  java
  • RocketMQ源码(一)Message 发送与接收

    1、概述
    Producer 发送消息。主要解析同步发送消息的源码;涉及异步/Oneway 发送消息、事务消息的部分会跳过。
    Broker 接收消息。

    2、Producer 发送消息

    2.1 DefaultMQProducerImpl#sendDefaultImpl()

      1: public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
      2:     return send(msg, this.defaultMQProducer.getSendMsgTimeout());
      3: }
      4: 
      5: public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
      6:     return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
      7: }
      8: 
      9: private SendResult sendDefaultImpl(//
     10:     Message msg, //
     11:     final CommunicationMode communicationMode, //
     12:     final SendCallback sendCallback, //
     13:     final long timeout//
     14: ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
     15:     // 校验 Producer 处于运行状态
     16:     this.makeSureStateOK();
     17:     // 校验消息格式
     18:     Validators.checkMessage(msg, this.defaultMQProducer);
     19:     //
     20:     final long invokeID = random.nextLong(); // 调用编号;用于下面打印日志,标记为同一次发送消息
     21:     long beginTimestampFirst = System.currentTimeMillis();
     22:     long beginTimestampPrev = beginTimestampFirst;
     23:     long endTimestamp = beginTimestampFirst;
     24:     // 获取 Topic路由信息
     25:     TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
     26:     if (topicPublishInfo != null && topicPublishInfo.ok()) {
     27:         MessageQueue mq = null; // 最后选择消息要发送到的队列
     28:         Exception exception = null;
     29:         SendResult sendResult = null; // 最后一次发送结果
     30:         int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; // 同步多次调用
     31:         int times = 0; // 第几次发送
     32:         String[] brokersSent = new String[timesTotal]; // 存储每次发送消息选择的broker名
     33:         // 循环调用发送消息,直到成功
     34:         for (; times < timesTotal; times++) {
     35:             String lastBrokerName = null == mq ? null : mq.getBrokerName();
     36:             MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 选择消息要发送到的队列
     37:             if (tmpmq != null) {
     38:                 mq = tmpmq;
     39:                 brokersSent[times] = mq.getBrokerName();
     40:                 try {
     41:                     beginTimestampPrev = System.currentTimeMillis();
     42:                     // 调用发送消息核心方法
     43:                     sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
     44:                     endTimestamp = System.currentTimeMillis();
     45:                     // 更新Broker可用性信息
     46:                     this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
     47:                     switch (communicationMode) {
     48:                         case ASYNC:
     49:                             return null;
     50:                         case ONEWAY:
     51:                             return null;
     52:                         case SYNC:
     53:                             if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
     54:                                 if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { // 同步发送成功但存储有问题时 && 配置存储异常时重新发送开关 时,进行重试
     55:                                     continue;
     56:                                 }
     57:                             }
     58:                             return sendResult;
     59:                         default:
     60:                             break;
     61:                     }
     62:                 } catch (RemotingException e) { // 打印异常,更新Broker可用性信息,更新继续循环
     63:                     endTimestamp = System.currentTimeMillis();
     64:                     this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
     65:                     log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
     66:                     log.warn(msg.toString());
     67:                     exception = e;
     68:                     continue;
     69:                 } catch (MQClientException e) { // 打印异常,更新Broker可用性信息,继续循环
     70:                     endTimestamp = System.currentTimeMillis();
     71:                     this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
     72:                     log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
     73:                     log.warn(msg.toString());
     74:                     exception = e;
     75:                     continue;
     76:                 } catch (MQBrokerException e) { // 打印异常,更新Broker可用性信息,部分情况下的异常,直接返回,结束循环
     77:                     endTimestamp = System.currentTimeMillis();
     78:                     this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
     79:                     log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
     80:                     log.warn(msg.toString());
     81:                     exception = e;
     82:                     switch (e.getResponseCode()) {
     83:                         // 如下异常continue,进行发送消息重试
     84:                         case ResponseCode.TOPIC_NOT_EXIST:
     85:                         case ResponseCode.SERVICE_NOT_AVAILABLE:
     86:                         case ResponseCode.SYSTEM_ERROR:
     87:                         case ResponseCode.NO_PERMISSION:
     88:                         case ResponseCode.NO_BUYER_ID:
     89:                         case ResponseCode.NOT_IN_CURRENT_UNIT:
     90:                             continue;
     91:                         // 如果有发送结果,进行返回,否则,抛出异常;
     92:                         default:
     93:                             if (sendResult != null) {
     94:                                 return sendResult;
     95:                             }
     96:                             throw e;
     97:                     }
     98:                 } catch (InterruptedException e) {
     99:                     endTimestamp = System.currentTimeMillis();
    100:                     this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
    101:                     log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
    102:                     log.warn(msg.toString());
    103:                     throw e;
    104:                 }
    105:             } else {
    106:                 break;
    107:             }
    108:         }
    109:         // 返回发送结果
    110:         if (sendResult != null) {
    111:             return sendResult;
    112:         }
    113:         // 根据不同情况,抛出不同的异常
    114:         String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", times, System.currentTimeMillis() - beginTimestampFirst,
    115:                 msg.getTopic(), Arrays.toString(brokersSent)) + FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
    116:         MQClientException mqClientException = new MQClientException(info, exception);
    117:         if (exception instanceof MQBrokerException) {
    118:             mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
    119:         } else if (exception instanceof RemotingConnectException) {
    120:             mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
    121:         } else if (exception instanceof RemotingTimeoutException) {
    122:             mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
    123:         } else if (exception instanceof MQClientException) {
    124:             mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
    125:         }
    126:         throw mqClientException;
    127:     }
    128:     // Namesrv找不到异常
    129:     List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
    130:     if (null == nsList || nsList.isEmpty()) {
    131:         throw new MQClientException(
    132:             "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
    133:     }
    134:     // 消息路由找不到异常
    135:     throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
    136:         null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    137: }
    

    说明 :发送消息。步骤:获取消息路由信息,选择要发送到的消息队列,执行消息发送核心方法,并对发送结果进行封装返回。
    第 1 至 7 行:对 sendDefaultImpl(...) 进行封装。
    第 20 行 :invokeID仅仅用于打印日志,无实际的业务用途。
    第 25 行 :获取 Topic路由信息, 详细解析见:DefaultMQProducerImpl#tryToFindTopicPublishInfo()
    第 30 & 34 行 :计算调用发送消息到成功为止的最大次数,并进行循环。仅同步(SYNC)发送消息会在此循环中调用多次(1 + retryTimesWhenSendFailed),默认配置为3次;异步/Oneway 发送消息在此循环中只调用1次。
    第 36 行 :选择消息要发送到的队列,详细解析见:MQFaultStrategy
    第 43 行 :调用发送消息核心方法,详细解析见:DefaultMQProducerImpl#sendKernelImpl()
    第 46 行 :更新Broker可用性信息。在选择发送到的消息队列时,会参考Broker发送消息的延迟,详细解析见:MQFaultStrategy
    第 62 至 68 行:当抛出RemotingException时,如果进行消息发送失败重试,则可能导致消息发送重复。例如,发送消息超时(RemotingTimeoutException),实际Broker接收到该消息并处理成功。因此,Consumer在消费时,需要保证幂等性。

    2.2 DefaultMQProducerImpl#tryToFindTopicPublishInfo()
    ···
    1: private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    2: // 缓存中获取 Topic发布信息
    3: TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    4: // 当无可用的 Topic发布信息时,从Namesrv获取一次
    5: if (null == topicPublishInfo || !topicPublishInfo.ok()) {
    6: this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
    7: this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
    8: topicPublishInfo = this.topicPublishInfoTable.get(topic);
    9: }
    10: // 若获取的 Topic发布信息时候可用,则返回
    11: if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
    12: return topicPublishInfo;
    13: } else { // 使用 {@link DefaultMQProducer#createTopicKey} 对应的 Topic发布信息。用于 Topic发布信息不存在 && Broker支持自动创建Topic
    14: this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
    15: topicPublishInfo = this.topicPublishInfoTable.get(topic);
    16: return topicPublishInfo;
    17: }
    18: }
    ···

    说明 :获得 Topic发布信息。优先从缓存topicPublishInfoTable中获得,其次从Namesrv中获得。
    第 3 行 :从缓存topicPublishInfoTable中获得 Topic发布信息。
    第 5 至 9 行 :从 Namesrv 中获得 Topic发布信息。
    第 13 至 17 行 :当从 Namesrv 无法获取时,使用 {@link DefaultMQProducer#createTopicKey} 对应的 Topic发布信息。目的是当 Broker 开启自动创建 Topic开关时,Broker 接收到消息后自动创建Topic,详细解析见《RocketMQ 源码分析 —— Topic》。

    2.3 MQFaultStrategy

     1: public class MQFaultStrategy {
     2:     private final static Logger log = ClientLogger.getLog();
     3: 
     4:     
     8:     private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
     9:     
    12:     private boolean sendLatencyFaultEnable = false;
    13:     
    16:     private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    17:     
    20:     private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
    21: 
    22:     
    29:     public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    30:         if (this.sendLatencyFaultEnable) {
    31:             try {
    32:                 // 获取 brokerName=lastBrokerName && 可用的一个消息队列
    33:                 int index = tpInfo.getSendWhichQueue().getAndIncrement();
    34:                 for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
    35:                     int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
    36:                     if (pos < 0)
    37:                         pos = 0;
    38:                     MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
    39:                     if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
    40:                         if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
    41:                             return mq;
    42:                     }
    43:                 }
    44:                 // 选择一个相对好的broker,并获得其对应的一个消息队列,不考虑该队列的可用性
    45:                 final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
    46:                 int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
    47:                 if (writeQueueNums > 0) {
    48:                     final MessageQueue mq = tpInfo.selectOneMessageQueue();
    49:                     if (notBestBroker != null) {
    50:                         mq.setBrokerName(notBestBroker);
    51:                         mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
    52:                     }
    53:                     return mq;
    54:                 } else {
    55:                     latencyFaultTolerance.remove(notBestBroker);
    56:                 }
    57:             } catch (Exception e) {
    58:                 log.error("Error occurred when selecting message queue", e);
    59:             }
    60:             // 选择一个消息队列,不考虑队列的可用性
    61:             return tpInfo.selectOneMessageQueue();
    62:         }
    63:         // 获得 lastBrokerName 对应的一个消息队列,不考虑该队列的可用性
    64:         return tpInfo.selectOneMessageQueue(lastBrokerName);
    65:     }
    66: 
    67:     
    74:     public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    75:         if (this.sendLatencyFaultEnable) {
    76:             long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
    77:             this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    78:         }
    79:     }
    80: 
    81:     
    87:     private long computeNotAvailableDuration(final long currentLatency) {
    88:         for (int i = latencyMax.length - 1; i >= 0; i--) {
    89:             if (currentLatency >= latencyMax[i])
    90:                 return this.notAvailableDuration[i];
    91:         }
    92:         return 0;
    93:     }
    

    说明 :Producer消息发送容错策略。默认情况下容错策略关闭,即sendLatencyFaultEnable=false。
    第 30 至 62 行 :容错策略选择消息队列逻辑。优先获取可用队列,其次选择一个broker获取队列,最差返回任意broker的一个队列。
    第 64 行 :未开启容错策略选择消息队列逻辑。
    第 74 至 79 行 :更新延迟容错信息。当 Producer 发送消息时间过长,则逻辑认为N秒内不可用。按照latencyMax,notAvailableDuration的配置,对应如下:

    | Producer发送消息消耗时长 | Broker不可用时长 |
    | --- | --- |
    | >= 15000 ms | 600 * 1000 ms |
    | >= 3000 ms | 180 * 1000 ms |
    | >= 2000 ms | 120 * 1000 ms |
    | >= 1000 ms | 60 * 1000 ms |
    | >= 550 ms | 30 * 1000 ms |
    | >= 100 ms | 0 ms |
    | >= 50 ms | 0 ms |

    2.4 DefaultMQProducerImpl#sendKernelImpl()

    1: private SendResult sendKernelImpl(final Message msg, //
      2:     final MessageQueue mq, //
      3:     final CommunicationMode communicationMode, //
      4:     final SendCallback sendCallback, //
      5:     final TopicPublishInfo topicPublishInfo, //
      6:     final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
      7:     // 获取 broker地址
      8:     String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
      9:     if (null == brokerAddr) {
     10:         tryToFindTopicPublishInfo(mq.getTopic());
     11:         brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
     12:     }
     13:     //
     14:     SendMessageContext context = null;
     15:     if (brokerAddr != null) {
     16:         // 是否使用broker vip通道。broker会开启两个端口对外服务。
     17:         brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
     18:         byte[] prevBody = msg.getBody(); // 记录消息内容。下面逻辑可能改变消息内容,例如消息压缩。
     19:         try {
     20:             // 设置唯一编号
     21:             MessageClientIDSetter.setUniqID(msg);
     22:             // 消息压缩
     23:             int sysFlag = 0;
     24:             if (this.tryToCompressMessage(msg)) {
     25:                 sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
     26:             }
     27:             // 事务
     28:             final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
     29:             if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
     30:                 sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
     31:             }
     32:             // hook:发送消息校验
     33:             if (hasCheckForbiddenHook()) {
     34:                 CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
     35:                 checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
     36:                 checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
     37:                 checkForbiddenContext.setCommunicationMode(communicationMode);
     38:                 checkForbiddenContext.setBrokerAddr(brokerAddr);
     39:                 checkForbiddenContext.setMessage(msg);
     40:                 checkForbiddenContext.setMq(mq);
     41:                 checkForbiddenContext.setUnitMode(this.isUnitMode());
     42:                 this.executeCheckForbiddenHook(checkForbiddenContext);
     43:             }
     44:             // hook:发送消息前逻辑
     45:             if (this.hasSendMessageHook()) {
     46:                 context = new SendMessageContext();
     47:                 context.setProducer(this);
     48:                 context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
     49:                 context.setCommunicationMode(communicationMode);
     50:                 context.setBornHost(this.defaultMQProducer.getClientIP());
     51:                 context.setBrokerAddr(brokerAddr);
     52:                 context.setMessage(msg);
     53:                 context.setMq(mq);
     54:                 String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
     55:                 if (isTrans != null && isTrans.equals("true")) {
     56:                     context.setMsgType(MessageType.Trans_Msg_Half);
     57:                 }
     58:                 if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
     59:                     context.setMsgType(MessageType.Delay_Msg);
     60:                 }
     61:                 this.executeSendMessageHookBefore(context);
     62:             }
     63:             // 构建发送消息请求
     64:             SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
     65:             requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
     66:             requestHeader.setTopic(msg.getTopic());
     67:             requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
     68:             requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
     69:             requestHeader.setQueueId(mq.getQueueId());
     70:             requestHeader.setSysFlag(sysFlag);
     71:             requestHeader.setBornTimestamp(System.currentTimeMillis());
     72:             requestHeader.setFlag(msg.getFlag());
     73:             requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
     74:             requestHeader.setReconsumeTimes(0);
     75:             requestHeader.setUnitMode(this.isUnitMode());
     76:             if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { // 消息重发Topic
     77:                 String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
     78:                 if (reconsumeTimes != null) {
     79:                     requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
     80:                     MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
     81:                 }
     82:                 String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
     83:                 if (maxReconsumeTimes != null) {
     84:                     requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
     85:                     MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
     86:                 }
     87:             }
     88:             // 发送消息
     89:             SendResult sendResult = null;
     90:             switch (communicationMode) {
     91:                 case ASYNC:
     92:                     sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
     93:                         brokerAddr, // 1
     94:                         mq.getBrokerName(), // 2
     95:                         msg, // 3
     96:                         requestHeader, // 4
     97:                         timeout, // 5
     98:                         communicationMode, // 6
     99:                         sendCallback, // 7
    100:                         topicPublishInfo, // 8
    101:                         this.mQClientFactory, // 9
    102:                         this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), // 10
    103:                         context, //
    104:                         this);
    105:                     break;
    106:                 case ONEWAY:
    107:                 case SYNC:
    108:                     sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
    109:                         brokerAddr,
    110:                         mq.getBrokerName(),
    111:                         msg,
    112:                         requestHeader,
    113:                         timeout,
    114:                         communicationMode,
    115:                         context,
    116:                         this);
    117:                     break;
    118:                 default:
    119:                     assert false;
    120:                     break;
    121:             }
    122:             // hook:发送消息后逻辑
    123:             if (this.hasSendMessageHook()) {
    124:                 context.setSendResult(sendResult);
    125:                 this.executeSendMessageHookAfter(context);
    126:             }
    127:             // 返回发送结果
    128:             return sendResult;
    129:         } catch (RemotingException e) {
    130:             if (this.hasSendMessageHook()) {
    131:                 context.setException(e);
    132:                 this.executeSendMessageHookAfter(context);
    133:             }
    134:             throw e;
    135:         } catch (MQBrokerException e) {
    136:             if (this.hasSendMessageHook()) {
    137:                 context.setException(e);
    138:                 this.executeSendMessageHookAfter(context);
    139:             }
    140:             throw e;
    141:         } catch (InterruptedException e) {
    142:             if (this.hasSendMessageHook()) {
    143:                 context.setException(e);
    144:                 this.executeSendMessageHookAfter(context);
    145:             }
    146:             throw e;
    147:         } finally {
    148:             msg.setBody(prevBody);
    149:         }
    150:     }
    151:     // broker为空抛出异常
    152:     throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    153: }
    

    说明 :发送消息核心方法。该方法真正发起网络请求,发送消息给 Broker。
    第 21 行 :设置消息唯一编号,详细解析见《RocketMQ 源码分析 —— Message 基础》。
    第 64 至 87 行 :构建发送消息请求SendMessageRequestHeader。
    第 107 至 117 行 :执行 MQClientAPIImpl#sendMessage(...)(通过 mQClientFactory.getMQClientAPIImpl() 获得)发起网络请求。

    3、Broker 接收消息

      1: @Override
      2: public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
      3:     SendMessageContext mqtraceContext;
      4:     switch (request.getCode()) {
      5:         case RequestCode.CONSUMER_SEND_MSG_BACK:
      6:             return this.consumerSendMsgBack(ctx, request);
      7:         default:
      8:             // 解析请求
      9:             SendMessageRequestHeader requestHeader = parseRequestHeader(request);
     10:             if (requestHeader == null) {
     11:                 return null;
     12:             }
     13:             // 发送请求Context。在 hook 场景下使用
     14:             mqtraceContext = buildMsgContext(ctx, requestHeader);
     15:             // hook:处理发送消息前逻辑
     16:             this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
     17:             // 处理发送消息逻辑
     18:             final RemotingCommand response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
     19:             // hook:处理发送消息后逻辑
     20:             this.executeSendMessageHookAfter(response, mqtraceContext);
     21:             return response;
     22:     }
     23: }
     24: 
     25: private RemotingCommand sendMessage(final ChannelHandlerContext ctx, //
     26:     final RemotingCommand request, //
     27:     final SendMessageContext sendMessageContext, //
     28:     final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
     29: 
     30:     // 初始化响应
     31:     final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
     32:     final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
     33:     response.setOpaque(request.getOpaque());
     34:     response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
     35:     response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
     36: 
     37:     if (log.isDebugEnabled()) {
     38:         log.debug("receive SendMessage request command, {}", request);
     39:     }
     40: 
     41:     // 如果未开始接收消息,抛出系统异常
     42:     @SuppressWarnings("SpellCheckingInspection")
     43:     final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
     44:     if (this.brokerController.getMessageStore().now() < startTimstamp) {
     45:         response.setCode(ResponseCode.SYSTEM_ERROR);
     46:         response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
     47:         return response;
     48:     }
     49: 
     50:     // 消息配置(Topic配置)校验
     51:     response.setCode(-1);
     52:     super.msgCheck(ctx, requestHeader, response);
     53:     if (response.getCode() != -1) {
     54:         return response;
     55:     }
     56: 
     57:     final byte[] body = request.getBody();
     58: 
     59:     // 如果队列小于0,从可用队列随机选择
     60:     int queueIdInt = requestHeader.getQueueId();
     61:     TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
     62:     if (queueIdInt < 0) {
     63:         queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
     64:     }
     65: 
     66:     //
     67:     int sysFlag = requestHeader.getSysFlag();
     68:     if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
     69:         sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
     70:     }
     71: 
     72:     // 对RETRY类型的消息处理。如果超过最大消费次数,则topic修改成"%DLQ%" + 分组名,即加入 死信队列(Dead Letter Queue)
     73:     String newTopic = requestHeader.getTopic();
     74:     if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
     75:         // 获取订阅分组配置
     76:         String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
     77:         SubscriptionGroupConfig subscriptionGroupConfig =
     78:             this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
     79:         if (null == subscriptionGroupConfig) {
     80:             response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
     81:             response.setRemark("subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
     82:             return response;
     83:         }
     84:         // 计算最大可消费次数
     85:         int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
     86:         if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
     87:             maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
     88:         }
     89:         int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
     90:         if (reconsumeTimes >= maxReconsumeTimes) { // 超过最大消费次数
     91:             newTopic = MixAll.getDLQTopic(groupName);
     92:             queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
     93:             topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, //
     94:                 DLQ_NUMS_PER_GROUP, //
     95:                 PermName.PERM_WRITE, 0
     96:             );
     97:             if (null == topicConfig) {
     98:                 response.setCode(ResponseCode.SYSTEM_ERROR);
     99:                 response.setRemark("topic[" + newTopic + "] not exist");
    100:                 return response;
    101:             }
    102:         }
    103:     }
    104: 
    105:     // 创建MessageExtBrokerInner
    106:     MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    107:     msgInner.setTopic(newTopic);
    108:     msgInner.setBody(body);
    109:     msgInner.setFlag(requestHeader.getFlag());
    110:     MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
    111:     msgInner.setPropertiesString(requestHeader.getProperties());
    112:     msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));
    113:     msgInner.setQueueId(queueIdInt);
    114:     msgInner.setSysFlag(sysFlag);
    115:     msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    116:     msgInner.setBornHost(ctx.channel().remoteAddress());
    117:     msgInner.setStoreHost(this.getStoreHost());
    118:     msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
    119: 
    120:     // 校验是否不允许发送事务消息
    121:     if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
    122:         String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    123:         if (traFlag != null) {
    124:             response.setCode(ResponseCode.NO_PERMISSION);
    125:             response.setRemark(
    126:                 "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
    127:             return response;
    128:         }
    129:     }
    130: 
    131:     // 添加消息
    132:     PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    133:     if (putMessageResult != null) {
    134:         boolean sendOK = false;
    135: 
    136:         switch (putMessageResult.getPutMessageStatus()) {
    137:             // Success
    138:             case PUT_OK:
    139:                 sendOK = true;
    140:                 response.setCode(ResponseCode.SUCCESS);
    141:                 break;
    142:             case FLUSH_DISK_TIMEOUT:
    143:                 response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
    144:                 sendOK = true;
    145:                 break;
    146:             case FLUSH_SLAVE_TIMEOUT:
    147:                 response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
    148:                 sendOK = true;
    149:                 break;
    150:             case SLAVE_NOT_AVAILABLE:
    151:                 response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
    152:                 sendOK = true;
    153:                 break;
    154: 
    155:             // Failed
    156:             case CREATE_MAPEDFILE_FAILED:
    157:                 response.setCode(ResponseCode.SYSTEM_ERROR);
    158:                 response.setRemark("create mapped file failed, server is busy or broken.");
    159:                 break;
    160:             case MESSAGE_ILLEGAL:
    161:             case PROPERTIES_SIZE_EXCEEDED:
    162:                 response.setCode(ResponseCode.MESSAGE_ILLEGAL);
    163:                 response.setRemark(
    164:                     "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
    165:                 break;
    166:             case SERVICE_NOT_AVAILABLE:
    167:                 response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
    168:                 response.setRemark(
    169:                     "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
    170:                 break;
    171:             case OS_PAGECACHE_BUSY:
    172:                 response.setCode(ResponseCode.SYSTEM_ERROR);
    173:                 response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
    174:                 break;
    175:             case UNKNOWN_ERROR:
    176:                 response.setCode(ResponseCode.SYSTEM_ERROR);
    177:                 response.setRemark("UNKNOWN_ERROR");
    178:                 break;
    179:             default:
    180:                 response.setCode(ResponseCode.SYSTEM_ERROR);
    181:                 response.setRemark("UNKNOWN_ERROR DEFAULT");
    182:                 break;
    183:         }
    184: 
    185:         String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
    186:         if (sendOK) {
    187:             // 统计
    188:             this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
    189:             this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
    190:             this.brokerController.getBrokerStatsManager().incBrokerPutNums();
    191: 
    192:             // 响应
    193:             response.setRemark(null);
    194:             responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
    195:             responseHeader.setQueueId(queueIdInt);
    196:             responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
    197:             doResponse(ctx, request, response);
    198: 
    199:             // hook:设置发送成功到context
    200:             if (hasSendMessageHook()) {
    201:                 sendMessageContext.setMsgId(responseHeader.getMsgId());
    202:                 sendMessageContext.setQueueId(responseHeader.getQueueId());
    203:                 sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
    204: 
    205:                 int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
    206:                 int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
    207:                 int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
    208: 
    209:                 sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
    210:                 sendMessageContext.setCommercialSendTimes(incValue);
    211:                 sendMessageContext.setCommercialSendSize(wroteSize);
    212:                 sendMessageContext.setCommercialOwner(owner);
    213:             }
    214:             return null;
    215:         } else {
    216:             // hook:设置发送失败到context
    217:             if (hasSendMessageHook()) {
    218:                 int wroteSize = request.getBody().length;
    219:                 int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
    220: 
    221:                 sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
    222:                 sendMessageContext.setCommercialSendTimes(incValue);
    223:                 sendMessageContext.setCommercialSendSize(wroteSize);
    224:                 sendMessageContext.setCommercialOwner(owner);
    225:             }
    226:         }
    227:     } else {
    228:         response.setCode(ResponseCode.SYSTEM_ERROR);
    229:         response.setRemark("store putMessage return null");
    230:     }
    231: 
    232:     return response;
    233: }
    

    processRequest() 说明 :处理消息请求。

    sendMessage() 说明 :发送消息,并返回发送消息结果。

    第 51 至 55 行 :消息配置(Topic配置)校验,详细解析见:AbstractSendMessageProcessor#msgCheck()。
    第 60 至 64 行 :消息队列编号小于0时,Broker 可以设置随机选择一个消息队列。
    第 72 至 103 行 :对RETRY类型的消息处理。如果超过最大消费次数,则topic修改成"%DLQ%" + 分组名, 即加入 死信队列 (Dead Letter Queue),详细解析见:《RocketMQ 源码分析 —— Topic》。
    第 105 至 118 行 :创建MessageExtBrokerInner。
    第 132 行 :存储消息,详细解析见:DefaultMessageStore#putMessage()。
    第 133 至 183 行 :处理消息发送结果,设置响应结果和提示。
    第 186 至 214 行 :发送成功,响应。这里doResponse(ctx, request, response)进行响应,最后return null,原因是:响应给 Producer 可能发生异常,#doResponse(ctx, request, response)捕捉了该异常并输出日志。这样做的话,我们进行排查 Broker 接收消息成功后响应是否存在异常会方便很多。

    • DefaultMessageStore#putMessage
     /**
      * Stores a message: runs a series of pre-checks (store shut down, slave role,
      * store not writable, topic / properties length, page-cache busy) and, if all
      * pass, delegates the actual write to the commit log.
      *
      * @param msg the broker-side message to store
      * @return the result returned by {@code commitLog.putMessage(msg)}, or a result
      *         carrying SERVICE_NOT_AVAILABLE, MESSAGE_ILLEGAL, PROPERTIES_SIZE_EXCEEDED
      *         or OS_PAGECACHE_BUSY (with a null append result) when a pre-check rejects
      *         the message; the code below tolerates a null result from the commit log
      */
     public PutMessageResult putMessage(MessageExtBrokerInner msg) {
     2:     if (this.shutdown) {
     3:         log.warn("message store has shutdown, so putMessage is forbidden");
     4:         return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
     5:     }
     6: 
     7:     // A slave broker does not accept writes
     8:     if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
     9:         long value = this.printTimes.getAndIncrement();
    10:         if ((value % 50000) == 0) { // throttle: warn only on every 50000th rejected call
    11:             log.warn("message store is slave mode, so putMessage is forbidden ");
    12:         }
    13: 
    14:         return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    15:     }
    16: 
    17:     // Check whether the store is currently writable
    18:     if (!this.runningFlags.isWriteable()) {
    19:         long value = this.printTimes.getAndIncrement();
    20:         if ((value % 50000) == 0) { // same log throttling as the slave check above
    21:             log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
    22:         }
    23: 
    24:         return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    25:     } else {
    26:         this.printTimes.set(0); // store is writable again: reset the throttling counter
    27:     }
    28: 
    29:     // Topic too long: its length must not exceed Byte.MAX_VALUE
    30:     if (msg.getTopic().length() > Byte.MAX_VALUE) {
    31:         log.warn("putMessage message topic length too long " + msg.getTopic().length());
    32:         return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
    33:     }
    34: 
    35:     // Properties string too long: its length must not exceed Short.MAX_VALUE
    36:     if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
    37:         log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
    38:         return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
    39:     }
    40: 
    41:     if (this.isOSPageCacheBusy()) { // NOTE(review): presumably a back-pressure check; confirm in isOSPageCacheBusy()
    42:         return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    43:     }
    44: 
    45:     long beginTime = this.getSystemClock().now();
    46:     // Append the message to the commit log
    47:     PutMessageResult result = this.commitLog.putMessage(msg);
    48: 
    49:     long eclipseTime = this.getSystemClock().now() - beginTime; // elapsed time of the commit log call
    50:     if (eclipseTime > 500) { // warn when the call took more than 500 (ms, per the log message)
    51:         log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
    52:     }
    53:     this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
    54: 
    55:     if (null == result || !result.isOk()) { // count a missing or non-OK result as a failed put
    56:         this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    57:     }
    58: 
    59:     return result;
    60: }
    

    说明:存储消息封装,最终存储需要 CommitLog 实现。
    第 7 至 27 行 :校验 Broker 是否可以写入。
    第 29 至 39 行 :消息格式与大小校验。
    第 47 行 :调用 CommitLog 进行存储,详细逻辑见:《RocketMQ 源码分析 —— Message 存储》

    转自:http://www.iocoder.cn/RocketMQ/message-send-and-receive/#mqfaultstrategy

  • 相关阅读:
    PAT (Advanced Level) 1114. Family Property (25)
    PAT (Advanced Level) 1113. Integer Set Partition (25)
    PAT (Advanced Level) 1112. Stucked Keyboard (20)
    PAT (Advanced Level) 1111. Online Map (30)
    PAT (Advanced Level) 1110. Complete Binary Tree (25)
    PAT (Advanced Level) 1109. Group Photo (25)
    PAT (Advanced Level) 1108. Finding Average (20)
    PAT (Advanced Level) 1107. Social Clusters (30)
    PAT (Advanced Level) 1106. Lowest Price in Supply Chain (25)
    PAT (Advanced Level) 1105. Spiral Matrix (25)
  • 原文地址:https://www.cnblogs.com/jiangjun-x/p/9136431.html
Copyright © 2011-2022 走看看