zoukankan      html  css  js  c++  java
  • RocketMQ角色介绍

    Broker集群:

      接收生产者发送的消息和消费者消费的请求。Master可读可写,Slave只读。

      每个Broker节点,启动时遍历NameServer列表,与每个NameServer建立长连接,注册自己的信息,定时上报

    Producer集群:

      通过NameServer集群获得Topic的路由信息,包括Topic下的Queue,以及Queue分布在哪些Broker上。Producer只需要跟Broker集群的主节点建立连接,因为只将消息发送给Master

    Consumer集群:

      通过NameServer集群获得Topic的路由信息,连接到对应的Broker上,主从节点都会连接。   

    三者的关系:https://blog.csdn.net/linyaogai/article/details/77876078

    一、NameServer

      功能:管理Broker,相当于注册中心,对外提供路由注册,路由剔除和路由发现,但NameServer内部节点之间不通信。RocketMQ早期版本参考Kafka使用Zookeeper中间件作为NameServer,它保证了强一致性。RocketMQ的设计只需要保证最终一致性,为了维护成本减少依赖,自己开发NameServer。

      1.1 节点互不通信,如何保证最终一致性

      1.1.1 路由注册

      Broker节点启动时,遍历NameServer列表,对每个都建立长连接,并进行初始注册。每个NameServer维护一个Broker表,存储Broker信息。

      每隔30秒,Broker节点像NameServer集群发送心跳(包含BrokerId,地址,名称,以及所属集群名称),NameServer收到后更新存活时间戳,存在并发修改同一个表的问题,加入了读写锁,消费者可以共享读取配置信息,但NameServer写入时只能串行写,并会阻塞读。

      1.1.2 路由剔除

      正常情况,Broker关闭与NameServer断开连接,关闭监听器监测到断开会将这个Broker信息剔除。

      异常情况,NameServer有一个定时任务轮询每个Broker的心跳同步时间,如果距离上次同步时间过长,则判定失效并移除Broker

      1.1.3 路由发现

      正常情况,生产者在发送陌生Topic之前会从NameServer获取路由信息。而消费者在启动时从NameServer拉取。

      当NameServer中的路由信息发生变化,并不会主动通知客户端(发送者和接收者),需要客户端拉取。

        1.1.3.1 拉取策略

        生产者和消费者实例底层依赖于MQClientInstance类,其中updateTopicRouteInfoFromNameServer()方法,每30秒执行一次

        1. 从生产者和消费者中获取所有Topic

        2. 对每个Topic执行mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3),获取NameServer新的路由信息

        3. 判断本地TopicRouteTable中的路由信息和新的路由信息是否一致,若一致还需要再判断topic在本地是否存在,不存在也需要更新

        updateTopicRouteInfoFromNameServer()这个方法有三个地方引用:

        1. 首次使用Topic,本地topic信息不存在,需要去注册中心获取

        2. topic不存在,走默认方法,isDefault=true,defaultMQProducer!=null。获取默认Topic(TBM102)下的Broker等等

        3. 定时任务调用  

        源码分析:https://segmentfault.com/a/1190000021175879,https://zhuanlan.zhihu.com/p/299795679

      1.1.4 客户端NameServer选择策略

      RocketMQ将用户设置的NameServer列表设置到NettyRemotingClient类的namesrvAddrList字段中。

      每个客户端只选择一个注册中心连接,具体使用哪个注册中心,使用轮询策略。但第一次选取的结果,下次优先选取。只连接一个是为了减轻注册中心集群的压力。如果连接失败回自动重试其他节点,参见NettyRemotingClient#getAndCreateNameserverChannel。

      1.1.5 特点

      稳定性高。集群中注册中心彼此独立,一个挂掉不会影响其他。没有频繁的读写,性能开销很小。

    二、Broker

      功能:暂存和传输消息

      消息存储是核心,提供了消息的接收、存储和拉取功能。一般会使用主从,主写读,从只读。主节点和从节点BrokerName相同,BrokerId为0是主节点,非0是从节点。

      每30秒向注册中心发送心跳

    三、Producer

      功能:发送消息

      生产者与注册中心保持长连接,将路由信息缓存到本地,每30秒向注册中心拉取新的路由信息

      生产者和Broker保持长连接,每30秒向Broker发送心跳,检测broker是否活跃

    四、Consumer

      功能:接收消息并消费

      消费者和注册中心保持长连接,将路由信息缓存到本地,每30秒向注册中心拉取新的路由信息

      消费者和Broker保持长连接,每30秒向Broker发送心跳,检测broker是否活跃  

    五、Topic

       区分消息的种类,一个发送者可以发送多个topic、一个消费者也可以消费多个topic

    六、Quene

      对于同一个消费者组,一个分区只支持一个消费者消费。分区过少,容易造成消息积压。在生产环境中,一个Topic会设置成多个分区(默认16),支持多个消费者。

      

      (网图:https://blog.csdn.net/qq_33709508/article/details/107937475)

      一个Topic可以分布在各个Broker上,每个Broker上的Topic可以看成是一个Topic分片,而Queue是负载均衡调度的最小单位。

      RocketMQ的负载均衡策略规定,Consumer数量应该小于等于Queue的数量,多余的Consumer不能消费消息。在同一个消费者组中,Queue和消费者是一对多的关系,一个消费者可以消费多个Queue,但是一个Queue只能由一个消费者消费,保证了消费的过程中,不会产生竞争,提高处理速度。

      同一个Queue对同一个消费者组内的消费者是独享的。

      同一个Queue对不同的消费者组是共享的。

    七、Producer Group

      一个生产者组,生产相同的topic。

    八、Consumer Group

      一个消费者组,消费相同的topic,tag。

    九、Message  

    public class Message implements Serializable {
        private static final long serialVersionUID = 8445773977080406428L;
        private String topic;
        private int flag;
        private Map<String, String> properties;
        private byte[] body;
        private String transactionId;
        //....      
        
    
    }

      topic:消息的主题

      flag:RocketMQ不做处理,由应用设置

      properties:消息属性,存储元数据,比如tag标签用于过滤,keys用作幂等

      body:消息内容,一个字节数组,序列化方式由应用决定。

      transactionId:事务id,在事务消息中用到

    十、Tag

      消息的二级类型,比如订单创建消息,订单完成消息

    十一、Offset

      offset指某个Topic下的某条消息在某个Message Queue中的位置。

      集群模式消费:由Broker端存储和控制Offset的值,使用RemoteBrokerOffsetStore结构。

    {
        "offsetTable": {
            "test-topic@test-group": {
                "0": 88526, 
                "1": 88528
            }
        }
    }

      consumer本地也会维护一份在offsetTable,在本地消费完成之后,会通过定时任务同步给broker。

      广播模式消费:每个消费者内部存储,使用LocalfileOffsetStore

      消费端的offset机制

        1. offset初始化

        消费者启动时,会初始化加载offset(OffsetStore#load),广播模式下从本地,集群模式没有实现这个方法。在rebalance对messageQueue分配完成之后会更新offset。

          1. 首先删除本地旧的offset信息(OffsetStore#removeOffset,从offsetTable删除该队列)

          2. 请求broker获取最新的offset,这里分成三种模式(enum ConsumeFromWhere):从最新offset开始消费、从第一个offset开始消费、从指定时间点开始消费。这三种都会获取最新offset,如果不小于0,则从该位置继续消费。

        2. offset提交更新

        消费者从broker拉去消息后,将消息的扩展信息存放到ProcessQueue的TreeMap<Long, MessageExt> msgTreeMap属性中,key为消息在queue中的offset。并发消费模式下,拉取得消息会分发给消费线程。消费成功,将消费完成的消息从msgTreeMap中移除,继续从msgTreeMap中取出第一条消息(offset最小的消息,TreeMap是排序了的),将其offset存入本地的offsetTable。将offset同步回服务器是定时任务处理的。MQClientInstance.start()会启动客户端相关定时任务。

        同步消费进度的定时任务:

        DefaultMQPushConsumerImpl#persistConsumerOffset方法,底层是RemoteBrokerOffsetStore#persistAll

        会将当前offsetTable存储的每个队列的offset同步到broker。默认5秒一次version-4.3.2版本

        3. 并发消费时offset的更新

        消费者一次拉取多条消息,譬如offset从1到9,因为是并发消费的,2到9已经消费了,1还没有消费。从msgTreeMap中删除已消费消息,返回的最小offset是1,更新到offsetTable中,当前队列的消费进度依然是1,即使同步broker,broker存放的也依然是1(见2)

        下次拉取时,使用的是上一次拉取返回的nextBeginOffset也就是11,并不是本地offsetTable中存储的值。所以正常情况下并不会重复拉取。但是如果消费者此时宕机了,内存保存的nextBeginOffset丢失了,重新请求broker,拿到1到9,就会重复消费2到9 

        offset管理,请见:https://www.jianshu.com/p/b4970f59a8b1

    十二、网络模型  

      RocketMQ对Broker的线程池进行了隔离,消息的生产、消费、客户端心跳、客户端注册等请求互不干扰。

    人生就像蒲公英,看似自由,其实身不由己。
  • 相关阅读:
    不同品牌交换机设置telnet方法
    Oracle 11G RAC For Windows 2008 R2部署手册(亲测,成功实施多次)
    oracle 11g ADG实施手册(亲测,已成功部署多次)
    如何正确的使用uwsgi
    debian小巧好看的桌面
    zsh中home键失灵问题
    C#_Markov_心得感想
    NLP—WordNet——词与词之间的最小距离
    这不算爬虫吧?!
    Table-Driven Design 表驱动设计
  • 原文地址:https://www.cnblogs.com/walker993/p/14563403.html
Copyright © 2011-2022 走看看