zoukankan      html  css  js  c++  java
  • RocketMQ角色介绍

    Broker集群:

      接收生产者发送的消息和消费者消费的请求。Master可读可写,Slave只读。

      每个Broker节点,启动时遍历NameServer列表,与每个NameServer建立长连接,注册自己的信息,定时上报

    Producer集群:

      通过NameServer集群获得Topic的路由信息,包括Topic下的Queue,以及Queue分布在哪些Broker上。Producer只需要跟Broker集群的主节点建立连接,因为只将消息发送给Master

    Consumer集群:

      通过NameServer集群获得Topic的路由信息,连接到对应的Broker上,主从节点都会连接。   

    三者的关系:https://blog.csdn.net/linyaogai/article/details/77876078

    一、NameServer

      功能:管理Broker,相当于注册中心,对外提供路由注册,路由剔除和路由发现,但NameServer内部节点之间不通信。RocketMQ早期版本参考Kafka使用Zookeeper中间件作为NameServer,它保证了强一致性。RocketMQ的设计只需要保证最终一致性,为了维护成本减少依赖,自己开发NameServer。

      1.1 节点互不通信,如何保证最终一致性

      1.1.1 路由注册

      Broker节点启动时,遍历NameServer列表,对每个都建立长连接,并进行初始注册。每个NameServer维护一个Broker表,存储Broker信息。

      每隔30秒,Broker节点像NameServer集群发送心跳(包含BrokerId,地址,名称,以及所属集群名称),NameServer收到后更新存活时间戳,存在并发修改同一个表的问题,加入了读写锁,消费者可以共享读取配置信息,但NameServer写入时只能串行写,并会阻塞读。

      1.1.2 路由剔除

      正常情况,Broker关闭与NameServer断开连接,关闭监听器监测到断开会将这个Broker信息剔除。

      异常情况,NameServer有一个定时任务轮询每个Broker的心跳同步时间,如果距离上次同步时间过长,则判定失效并移除Broker

      1.1.3 路由发现

      正常情况,生产者在发送陌生Topic之前会从NameServer获取路由信息。而消费者在启动时从NameServer拉取。

      当NameServer中的路由信息发生变化,并不会主动通知客户端(发送者和接收者),需要客户端拉取。

        1.1.3.1 拉取策略

        生产者和消费者实例底层依赖于MQClientInstance类,其中updateTopicRouteInfoFromNameServer()方法,每30秒执行一次

        1. 从生产者和消费者中获取所有Topic

        2. 对每个Topic执行mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3),获取NameServer新的路由信息

        3. 判断本地TopicRouteTable中的路由信息和新的路由信息是否一致,若一致还需要再判断topic在本地是否存在,不存在也需要更新

        updateTopicRouteInfoFromNameServer()这个方法有三个地方引用:

        1. 首次使用Topic,本地topic信息不存在,需要去注册中心获取

        2. topic不存在,走默认方法,isDefault=true,defaultMQProducer!=null。获取默认Topic(TBM102)下的Broker等等

        3. 定时任务调用  

        源码分析:https://segmentfault.com/a/1190000021175879,https://zhuanlan.zhihu.com/p/299795679

      1.1.4 客户端NameServer选择策略

      RocketMQ将用户设置的NameServer列表设置到NettyRemotingClient类的namesrvAddrList字段中。

      每个客户端只选择一个注册中心连接,具体使用哪个注册中心,使用轮询策略。但第一次选取的结果,下次优先选取。只连接一个是为了减轻注册中心集群的压力。如果连接失败回自动重试其他节点,参见NettyRemotingClient#getAndCreateNameserverChannel。

      1.1.5 特点

      稳定性高。集群中注册中心彼此独立,一个挂掉不会影响其他。没有频繁的读写,性能开销很小。

    二、Broker

      功能:暂存和传输消息

      消息存储是核心,提供了消息的接收、存储和拉取功能。一般会使用主从,主写读,从只读。主节点和从节点BrokerName相同,BrokerId为0是主节点,非0是从节点。

      每30秒向注册中心发送心跳

    三、Producer

      功能:发送消息

      生产者与注册中心保持长连接,将路由信息缓存到本地,每30秒向注册中心拉取新的路由信息

      生产者和Broker保持长连接,每30秒向Broker发送心跳,检测broker是否活跃

    四、Consumer

      功能:接收消息并消费

      消费者和注册中心保持长连接,将路由信息缓存到本地,每30秒向注册中心拉取新的路由信息

      消费者和Broker保持长连接,每30秒向Broker发送心跳,检测broker是否活跃  

    五、Topic

       区分消息的种类,一个发送者可以发送多个topic、一个消费者也可以消费多个topic

    六、Quene

      对于同一个消费者组,一个分区只支持一个消费者消费。分区过少,容易造成消息积压。在生产环境中,一个Topic会设置成多个分区(默认16),支持多个消费者。

      

      (网图:https://blog.csdn.net/qq_33709508/article/details/107937475)

      一个Topic可以分布在各个Broker上,每个Broker上的Topic可以看成是一个Topic分片,而Queue是负载均衡调度的最小单位。

      RocketMQ的负载均衡策略规定,Consumer数量应该小于等于Queue的数量,多余的Consumer不能消费消息。在同一个消费者组中,Queue和消费者是一对多的关系,一个消费者可以消费多个Queue,但是一个Queue只能由一个消费者消费,保证了消费的过程中,不会产生竞争,提高处理速度。

      同一个Queue对同一个消费者组内的消费者是独享的。

      同一个Queue对不同的消费者组是共享的。

    七、Producer Group

      一个生产者组,生产相同的topic。

    八、Consumer Group

      一个消费者组,消费相同的topic,tag。

    九、Message  

    public class Message implements Serializable {
        private static final long serialVersionUID = 8445773977080406428L;
        private String topic;
        private int flag;
        private Map<String, String> properties;
        private byte[] body;
        private String transactionId;
        //....      
        
    
    }

      topic:消息的主题

      flag:RocketMQ不做处理,由应用设置

      properties:消息属性,存储元数据,比如tag标签用于过滤,keys用作幂等

      body:消息内容,一个字节数组,序列化方式由应用决定。

      transactionId:事务id,在事务消息中用到

    十、Tag

      消息的二级类型,比如订单创建消息,订单完成消息

    十一、Offset

      offset指某个Topic下的某条消息在某个Message Queue中的位置。

      集群模式消费:由Broker端存储和控制Offset的值,使用RemoteBrokerOffsetStore结构。

    {
        "offsetTable": {
            "test-topic@test-group": {
                "0": 88526, 
                "1": 88528
            }
        }
    }

      consumer本地也会维护一份在offsetTable,在本地消费完成之后,会通过定时任务同步给broker。

      广播模式消费:每个消费者内部存储,使用LocalfileOffsetStore

      消费端的offset机制

        1. offset初始化

        消费者启动时,会初始化加载offset(OffsetStore#load),广播模式下从本地,集群模式没有实现这个方法。在rebalance对messageQueue分配完成之后会更新offset。

          1. 首先删除本地旧的offset信息(OffsetStore#removeOffset,从offsetTable删除该队列)

          2. 请求broker获取最新的offset,这里分成三种模式(enum ConsumeFromWhere):从最新offset开始消费、从第一个offset开始消费、从指定时间点开始消费。这三种都会获取最新offset,如果不小于0,则从该位置继续消费。

        2. offset提交更新

        消费者从broker拉去消息后,将消息的扩展信息存放到ProcessQueue的TreeMap<Long, MessageExt> msgTreeMap属性中,key为消息在queue中的offset。并发消费模式下,拉取得消息会分发给消费线程。消费成功,将消费完成的消息从msgTreeMap中移除,继续从msgTreeMap中取出第一条消息(offset最小的消息,TreeMap是排序了的),将其offset存入本地的offsetTable。将offset同步回服务器是定时任务处理的。MQClientInstance.start()会启动客户端相关定时任务。

        同步消费进度的定时任务:

        DefaultMQPushConsumerImpl#persistConsumerOffset方法,底层是RemoteBrokerOffsetStore#persistAll

        会将当前offsetTable存储的每个队列的offset同步到broker。默认5秒一次version-4.3.2版本

        3. 并发消费时offset的更新

        消费者一次拉取多条消息,譬如offset从1到9,因为是并发消费的,2到9已经消费了,1还没有消费。从msgTreeMap中删除已消费消息,返回的最小offset是1,更新到offsetTable中,当前队列的消费进度依然是1,即使同步broker,broker存放的也依然是1(见2)

        下次拉取时,使用的是上一次拉取返回的nextBeginOffset也就是11,并不是本地offsetTable中存储的值。所以正常情况下并不会重复拉取。但是如果消费者此时宕机了,内存保存的nextBeginOffset丢失了,重新请求broker,拿到1到9,就会重复消费2到9 

        offset管理,请见:https://www.jianshu.com/p/b4970f59a8b1

    十二、网络模型  

      RocketMQ对Broker的线程池进行了隔离,消息的生产、消费、客户端心跳、客户端注册等请求互不干扰。

    人生就像蒲公英,看似自由,其实身不由己。
  • 相关阅读:
    121. Best Time to Buy and Sell Stock
    70. Climbing Stairs
    647. Palindromic Substrings
    609. Find Duplicate File in System
    583. Delete Operation for Two Strings
    556 Next Greater Element III
    553. Optimal Division
    539. Minimum Time Difference
    537. Complex Number Multiplication
    227. Basic Calculator II
  • 原文地址:https://www.cnblogs.com/walker993/p/14563403.html
Copyright © 2011-2022 走看看