zoukankan      html  css  js  c++  java
  • RocketMQ(4.8.0)——RocketMQ的路由原理

    RocketMQ的路由原理

      生产者发送消息、消费者消费时都需要从 Namesrv 拉取 Topic 路由信息,那么这些路由信息是如何注册到 Namesrv 的呢?如果 Broker 异常宕机,路由信息又是如何更新的呢?

    一、路由注册

      Namesrv 获取的 Topic 路由信息来自 Broker 定时心跳,心跳时 Broker 将 Topic 信息和其他信息发送到 Namesrv。Namesrv 通过 RequestCode.REGISTER_BROKER 接口将心跳中的 Broker 信息和 Topic 信息存储在 Namesrv 中。

      Namesrv 接收请求后,以 3.0.11 版本作为分水岭,按照版本分别做不同的处理,代码路径:D: ocketmq-master amesrvsrcmainjavaorgapache ocketmq amesrvprocessorDefaultRequestProcessor.java,代码如下:

     1    @Override
     2     public RemotingCommand processRequest(ChannelHandlerContext ctx,
     3         RemotingCommand request) throws RemotingCommandException {
     4 
     5         if (ctx != null) {
     6             log.debug("receive request, {} {} {}",
     7                 request.getCode(),
     8                 RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
     9                 request);
    10         }
    11 
    12 
    13         switch (request.getCode()) {
    14             case RequestCode.PUT_KV_CONFIG:
    15                 return this.putKVConfig(ctx, request);
    16             case RequestCode.GET_KV_CONFIG:
    17                 return this.getKVConfig(ctx, request);
    18             case RequestCode.DELETE_KV_CONFIG:
    19                 return this.deleteKVConfig(ctx, request);
    20             case RequestCode.QUERY_DATA_VERSION:
    21                 return queryBrokerTopicConfig(ctx, request);
    22             case RequestCode.REGISTER_BROKER:
    23                 Version brokerVersion = MQVersion.value2Version(request.getVersion());
    24                 if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
    25                     return this.registerBrokerWithFilterServer(ctx, request);
    26                 } else {
    27                     return this.registerBroker(ctx, request);
    28                 }
    29             case RequestCode.UNREGISTER_BROKER:
    30                 return this.unregisterBroker(ctx, request);
    31             case RequestCode.GET_ROUTEINFO_BY_TOPIC:
    32                 return this.getRouteInfoByTopic(ctx, request);
    33             case RequestCode.GET_BROKER_CLUSTER_INFO:
    34                 return this.getBrokerClusterInfo(ctx, request);
    35             case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
    36                 return this.wipeWritePermOfBroker(ctx, request);
    37             case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
    38                 return getAllTopicListFromNameserver(ctx, request);
    39             case RequestCode.DELETE_TOPIC_IN_NAMESRV:
    40                 return deleteTopicInNamesrv(ctx, request);
    41             case RequestCode.GET_KVLIST_BY_NAMESPACE:
    42                 return this.getKVListByNamespace(ctx, request);
    43             case RequestCode.GET_TOPICS_BY_CLUSTER:
    44                 return this.getTopicsByCluster(ctx, request);
    45             case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
    46                 return this.getSystemTopicListFromNs(ctx, request);
    47             case RequestCode.GET_UNIT_TOPIC_LIST:
    48                 return this.getUnitTopicList(ctx, request);
    49             case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
    50                 return this.getHasUnitSubTopicList(ctx, request);
    51             case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
    52                 return this.getHasUnitSubUnUnitTopicList(ctx, request);
    53             case RequestCode.UPDATE_NAMESRV_CONFIG:
    54                 return this.updateConfig(ctx, request);
    55             case RequestCode.GET_NAMESRV_CONFIG:
    56                 return this.getConfig(ctx, request);
    57             default:
    58                 break;
    59         }
    60         return null;
    61     }

      因为当前 RocketMQ 的版本为 4.8.0,所以会 return this.registerBrokerWithFilterServer(ctx, request),我们主要看这个方法中的 this.namesrvController.getRouteInfoManager().registerBroker() 方法,该方法的主要功能是将 request 解码为路由对象,保存在 Namesrv 内存中。

      在路由信息注册完成后,Broker 会每隔 30s 发送一个注册请求给集群中全部的 Namesrv,俗称心跳信,会把最新的 Topic 路由信息注册到 Namesrv 中。

    二、路由剔除

      如果 Broker 长久没有心跳或者宕机,那么 Namesrv 会将这些不提供服务的 Broker 剔除。同时生产者和消费者在与 Namesrv 心跳后也会感知到被踢掉的 Broker,如此 Broker扩容或者宕机对生产、消费无感知的情况就处理完了。

      Namesrv 有两种剔除 Broker的方式:

    第一种:Broker 主动关闭时,会调用 Namesrv 的取消注册 Broker 的接口 RequestCode = RequestCode.UNREGISTER_BROKER,将自身从集群中删除。这个过程与路由注册的过程相反。

    第二种:Namesrv 通过定时扫描已经下线的 Broker,将其主动剔除,实现过程代码路径:D: ocketmq-master amesrvsrcmainjavaorgapache ocketmq amesrvNamesrvController.java 中initialize()方法,代码如下:

     1 public boolean initialize() {
     2 
     3         this.kvConfigManager.load();
     4 
     5         this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
     6 
     7         this.remotingExecutor =
     8             Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
     9 
    10         this.registerProcessor();
    11 
    12         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    13 
    14             @Override
    15             public void run() {
    16                 NamesrvController.this.routeInfoManager.scanNotActiveBroker();
    17             }
    18         }, 5, 10, TimeUnit.SECONDS);
    19 
    20         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    21 
    22             @Override
    23             public void run() {
    24                 NamesrvController.this.kvConfigManager.printAllPeriodically();
    25             }
    26         }, 1, 10, TimeUnit.MINUTES);
    27 
    28         if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
    29             // Register a listener to reload SslContext
    30             try {
    31                 fileWatchService = new FileWatchService(
    32                     new String[] {
    33                         TlsSystemConfig.tlsServerCertPath,
    34                         TlsSystemConfig.tlsServerKeyPath,
    35                         TlsSystemConfig.tlsServerTrustCertPath
    36                     },
    37                     new FileWatchService.Listener() {
    38                         boolean certChanged, keyChanged = false;
    39                         @Override
    40                         public void onChanged(String path) {
    41                             if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
    42                                 log.info("The trust certificate changed, reload the ssl context");
    43                                 reloadServerSslContext();
    44                             }
    45                             if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
    46                                 certChanged = true;
    47                             }
    48                             if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
    49                                 keyChanged = true;
    50                             }
    51                             if (certChanged && keyChanged) {
    52                                 log.info("The certificate and private key changed, reload the ssl context");
    53                                 certChanged = keyChanged = false;
    54                                 reloadServerSslContext();
    55                             }
    56                         }
    57                         private void reloadServerSslContext() {
    58                             ((NettyRemotingServer) remotingServer).loadSslContext();
    59                         }
    60                     });
    61             } catch (Exception e) {
    62                 log.warn("FileWatchService created error, can't load the certificate dynamically");
    63             }
    64         }
    65 
    66         return true;
    67     }

      这里定时执行 scanNotActiveBroker(),代码路径:D: ocketmq-master amesrvsrcmainjavaorgapache ocketmq amesrv outeinfoRouteInfoManager.java,代码如下:

     1     public void scanNotActiveBroker() {
     2         Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
     3         while (it.hasNext()) {
     4             Entry<String, BrokerLiveInfo> next = it.next();
     5             long last = next.getValue().getLastUpdateTimestamp();
     6             if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
     7                 RemotingUtil.closeChannel(next.getValue().getChannel());
     8                 it.remove();
     9                 log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
    10                 this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
    11             }
    12         }
    13     }

      该方法会扫描全部已经注册的 Broker,依次将每一个 Broker 心跳的最后更新时间和当前时间做对比,如果 Broker 心跳的最后更新时间超过 BROKER_CHANNEL_EXPIRED_TIME(1000 * 60 * 2 = 120s),则将 Broker 剔除。至此没有心跳的 Broker 从路由中被剔除,而客户端无任何感知。

  • 相关阅读:
    shared pointer
    realsense数据分析
    cmake log
    Qt自定义类添加qvector报错
    Java csv
    opencv C++ mask_rcnn
    realsense data
    Test
    ubuntu18 bluebooth
    LSTM_Model
  • 原文地址:https://www.cnblogs.com/zuoyang/p/14441879.html
Copyright © 2011-2022 走看看