zoukankan      html  css  js  c++  java
  • RocketMQ之NameSever

    一、NameSever介绍

    1.1 NameSever是什么

    NameServer 的主要功能是为整个 MQ 集群提供服务协调与治理,具体就是记录维护 TopicBroker 的信息,及监控 Broker 的运行状态,为 client 提供路由能力。 NameServer 之间没有信息同步操作,主要通过 Broker 轮询修改信息。

    * Name Server 是一个无状态节点,可集群部署,节点之间无任何信息同步;
    * Broker 分为 Master 与 Slave,它们是一对多的关系,一个 Master 可以对应多个 Slave。 每个 Broker 与 Name Server 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 Name Server;
    * Producer与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Master Broker 发送心跳。Producer 完全无状态,可集群部署;
    * Consumer 与 Name Server 集群中的其中一个节点(随机选择)建立长连接,定期从 Name Server 取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave 发送心跳。Consumer 既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定。
    

    1.2 集群状态的存储结构

    集群的状态保存在 org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager 类的五个变量中:

    private final HashMap<String,List<QueueData> topicQueueTable;
    

    topicQueueTablekey 存放的是 Topic 的名称,它的值是一个 QueueData 队列,队列长度等于这个 Topic 数据存储的 MasterBroker 的个数,QueueData 存放着 Broker 的名称、读写 queue 的数量、同步标识等。

    private final HashMap<String,BrokeData> brokerAddrTable;
    

    存储着一个 BrokerNme 对应的属性信息,包括所属 Cluster 名称,MasterBrokerSlaveBroker 的地址信息

    private final HashMap<String,BrokerLiveInfo> brokerLiveTable;
    

    存储着一台 Broker 机器的实时状态,包括上次更新状态的时间戳等,NameServer 会定时检查这个时间戳,当超时之后会认为这个 Broker 无效,将其从列表中清除。

    private final  HashMap<String, Set<String>> clusterAddrTable;
    

    存储集群中 Cluster的信息,一个 Cluster 名称对应一个 BrokeName 组成的集合。

    private final HashMap<String, List<String>> filterServerTable;
    

    过滤服务器,存储 Broker 要过滤的多个服务。

    二、角色之间的通信流程

    2.1 NameSever和 Broker间的通信

    1) 在 NameServer启动后,启动 Broker。Broker 在启动时会加载配置中的 topic 信息。加载当前 Broker 上的所有 topic 信息:topic 名称;topic 的 Queue 权限(可读,可写,可继承等等)、Queue 的个数。然后将这些数据传输到 NameServer,美其名曰 registerBroker。
        
    2) NameServer 与 Broker 间维持着一个 SocketChannel,长连接,Broker 每隔30S向其配置的所有的 NameServer 执行 registerBroker 工作,这就是 Broker 和 NameServer 间的心跳。
    
    3) NameServer 在接收到 Broker传递的心跳信息时,若这次心跳是其第一次心跳,那么创建 BrokerData,创建BrokerLiveInfo,保存其 dataVersion和 lastUpdateTimestamp;如果不是第一次,那么更新其lastUpdateTimestamp 和 dataVersion。
        
    4) 如果这个 Broker 是 Master , 且这次心跳信息是其第一次心跳,那么会创建当前 Broker 的 QueueData 。如果不是第一次心跳,但当前 Broker 的 dataVersion 与 NameServer 上保存的不一致 (当 Broker上新增加了 topic 时会更新 dataVersion ,dataVersion 主要用当前时间戳表示),此时会用当前心跳的数据覆盖之前注册的数据。
    
    5) 如果当前 Broker 是 Slave,那么将 Master的 brokerAddr 放入心跳注册结果中,返回给 Slave,这样Slave 就能与 Master 间进行数据传输。
        
    6) NameServer 维护着与其他组件的 SocketChannel 对象,针对所有组件(Broker和Client)的长连接注册了ChannelEventListener,监听此 SocketChannel 的连接事件。当某个 SocketChannel 出现异常或断开时(注意是长连接断开而不是心跳停止!),会循环遍历所有 Broker 的长连接,如果发现断开长连接是属于某个 Broker 的,那么清除此 Broker 的 BrokerData和QueueData ,如果不属于 Broker ,则什么都不做。这样当 Client(Producer,Consumer) 下次请求指定 topic 的 TopicRouteData 时,就不会包含此 Broker 的的数据了,也就是MessageQueue 上不再包含此 Broker 上的 Queue。
    
    7) 因为 ChannelEventListener 的连接事件处理里只对 Broker 做相应处理,没有涉及到 Client。所以在Broker 宕机或者增加时,不会实时通知 Client,Client 最晚需要30S时间才能感知到这种变化,因为Client更新 TopicRouteData 的间隔是30S。
    
    8) NameServer 每隔30S对所有 Broker 的长连接进行扫描,当发现其 lastUpdateTimestamp 距离当前时间超过2m时,断开长连接,清空相应数据。
    

    2.2 Producer 和 Consumer 与 NameServer间的通信

    1) Producer 在发送消息时,首先根据消息的 topic 查看自身是否含有此 topic 相应的 MessageQueue ( 正常情况下第一次发送时是没有的 )。当没有 MessageQueue 时,会从 NameServer 处请求指定 topic 的TopicRouteData,也就是 List<BrokerData> 和 List<QueueData>。然后根据 QueueData 里的writeQueueNums 和 BrokerData 里的 topic, HashMap<brokerId,broker address> 生成MessageQueue,MessageQueue 的 id 从0开始,依次递增。注意如果 BrokerData 里不含有 Master 的 Adress,那么其对应的 QueueData 将会被废弃,因为只有 Master 才能写入消息。
        
    2) 生成 MessageQueue 后,会将此 topic 存在生产者客户端,客户端每隔30S向 NameServer 请求此 topic的 TopicRouteData,生成 MessageQueue,覆盖上次更新的值。
        
    3) Consumer 在启动之前就需要指定订阅的 topic,因此其在启动时就会向 NameServer 请求相应 topic的TopicRouteData,同样的形式生成 MessageQueue,但和 Producer不同的是,Consumer 可以从 Slave 处拉取消息,所以不会过滤 Master 宕机的 Broker 数据。
        
    4) Consumer 客户端也每隔 30S 从 NameServer 处更新当前 topic 的数据,覆盖上次的值。
    
  • 相关阅读:
    将训练好的Tensorflow模型部署到web应用中
    python pip升级
    python time
    python 队列模拟递归遍历目录(广度遍历)
    python 栈模拟递归遍历目录(深度遍历)
    python 递归遍历目录
    python 队列
    python 栈
    python 递归
    python 语音模块
  • 原文地址:https://www.cnblogs.com/markLogZhu/p/12585789.html
Copyright © 2011-2022 走看看