zoukankan      html  css  js  c++  java
  • 消息队列(二)--- RocketMQ-NameServer阅读

    概述

      所有 broker 在启动的时候都会向 NameServer 进行注册,并且之后会定时向它发送心跳包。

    源码阅读

    我们先从 NamesrvStartup这个类分析
    
        /**
         * Command-line entry point: delegates to {@code main0(args)} and discards
         * the controller it returns.
         *
         * @param args command-line arguments, passed through unchanged
         */
        public static void main(String[] args) {
            main0(args);
        }
    
        /**
         * Creates a {@code NamesrvController} from the command-line arguments, starts it,
         * and reports the successful boot through the logger and standard output.
         *
         * <p>Any {@code Throwable} raised along the way has its stack trace printed and
         * terminates the JVM with exit code {@code -1}.
         *
         * @param args command-line arguments handed to {@code createNamesrvController}
         * @return the started controller; {@code null} is only reachable if
         *         {@code System.exit} returns
         */
        public static NamesrvController main0(String[] args) {
            try {
                // Build the controller first, then start it.
                final NamesrvController namesrvController = createNamesrvController(args);
                start(namesrvController);

                final String bootMessage =
                    "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer();
                log.info(bootMessage);
                System.out.printf("%s%n", bootMessage);
                return namesrvController;
            } catch (Throwable startupFailure) {
                startupFailure.printStackTrace();
                System.exit(-1);
                return null;
            }
        }
    
    
    
    看一下 NamesrvController 这个类有什么东西
    
    /**
     * Excerpt of the name server controller: it holds configuration objects,
     * managers and executors. The remaining members are elided in this excerpt.
     */
    public class NamesrvController {
        private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    
        // Name server configuration.
        private final NamesrvConfig namesrvConfig;
    
        // Netty server configuration.
        private final NettyServerConfig nettyServerConfig;
    
        
        // Single-threaded scheduled executor; its thread factory is named "NSScheduledThread".
        private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
            "NSScheduledThread"));
        // Key-value configuration manager.
        private final KVConfigManager kvConfigManager;
        // Routing information.
        private final RouteInfoManager routeInfoManager;
        // Remoting (network) server.
        private RemotingServer remotingServer;
    
        // NOTE(review): presumably reacts to broker channel events - confirm against the full class.
        private BrokerHousekeepingService brokerHousekeepingService;
    
        // Thread pool; presumably used by the remoting server to process requests - confirm.
        private ExecutorService remotingExecutor;
    
        private Configuration configuration;
        private FileWatchService fileWatchService;
    
    
        ...
    }
    
    
    主要是 配置 + manager + 线程池
    

    路由注册

    我们先来看一下 RouteInfoManager . 
    
    /**
     * Excerpt of the name server's routing tables. Every table is a plain
     * {@code HashMap}; access in the methods shown elsewhere is coordinated through
     * {@code lock}. The remaining members are elided in this excerpt.
     */
    public class RouteInfoManager {
        private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
        // Expiry time of a broker channel: 1000 * 60 * 2 (presumably milliseconds, i.e. two minutes - confirm at the usage site).
        private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
        
        // Read-write lock (suited to read-mostly access).
        // Q: while one thread holds the read lock, can another thread acquire the write lock?
        // A: No. The write lock of a ReentrantReadWriteLock is exclusive: a writer blocks until
        //    every reader has released the read lock. Only read locks can be held concurrently.
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
    
        // Queue information per topic.
        private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    
        // Basic broker information, keyed by broker name; covers all brokers.
        private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    
        // Cluster information: the names of all brokers that belong to each cluster.
        private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    
        // Live brokers, keyed by broker address.
        // NOTE(review): the entry is presumably refreshed whenever a heartbeat arrives - confirm in registerBroker.
        private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    
        // Filter server addresses per broker address.
        private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    
        ...
        ...
    
    }
    
    

      上面的字段可以通过下面两张图来理解:路由信息1.PNG、路由信息2.PNG

    路由注册的逻辑在 BrokerController 的 start方法内,
    
            // Register once right away.
            this.registerBrokerAll(true, false);
    
            // Then repeat the registration at a fixed rate: first run after 1000 * 10 ms,
            // subsequent runs every 1000 * 30 ms.
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        // Register this broker with the name servers; the periodic call doubles as the heartbeat.
                        BrokerController.this.registerBrokerAll(true, false);
                    } catch (Throwable e) {
                        // Caught and logged here so the exception does not propagate out of run().
                        log.error("registerBrokerAll Exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
    
    

    registerBrokerAll 方法

        /**
         * Registers this broker with the name servers and applies the registration result.
         *
         * <p>When the broker permission is not both writable and readable, every topic
         * config in the outgoing wrapper is replaced by a copy that carries the broker's
         * permission.
         *
         * @param checkOrderConfig whether to update the order topic config from the
         *                         KV table of the registration result
         * @param oneway           forwarded to {@code brokerOuterAPI.registerBrokerAll}
         */
        public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) {
            TopicConfigSerializeWrapper wrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

            boolean writableAndReadable = PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
                && PermName.isReadable(this.getBrokerConfig().getBrokerPermission());
            if (!writableAndReadable) {
                ConcurrentHashMap<String, TopicConfig> restrictedTable = new ConcurrentHashMap<String, TopicConfig>();
                for (TopicConfig source : wrapper.getTopicConfigTable().values()) {
                    TopicConfig restricted = new TopicConfig(
                        source.getTopicName(),
                        source.getReadQueueNums(),
                        source.getWriteQueueNums(),
                        this.brokerConfig.getBrokerPermission());
                    restrictedTable.put(source.getTopicName(), restricted);
                }
                wrapper.setTopicConfigTable(restrictedTable);
            }

            // Perform the registration.
            RegisterBrokerResult result = this.brokerOuterAPI.registerBrokerAll(
                this.brokerConfig.getBrokerClusterName(),
                this.getBrokerAddr(),
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId(),
                this.getHAServerAddr(),
                wrapper,
                this.filterServerManager.buildNewFilterServerList(),
                oneway,
                this.brokerConfig.getRegisterBrokerTimeoutMills());

            if (result == null) {
                return;
            }

            if (this.updateMasterHAServerAddrPeriodically && result.getHaServerAddr() != null) {
                this.messageStore.updateHaMasterAddress(result.getHaServerAddr());
            }

            this.slaveSynchronize.setMasterAddr(result.getMasterAddr());

            if (checkOrderConfig) {
                this.getTopicConfigManager().updateOrderTopicConfig(result.getKvTable());
            }
        }
    

    brokerOuterAPI 的 registerBrokerAll 方法

        /**
         * Registers the broker with every name server address known to the remoting client.
         *
         * <p>A failure against one name server is logged and does not stop the loop.
         *
         * @return the last non-null result obtained from a name server, or {@code null}
         *         when the address list is {@code null} or no call produced a result
         */
        public RegisterBrokerResult registerBrokerAll(
            final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId,
            final String haServerAddr,
            final TopicConfigSerializeWrapper topicConfigWrapper,
            final List<String> filterServerList,
            final boolean oneway,
            final int timeoutMills) {
            final List<String> namesrvAddrs = this.remotingClient.getNameServerAddressList();
            if (namesrvAddrs == null) {
                return null;
            }

            RegisterBrokerResult lastResult = null;
            for (String namesrvAddr : namesrvAddrs) {
                try {
                    RegisterBrokerResult current = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName,
                        brokerId, haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
                    if (current != null) {
                        lastResult = current;
                    }

                    log.info("register broker to name server {} OK", namesrvAddr);
                } catch (Exception e) {
                    log.warn("registerBroker Exception, {}", namesrvAddr, e);
                }
            }

            return lastResult;
        }
        
    
        /**
         * Sends a single {@code REGISTER_BROKER} request to one name server.
         *
         * @return {@code null} for a one-way send; otherwise the result decoded from a
         *         {@code SUCCESS} response
         * @throws MQBrokerException when the response code is anything other than
         *                           {@code ResponseCode.SUCCESS}
         */
        private RegisterBrokerResult registerBroker(
            final String namesrvAddr,
            final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId,
            final String haServerAddr,
            final TopicConfigSerializeWrapper topicConfigWrapper,
            final List<String> filterServerList,
            final boolean oneway,
            final int timeoutMills
        ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
            InterruptedException {
            // Step 1: assemble the request header and the request body.
            final RegisterBrokerRequestHeader header = new RegisterBrokerRequestHeader();
            header.setBrokerAddr(brokerAddr);
            header.setBrokerId(brokerId);
            header.setBrokerName(brokerName);
            header.setClusterName(clusterName);
            header.setHaServerAddr(haServerAddr);
            final RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, header);

            final RegisterBrokerBody body = new RegisterBrokerBody();
            body.setTopicConfigSerializeWrapper(topicConfigWrapper);
            body.setFilterServerList(filterServerList);
            request.setBody(body.encode());

            // Step 2: send. A one-way send produces no result.
            if (oneway) {
                try {
                    this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
                } catch (RemotingTooMuchRequestException ignored) {
                    // Ignore
                }
                return null;
            }

            final RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
            assert response != null;
            if (response.getCode() == ResponseCode.SUCCESS) {
                final RegisterBrokerResponseHeader responseHeader =
                    (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
                final RegisterBrokerResult registered = new RegisterBrokerResult();
                registered.setMasterAddr(responseHeader.getMasterAddr());
                registered.setHaServerAddr(responseHeader.getHaServerAddr());
                if (response.getBody() != null) {
                    registered.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
                }
                return registered;
            }

            throw new MQBrokerException(response.getCode(), response.getRemark());
        }
    
    

      其中 RemotingClient 是个接口,它的结构图见 remoteClient类结构.PNG,具体的实现类是 NettyRemotingClient,这个类留待后续分析。NS 处理心跳包的逻辑在 RouteInfoManager 的 registerBroker 方法中,这里不再分析其源码实现(主要是对上述各个字段保存的信息进行增删改)。

    路由发现

      路由信息发生变化时,NameServer 不会主动 push 给 producer,而是由 producer 主动到 NS 中去拉取。RocketMQ 的路由实体是 TopicRouteData:

    /**
     * Route information for one topic, as returned by the name server.
     * The remaining members are elided in this excerpt.
     */
    public class TopicRouteData extends RemotingSerializable {
        // Ordered-message configuration; taken from the KV config.
        private String orderTopicConf;
        // Queue data of the topic. Each entry names a broker, so one topic may map to several brokers.
        private List<QueueData> queueDatas;
        // Information about the brokers involved.
        private List<BrokerData> brokerDatas;
        // Filter server addresses per broker address.
        private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    
        ...
        ...
    }
    
    

      NameServer 路由发现的实现类:DefaultRequestProcessor#getRouteInfoByTopic

        /**
         * Answers a route lookup for the topic named in the request header.
         *
         * <p>If no route data is found, the response carries {@code TOPIC_NOT_EXIST} and an
         * explanatory remark. Otherwise the encoded route data is returned with
         * {@code SUCCESS}; when order messages are enabled the order topic config from the
         * KV config manager is attached first.
         *
         * @param ctx     channel context of the request (not used in this method)
         * @param request the incoming command carrying a {@code GetRouteInfoRequestHeader}
         * @return the response command
         * @throws RemotingCommandException if the request header cannot be decoded
         */
        public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
            RemotingCommand request) throws RemotingCommandException {
            final RemotingCommand response = RemotingCommand.createResponseCommand(null);
            final GetRouteInfoRequestHeader requestHeader =
                (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

            final TopicRouteData routeData =
                this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

            // Unknown topic: report it and stop here.
            if (routeData == null) {
                response.setCode(ResponseCode.TOPIC_NOT_EXIST);
                response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
                    + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
                return response;
            }

            if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
                final String orderConf = this.namesrvController.getKvConfigManager()
                    .getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, requestHeader.getTopic());
                routeData.setOrderTopicConf(orderConf);
            }

            response.setBody(routeData.encode());
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
            return response;
        }
    
        //RouteInfoManager#pickupTopicRouteData 方法 
    
        /**
         * Assembles the route data for a topic from the routing tables under the read lock.
         *
         * <p>The result carries the topic's queue data list, a copy of each matching
         * broker's data (the address map is cloned), and the filter server list
         * registered for every broker address.
         *
         * <p>Any exception raised while reading the tables, including interruption while
         * waiting for the lock, is logged and not rethrown.
         *
         * @param topic the topic to look up
         * @return the route data when both queue data and at least one broker were found;
         *         {@code null} otherwise
         */
        public TopicRouteData pickupTopicRouteData(final String topic) {
            TopicRouteData topicRouteData = new TopicRouteData();
            boolean foundQueueData = false;
            boolean foundBrokerData = false;
            Set<String> brokerNameSet = new HashSet<String>();
            List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
            topicRouteData.setBrokerDatas(brokerDataList);

            HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>();
            topicRouteData.setFilterServerTable(filterServerMap);

            try {
                // Acquire the lock BEFORE entering the try/finally that releases it. If the
                // acquisition is interrupted, the lock is not held; calling unlock() then would
                // throw IllegalMonitorStateException and hide the InterruptedException.
                this.lock.readLock().lockInterruptibly();
                try {
                    List<QueueData> queueDataList = this.topicQueueTable.get(topic);
                    if (queueDataList != null) {
                        topicRouteData.setQueueDatas(queueDataList);
                        foundQueueData = true;

                        // Collect the distinct brokers that carry queues of this topic.
                        for (QueueData qd : queueDataList) {
                            brokerNameSet.add(qd.getBrokerName());
                        }

                        for (String brokerName : brokerNameSet) {
                            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                            if (null != brokerData) {
                                // Hand out a copy with a cloned address map instead of the stored instance.
                                BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData
                                    .getBrokerAddrs().clone());
                                brokerDataList.add(brokerDataClone);
                                foundBrokerData = true;
                                for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                                    List<String> filterServerList = this.filterServerTable.get(brokerAddr);
                                    filterServerMap.put(brokerAddr, filterServerList);
                                }
                            }
                        }
                    }
                } finally {
                    this.lock.readLock().unlock();
                }
            } catch (Exception e) {
                log.error("pickupTopicRouteData Exception", e);
            }

            if (log.isDebugEnabled()) {
                log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
            }

            if (foundBrokerData && foundQueueData) {
                return topicRouteData;
            }

            return null;
        } 
    
    

    总结

      本文总结了 NS 相关的路由管理逻辑(路由注册与路由发现)。

  • 相关阅读:
    C语言探索之旅 | 第二部分第十一课:练习题和习作
    C语言探索之旅 | 第二部分第十课: 实战"悬挂小人"游戏答案
    C语言探索之旅 | 第二部分第九课: 实战"悬挂小人"游戏
    C语言探索之旅 | 第二部分第八课:动态分配
    C语言探索之旅 | 第二部分第七课:文件读写
    最近迫切应学的编程语言
    C语言探索之旅 | 第二部分第五课:预处理
    封装axios方法之一
    react前置路由守卫
    React Router 4.0 实现路由守卫
  • 原文地址:https://www.cnblogs.com/Benjious/p/11635667.html
Copyright © 2011-2022 走看看