zoukankan      html  css  js  c++  java
  • RocketMQ源码分析之Producer发送消息(五)

    本节主要讲Producer同步发送消息的流程,异步的暂时不打算讲。因为还要讲Broker接受消息,存储消息,以及RemotingCommand。

    老规矩电路图送上,为什么我喜欢电路图,因为UML比时序图或其他任何图都更能反映出类之间的调用。

    发送消息时序图:下图带有浅绿色背景区域的部分为循环调用。retry send,你们懂得。

    从时序图中可以看到整个发送消息都是围绕DefaultMQProducerImpl类展开的。

    1、DefaultMQProducerImpl.sendDefaultImpl()方法,这里只贴出来主要代码

     1 private SendResult sendDefaultImpl(Message msg, final CommunicationMode communicationMode,
     2         final SendCallback sendCallback, final long timeout ){
     3     this.makeSureStateOK();//判断服务是否是Running状态,不是抛异常出来
     4     Validators.checkMessage(msg, this.defaultMQProducer);//检查消息正确性,包括topic,消息大小等
     5 
     6     TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());//获取topic队列配置
     7     for (; times < timesTotal; times++) {
     8         MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);//选择消息队列,后面详细说
     9         sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
    10     }
    11 }

     2、DefaultMQProducerImpl.makeSureStateOK()方法

     这个方法就是检查服务状态是不是running,很多地方都会调用这个方法。

    1 private void makeSureStateOK() throws MQClientException {
    2     if (this.serviceState != ServiceState.RUNNING) {
    3         throw new MQClientException("The producer service state not OK, "
    4                 + this.serviceState
    5                 + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null);
    6     }
    7 }

    3、DefaultMQProducerImpl..tryToFindTopicPublishInfo()方法

    获取Topic的队列配置
    该方法主要是获取主题的队列信息,当从本地内存中获取不到时,从NameServer服务器获取主题配置信息。

     1 private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
     2     TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
     3     if (null == topicPublishInfo || !topicPublishInfo.ok()) {
     4         this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
       //当从本地内存中获取不到时,从NameServer服务器获取主题的队列配置信息
    5 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); 6 topicPublishInfo = this.topicPublishInfoTable.get(topic); 7 } 8 9 if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { 10 return topicPublishInfo; 11 } else { 12 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); 13 topicPublishInfo = this.topicPublishInfoTable.get(topic); 14 return topicPublishInfo; 15 } 16 }

     4、MQFaultStrategy.selectOneMessageQueue()方法

    RcoketMQ中一个topic对应多个队列,该方法从topic的队列列表中选中一个Queue返回,选择队列的算法为轮询。

     1 public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
     2     if (this.sendLatencyFaultEnable) {
     3         try {
     4             int index = tpInfo.getSendWhichQueue().getAndIncrement();//记录选择累计次数
     5             for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
     6                 int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();//总次数对队列大小取余
     7                 if (pos < 0)
     8                     pos = 0;
     9                 MessageQueue mq = tpInfo.getMessageQueueList().get(pos);//根据余数取出一个队列返回,这样就保证了发送端几乎是均衡把消息发送到各个队列里面。
    10                 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
    11                     if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
    12                         return mq;
    13                 }
    14             }
    15 
    16             final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
    17             int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
    18             if (writeQueueNums > 0) {
    19                 final MessageQueue mq = tpInfo.selectOneMessageQueue();
    20                 if (notBestBroker != null) {
    21                     mq.setBrokerName(notBestBroker);
    22                     mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
    23                 }
    24                 return mq;
    25             } else {
    26                 latencyFaultTolerance.remove(notBestBroker);
    27             }
    28         } catch (Exception e) {
    29             log.error("Error occurred when selecting message queue", e);
    30         }
    31 
    32         return tpInfo.selectOneMessageQueue();
    33     }
    34 
    35     return tpInfo.selectOneMessageQueue(lastBrokerName);
    36 }

     getSendWhichQueue()说一下,看下代码:

    这里用到了volatile关键字和ThreadLocal类,volatile保证了sendWhichQueue一旦变化,Group下其他Producer马上就能看到最新数据。而之所以用ThreadLocal<Integer>不用AtomicInteger类。因为这里只需要定义一个线程内的局部变量,只供本线程使用。而AtomicInteger主要是为了解决兵法问题。

    1 public class TopicPublishInfo {
    2     private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    3 }
    4 
    5 public class ThreadLocalIndex {
    6     private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
    7 }

    5、DefaultMQProducerImpl.sendKernelImpl()方法,只贴出了主要代码,具体代码大家还是去看源码吧。

    sendKernelImpl方法主要作用就是封包,发送消息给Broker。
     1 private SendResult sendKernelImpl(final Message msg,
     2                                   final MessageQueue mq,
     3                                   final CommunicationMode communicationMode,
     4                                   final SendCallback sendCallback,
     5                                   final TopicPublishInfo topicPublishInfo,
     6                                   final long timeout) {
     7     //获取broker地址
     8     String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
     9     //给消息设置唯一ID
    10     MessageClientIDSetter.setUniqID(msg);
    11     //当消息体大于4M时,进行压缩,注意只压缩消息体
    12     UtilAll.compress(body, zipCompressLevel);
    13     //使用是否VIPChannel通道发送数据,vip channel的端口号为普通端口号-2。
    14     brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
    15     //发送消息校验
    16     CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
    this.executeCheckForbiddenHook(checkForbiddenContext);
        
    17     //发送消息前逻辑
    18     SendMessageContext context = new SendMessageContext();
    this.executeSendMessageHookBefore(context);
    19     //构建发送消息请求
    20     SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
    21     //底层调用netty发送消息给broker。
    22     sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage();
    23     return sendResult;
    24 }
     
  • 相关阅读:
    打开服务器的文档
    笔记
    centos6.5 编译openssl-1.1.1k
    搭建自己的低代码平台
    防火墙ACL配置自动化
    防火墙ACL配置自动化
    【树莓派】读取新大陆(newland)USB条码扫描器数据
    解决eclipse或sts闪退的办法(转)
    浅谈数据库迁移类项目功能测试的基本思路
    ATM取款机优化需求的用例设计
  • 原文地址:https://www.cnblogs.com/shileibrave/p/9890369.html
Copyright © 2011-2022 走看看