概述
所有 Broker 在启动时都会向 NameServer 注册,此后定时(每 30 秒一次)重新注册,以此作为发送给 NameServer 的心跳包。
源码阅读
我们先从 NamesrvStartup这个类分析
public static void main(String[] args) { main0(args); } public static NamesrvController main0(String[] args) { try { //创建NamesrvController NamesrvController controller = createNamesrvController(args); //启动 start(controller); String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf("%s%n", tip); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null; }
看一下 NamesrvController 这个类有什么东西
public class NamesrvController { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); private final NamesrvConfig namesrvConfig; private final NettyServerConfig nettyServerConfig; //时间线程池 private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( "NSScheduledThread")); private final KVConfigManager kvConfigManager; //路由相关 private final RouteInfoManager routeInfoManager; //远程管理相关 private RemotingServer remotingServer; private BrokerHousekeepingService brokerHousekeepingService; //线程池 private ExecutorService remotingExecutor; private Configuration configuration; private FileWatchService fileWatchService; ... }
主要是 配置 + manager + 线程池
路由注册
我们先来看一下 RouteInfoManager。
public class RouteInfoManager { private static final Logger log = LoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); //过期时间 private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; //读写锁(适合读多写少), 读的时候可以获取写锁吗? //一个线程获取读锁的时候,另外一个线程是否可以获取到写锁 //答 : 可以 private final ReadWriteLock lock = new ReentrantReadWriteLock(); //主题对应信息 private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; //Broker 基础信息,包括所有的 broker private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; //Broker集群信息,存储集群中所有broker名称(注意是集群中!!) private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; //存活的 Broker //收到心跳包时会更新信息会激活,key 是 ip 地址 private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; //用于过滤 private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; ... ... }
路由注册的逻辑在 BrokerController 的 start 方法内:
// Register once immediately, then keep re-registering on a fixed schedule.
this.registerBrokerAll(true, false);

// Periodic task: registers this broker with every name server, which doubles
// as the heartbeat. Failures are logged so that later runs are not cancelled.
final Runnable heartbeatTask = new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false);
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
};

final long initialDelayMillis = 1000 * 10;
final long periodMillis = 1000 * 30;
this.scheduledExecutorService.scheduleAtFixedRate(
    heartbeatTask, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
registerBrokerAll 方法
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) { TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(); for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) { TopicConfig tmp = new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), this.brokerConfig.getBrokerPermission()); topicConfigTable.put(topicConfig.getTopicName(), tmp); } topicConfigWrapper.setTopicConfigTable(topicConfigTable); } //注册方法 RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll( this.brokerConfig.getBrokerClusterName(), this.getBrokerAddr(), this.brokerConfig.getBrokerName(), this.brokerConfig.getBrokerId(), this.getHAServerAddr(), topicConfigWrapper, this.filterServerManager.buildNewFilterServerList(), oneway, this.brokerConfig.getRegisterBrokerTimeoutMills()); if (registerBrokerResult != null) { if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) { this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr()); } this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr()); if (checkOrderConfig) { this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable()); } } }
brokerOuterAPI 的 registerBrokerAll 方法
/** * broker 向 NS 的注册方法 */ public RegisterBrokerResult registerBrokerAll( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final boolean oneway, final int timeoutMills) { RegisterBrokerResult registerBrokerResult = null; List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null) { for (String namesrvAddr : nameServerAddressList) { try { RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId, haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills); if (result != null) { registerBrokerResult = result; } log.info("register broker to name server {} OK", namesrvAddr); } catch (Exception e) { log.warn("registerBroker Exception, {}", namesrvAddr, e); } } } return registerBrokerResult; } private RegisterBrokerResult registerBroker( final String namesrvAddr, final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final boolean oneway, final int timeoutMills ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException { //No.1 封装请求头和请求body RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); requestHeader.setBrokerId(brokerId); requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); requestHeader.setHaServerAddr(haServerAddr); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); RegisterBrokerBody requestBody = new RegisterBrokerBody(); 
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); request.setBody(requestBody.encode()); //No.2 发送 if (oneway) { try { //发送,remotingClient this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills); } catch (RemotingTooMuchRequestException e) { // Ignore } return null; } RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class); RegisterBrokerResult result = new RegisterBrokerResult(); result.setMasterAddr(responseHeader.getMasterAddr()); result.setHaServerAddr(responseHeader.getHaServerAddr()); if (response.getBody() != null) { result.setKvTable(KVTable.decode(response.getBody(), KVTable.class)); } return result; } default: break; } throw new MQBrokerException(response.getCode(), response.getRemark()); }
其中 RemotingClient 是个接口,它的结构图如下。具体的实现类是 NettyRemotingClient,这个类留着后续分析。NS 处理心跳包的逻辑在 RouteInfoManager 的 registerBroker 方法,这里不再分析源码实现(对字段保存的对应信息进行增删改)。
路由发现
路由发生变化时,NameServer 不会主动推送(push)给 producer,而是由 producer 主动到 NS 中去获取。RocketMQ 的路由实体是 TopicRouteData:
public class TopicRouteData extends RemotingSerializable { //顺序消息配置信息,来自于 kvConfig private String orderTopicConf; //多个broker 订阅了某个 topic ,所以一个 topic可能对应着多个 broker private List<QueueData> queueDatas; //多个broker 的信息 private List<BrokerData> brokerDatas; //过滤 private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; ... ... }
NameServer 路由发现的实现方法:DefaultRequestProcessor#getRouteInfoByTopic
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final GetRouteInfoRequestHeader requestHeader = (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class); TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic()); if (topicRouteData != null) { if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) { String orderTopicConf = this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG, requestHeader.getTopic()); topicRouteData.setOrderTopicConf(orderTopicConf); } byte[] content = topicRouteData.encode(); response.setBody(content); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; } response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic() + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); return response; } //RouteInfoManager#pickupTopicRouteData 方法 public TopicRouteData pickupTopicRouteData(final String topic) { TopicRouteData topicRouteData = new TopicRouteData(); boolean foundQueueData = false; boolean foundBrokerData = false; Set<String> brokerNameSet = new HashSet<String>(); List<BrokerData> brokerDataList = new LinkedList<BrokerData>(); topicRouteData.setBrokerDatas(brokerDataList); HashMap<String, List<String>> filterServerMap = new HashMap<String, List<String>>(); topicRouteData.setFilterServerTable(filterServerMap); try { try { this.lock.readLock().lockInterruptibly(); List<QueueData> queueDataList = this.topicQueueTable.get(topic); if (queueDataList != null) { topicRouteData.setQueueDatas(queueDataList); foundQueueData = true; Iterator<QueueData> it = queueDataList.iterator(); while (it.hasNext()) { QueueData qd = it.next(); 
brokerNameSet.add(qd.getBrokerName()); } for (String brokerName : brokerNameSet) { BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null != brokerData) { BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData .getBrokerAddrs().clone()); brokerDataList.add(brokerDataClone); foundBrokerData = true; for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) { List<String> filterServerList = this.filterServerTable.get(brokerAddr); filterServerMap.put(brokerAddr, filterServerList); } } } } } finally { this.lock.readLock().unlock(); } } catch (Exception e) { log.error("pickupTopicRouteData Exception", e); } if (log.isDebugEnabled()) { log.debug("pickupTopicRouteData {} {}", topic, topicRouteData); } if (foundBrokerData && foundQueueData) { return topicRouteData; } return null; }
总结
文章总结 NS 相关的路由管理逻辑。