zoukankan      html  css  js  c++  java
  • rocketmq producer 高可用原理

    rocketmq 分为 namesrv、broker、producer、consumer

    broker 分为 MASTER/SLAVE。对 producer 而言,发送 message 属于写操作,只会发送到 brokerId=0 的主 broker 上

    对consumer而言,是读,既可以是主broker,也可以是备broker

    下面来分析一下producer发送message的源码过程,以顺序消息的发送为例

    DefaultMQProducerImpl.sendSelectImpl

    其中tryToFindTopicPublishInfo负责查找当前可用的broker

     1     private SendResult sendSelectImpl(
     2         Message msg,
     3         MessageQueueSelector selector,
     4         Object arg,
     5         final CommunicationMode communicationMode,
     6         final SendCallback sendCallback, final long timeout
     7     ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
     8         long beginStartTime = System.currentTimeMillis();
     9         this.makeSureStateOK();
    10         Validators.checkMessage(msg, this.defaultMQProducer);
    11 
    12         TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    13         if (topicPublishInfo != null && topicPublishInfo.ok()) {
    14             MessageQueue mq = null;
    15             try {
    16                 mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
    17             } catch (Throwable e) {
    18                 throw new MQClientException("select message queue throwed exception.", e);
    19             }
    20 
    21             long costTime = System.currentTimeMillis() - beginStartTime;
    22             if (timeout < costTime) {
    23                 throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
    24             }
    25             if (mq != null) {
    26                 return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
    27             } else {
    28                 throw new MQClientException("select message queue return null.", null);
    29             }
    30         }
    31 
    32         throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
    33     }

    在tryToFindTopicPublishInfo中,调用

    1> this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); 第一次发送时,NameServer中还没有该topic,因此通过NameServer是查不到该topic的路由信息的
    2> this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); 查不到时再调用该方法,改用默认topic(createTopicKey)的路由信息
     
     1     private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
           //第一次进入时,topicPublishInfoTable中没有该topic的信息
    2 TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic); 3 if (null == topicPublishInfo || !topicPublishInfo.ok()) { 4 this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo()); 5 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic); 6 topicPublishInfo = this.topicPublishInfoTable.get(topic); 7 } 8 9 if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) { 10 return topicPublishInfo; 11 } else { 12 this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer); 13 topicPublishInfo = this.topicPublishInfoTable.get(topic); 14 return topicPublishInfo; 15 } 16 }

    MQClientInstance.updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer)

    这个方法中,当 isDefault=true 且 defaultMQProducer!=null 时,会通过 getDefaultTopicRouteInfoFromNameServer 获取默认topic的路由信息

     1     public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
     2         DefaultMQProducer defaultMQProducer) {
     3         try {
     4             if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
     5                 try {
     6                     TopicRouteData topicRouteData;
     7                     if (isDefault && defaultMQProducer != null) {//当isDefault=true, defaultMQProducer时会走这个分支
     8                         topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
     9                             1000 * 3);
    10                         if (topicRouteData != null) {
    11                             for (QueueData data : topicRouteData.getQueueDatas()) {
    12                                 int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
    13                                 data.setReadQueueNums(queueNums);
    14                                 data.setWriteQueueNums(queueNums);
    15                             }
    16                         }
    17                     } else {
    18                         topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
    19                     }
    20                     if (topicRouteData != null) {
    21                         TopicRouteData old = this.topicRouteTable.get(topic);
    22                         boolean changed = topicRouteDataIsChange(old, topicRouteData);
    23                         if (!changed) {
    24                             changed = this.isNeedUpdateTopicRouteInfo(topic);
    25                         } else {
    26                             log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
    27                         }
    28 
    29                         if (changed) {
    30                             TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
    31 
    32                             for (BrokerData bd : topicRouteData.getBrokerDatas()) {
    33                                 this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
    34                             }
    35 
    36                             // Update Pub info
    37                             {    //在该方法中,过滤到master broker
    38                                 TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
    39                                 publishInfo.setHaveTopicRouterInfo(true);
    40                                 Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
    41                                 while (it.hasNext()) {
    42                                     Entry<String, MQProducerInner> entry = it.next();
    43                                     MQProducerInner impl = entry.getValue();
    44                                     if (impl != null) {//update topicPublishInfoTable
    45                                         impl.updateTopicPublishInfo(topic, publishInfo);
    46                                     }
    47                                 }
    48                             }
    49 
    50                             // Update sub info
    51                             {    //拿到readealbe queue
    52                                 Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
    53                                 Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
    54                                 while (it.hasNext()) {
    55                                     Entry<String, MQConsumerInner> entry = it.next();
    56                                     MQConsumerInner impl = entry.getValue();
    57                                     if (impl != null) {
    58                                         impl.updateTopicSubscribeInfo(topic, subscribeInfo);
    59                                     }
    60                                 }
    61                             }
    62                             log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
    63                             this.topicRouteTable.put(topic, cloneTopicRouteData);
    64                             return true;
    65                         }
    66                     } else {
    67                         log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}", topic);
    68                     }
    69                 } catch (Exception e) {
    70                     if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
    71                         log.warn("updateTopicRouteInfoFromNameServer Exception", e);
    72                     }
    73                 } finally {
    74                     this.lockNamesrv.unlock();
    75                 }
    76             } else {
    77                 log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms", LOCK_TIMEOUT_MILLIS);
    78             }
    79         } catch (InterruptedException e) {
    80             log.warn("updateTopicRouteInfoFromNameServer Exception", e);
    81         }
    82 
    83         return false;
    84     }

    DefaultMQProducerImpl.updateTopicPublishInfo(String, TopicPublishInfo)

    该方法中会 更新 DefaultMQProducerImpl.topicPublishInfoTable

    1 @Override
    2     public void updateTopicPublishInfo(final String topic, final TopicPublishInfo info) {
    3         if (info != null && topic != null) {
    4             TopicPublishInfo prev = this.topicPublishInfoTable.put(topic, info);
    5             if (prev != null) {
    6                 log.info("updateTopicPublishInfo prev is not null, " + prev.toString());
    7             }
    8         }
    9     }

    另外,如果某一个broker异常,还有定时任务检查

    MQClientInstance.startScheduledTask()

     1     private void startScheduledTask() {
     2         if (null == this.clientConfig.getNamesrvAddr()) {
     3             this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
     4 
     5                 @Override
     6                 public void run() {
     7                     try {
     8                         MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
     9                     } catch (Exception e) {
    10                         log.error("ScheduledTask fetchNameServerAddr exception", e);
    11                     }
    12                 }
    13             }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
    14         }
    15 
    16         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    17 
    18             @Override
    19             public void run() {
    20                 try {
    21                     MQClientInstance.this.updateTopicRouteInfoFromNameServer();
    22                 } catch (Exception e) {
    23                     log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
    24                 }
    25             }
    26         }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
    27 
    28         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    29 
    30             @Override
    31             public void run() {
    32                 try {
    33                     MQClientInstance.this.cleanOfflineBroker();
    34                     MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
    35                 } catch (Exception e) {
    36                     log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
    37                 }
    38             }
    39         }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
    40 
    41         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    42 
    43             @Override
    44             public void run() {
    45                 try {
    46                     MQClientInstance.this.persistAllConsumerOffset();
    47                 } catch (Exception e) {
    48                     log.error("ScheduledTask persistAllConsumerOffset exception", e);
    49                 }
    50             }
    51         }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    52 
    53         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    54 
    55             @Override
    56             public void run() {
    57                 try {
    58                     MQClientInstance.this.adjustThreadPool();
    59                 } catch (Exception e) {
    60                     log.error("ScheduledTask adjustThreadPool exception", e);
    61                 }
    62             }
    63         }, 1, 1, TimeUnit.MINUTES);
    64     }

    调用链

      1. MQClientInstance.updateTopicRouteInfoFromNameServer()

            2.MQClientInstance.updateTopicRouteInfoFromNameServer(String)

            3. MQClientInstance.updateTopicRouteInfoFromNameServer(String, boolean, DefaultMQProducer)

    这样就可以最大限度的保证获取到最新的broker

    即使这样,还是有可能发送失败,

      1、定时更新任务每隔30秒才执行一次,在两次更新之间broker仍有可能出现问题,这时发送会失败。

      2、当失败重试时,传给MessageQueueSelector.select(List<MessageQueue>, Message, Object)的List<MessageQueue>

    中元素个数可能已经发生变化,对顺序发送来说,消息可能会被发送到与之前不同的queue中

  • 相关阅读:
    PHP商品秒杀倒计时
    【SAS NOTES】两个数据集merge
    【SAS NOTE】在proc means中根据某变量的范围进行统计+proc format
    【SAS NOTES】if then和if的区别
    【SAS NOTES】kindex判断字符串中是否含某子字符串& 用where在data步中选择部分数据
    【SAS NOTES】宏
    【SAS NOTES】两个数据集直接合并不考虑关键字匹配
    【SAS NOTES】在一个data中生成多个数据集
    【SAS NOTES】update
    【SAS NOTES】系统自带变量+%if
  • 原文地址:https://www.cnblogs.com/toUpdating/p/9965003.html
Copyright © 2011-2022 走看看