zoukankan      html  css  js  c++  java
  • rocketmq的message里面的路由信息是啥

    对于一个生产者来说,在进行sendmessage的时候,需要知道这个topic应该发给哪个broker。如果没有路由信息的话,需要取注册中心,通过GET_ROUTEINTO_BY_TOPIC去注册中心拿到消息。

    介绍下面具体流程之前,还是先介绍注册额中心里面路由管理者RouteInfoManager:

      private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
        private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
        private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
        private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
        private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    
    

    broker-name:可以有多个broker-id,broker-id为0的就是master,否则是slave

    clusterAddrTable:多个broker-name可以放在同一个cluster下面

    topicqueueTable:一个topic下面可能有多个broker对应,QueueData里面存放每个broker-name的属性。所以一个topic下面可能有多个broker-name在贡献。

    public class QueueData implements Comparable<QueueData> {
        private String brokerName;
        private int readQueueNums;
        private int writeQueueNums;
        private int perm;
        private int topicSynFlag;
    
    
    

    brokerAddrTable:一个broker-name对应的信息,QueueData是真正描述一个broker-name的属性,BrokerData描述的是这个broker的上下级关系,上级cluster是谁,下级brokerAddrs描述这个broker-name下面所有broker-id对应的ip地址。

    public class BrokerData implements Comparable<BrokerData> {
        private String cluster;
        private String brokerName;
        private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;

    RouteInfoManager主要存储的信息就是这么多。这些消息都是在一个broker启动的时候,都会到注册中心注册broker,在注册的时候把RouteInfoManager里面的信息进行填充。同时如果有变化的时候RouteInfoManager里面的数据也会跟着刷新,QueueData和BrokerData的equal方法都被覆盖了,这里面的属性任何一个有变化都会被认为有变化,然后被更新。

    对于备机来说,注册完成以后,还可以从注册中心拿到主机的haServer-addrhaServer-port地址,也就是主机broker的这个地址:

        public String getHAServerAddr() {
            return this.brokerConfig.getBrokerIP2() + ":" + this.messageStoreConfig.getHaListenPort();
        }

    拿到这个地址以后才能启动HaService里面的Haclient,上报自己的ack-offset,然后拿到同步数据。

    对于一个生产者来说,在进行sendmessage的时候,需要知道这个topic应该发给哪个broker。如果没有路由信息的话,需要取注册中心,通过GET_ROUTEINTO_BY_TOPIC去注册中心拿到消息。

    所谓的路由信息其实就是这个数据结构:

    public class TopicRouteData extends RemotingSerializable {
        private String orderTopicConf;
        private List<QueueData> queueDatas;
        private List<BrokerData> brokerDatas;
        private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

    具体拿的方法在pickupTopicRouteData里面通过topic可以从RouteInfoManager拿到

    拿到以后需要更新发布信息和订阅信息,其中发布信息就是针对生产者来说的,具体更新就是:(route,就是TopicRouteData)

       List<QueueData> qds = route.getQueueDatas();
                Collections.sort(qds);
                for (QueueData qd : qds) {
                    if (PermName.isWriteable(qd.getPerm())) {
                        BrokerData brokerData = null;
                        for (BrokerData bd : route.getBrokerDatas()) {
                            if (bd.getBrokerName().equals(qd.getBrokerName())) {
                                brokerData = bd;
                                break;
                            }
                        }
    
                        if (null == brokerData) {
                            continue;
                        }
    
                        if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
                            continue;
                        }
    
                        for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                            MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                            info.getMessageQueueList().add(mq);
                        }
                    }
     public MessageQueue(String topic, String brokerName, int queueId) {
            this.topic = topic;
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

    也就是一个topic可能对应多个broker-name,同时每个broker-name也有多个QueueId,这个Queueid个数是getWriteQueueNums决定。

    这里的info就是publishInfo,会被塞入this.topicPublishInfoTable.put(topic, info)中保存下来。

    重点来了,这个有啥用?

    后面生产者在发送消息的时候,需要有一个broker-addr,毕竟要知道一个broker地址,为了拿到这个broker-addr,其实是在这里拿到的:

    private SendResult sendKernelImpl(final Message msg,
                                          final MessageQueue mq,
                                          final CommunicationMode communicationMode,
                                          final SendCallback sendCallback,
                                          final TopicPublishInfo topicPublishInfo,
                                          final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            long beginStartTime = System.currentTimeMillis();
            String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());

    也就是broker-addr是MessageQueue给出来的,这个MessageQueue怎么取得?

    public MessageQueue selectOneMessageQueue() {
            int index = this.sendWhichQueue.getAndIncrement();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            return this.messageQueueList.get(pos);
        }

    也就是每次从前面提的info.getMessageQueueList().add(mq)这个里面每次取一个messageQueue(第一次随机,后面每次加1),这个MessageQueue就是针对同一个broker-name多个QueueId。

    在实际发送的时候,拿到一个MessageQueue就会直接发出去。对于一个topic有多个broker-name的情况,这种做法只是给一个broker-name的master发了信息,其他broker-name没有发送。这个就是topic层面的分片,不同broker分摊相同topic下的不同内容,而同一个broker通过主备完成信息冗余。而QueueId就是broker层面的再次分片。对于多个消费者的情况,消费相同broker-name的时候,可以根据queue-id并发消费。

    
    
  • 相关阅读:
    《TCP/IP 详解 卷1:协议》第 10 章:用户数据报协议
    《TCP/IP 详解 卷1:协议》第 9 章:广播和本地组播(IGMP 和 MLD)
    在新的电脑上部署 Hexo,保留原有博客的方法
    当你不知道变量类型的完整定义时可以采取的操作
    python learning GUI
    python learning Network Programming.py
    python learning Process and Thread.py
    【2017级面向对象程序设计】第2次成绩排行
    python learning IO.py
    python learning Exception & Debug.py
  • 原文地址:https://www.cnblogs.com/notlate/p/10327242.html
Copyright © 2011-2022 走看看