zoukankan      html  css  js  c++  java
  • RocketMQ源码分析

    1. NameServer

      启动:NamesrvStartup#main0()。内部注意初始化了两个线程池,一个每10秒扫描所有的broker,如果一个broker断开120秒,则关闭链接并移除broker。另一个线程池一个打印KV信息。

      路由实现类:RouteInfoManager

    package org.apache.rocketmq.namesrv.routeinfo;
    public class RouteInfoManager { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME); private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; private final ReadWriteLock lock = new ReentrantReadWriteLock(); //Topic的队列路由信息 private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; //Broker基础信息,包括brokerName,所属集群名称,地址 private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; //Broker集群信息,集群中所有broker的名称 private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; //Broker状态信息,nameServer每次收到心跳后更新,检测下线broker也是通过扫描这个属性 private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; //Broker上的FilterServer列表,用于类模式消息过滤 private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable; public RouteInfoManager() { this.topicQueueTable = new HashMap<String, List<QueueData>>(1024); this.brokerAddrTable = new HashMap<String, BrokerData>(128); this.clusterAddrTable = new HashMap<String, Set<String>>(32); this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256); this.filterServerTable = new HashMap<String, List<String>>(256); }   //.... }

      1.1 处理心跳请求:DefaultRequestProcessor#processRequest方法,会处理消费者,生产者和broker发送的心跳

        1.1.1 处理路由注册,RouteInfoManager#registerBroker方法,这是个同步方法,保证只有一个线程执行注册动作。内部就是根据请求维护 brokerAddrTable

        1.1.2 处理路由剔除,RouteInfoManager#unregisterBroker方法,从topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable中移除该broker信息

        1.1.3 处理路由发现,RouteInfoManager#getRouteInfoByTopic方法,根据请求体的topic从topicQueueTable中获取路由信息

    2. Broker

      包含模块:

      1. Remoting Module:负责处理客户端的请求

      2. Client Manager:负责管理客户端和维护Consumer的Topic订阅信息

      3. Store Service:负责消息存储和查询

      4. HA Service:高可用服务,提供Master Broker和Slave Broker之间的数据同步

      5. Index Service:根据特定的Message key对消息进行索引,以提供消息查询

      启动流程

      2.1. 创建:BrokerStartup#createBrokerController方法

        1. NettyServer和NettyClient的配置处理

        2. 命令行参数的处理

        3. Broker角色的处理

        4. 创建BrokerController

        5. 初始化BrokerController通过调用方法

      2.2. BrokerController启动:BrokerStartup#start方法

        1. 启动一大堆服务

        2. 向所有NameService注册自己的信息(IP、名字、id),BrokerController#registerBrokerAll方法,内部是由线程池提交向每一个NameServer的心跳。

        3. 开启一个定时线程池,每隔30秒向nameServer发送心跳,nameServer接收到后会更新心跳时间

      2.3. 路由注册

        单个的注册方法是BrokerOuterAPI#registerBroker方法,内部有同步和异步两种方式。nameServer处理请见1.1.1

      2.4 路由剔除

        1. Broker正常关闭,执行unrgisterBroker指令

        2. nameServer定时扫描,剔除心跳超时的Broker

        nameServer处理请见1.1.2

      2.5 路由发现

        nameServer收到路由信息变更后,不会主动推送,需要客户端定时拉取。nameServer处理请见1.1.3

    3. producer

      3.1 消息生产者org.apache.rocketmq.client.producer.MQProducer接口

        内部定义了发送各种消息的方法。有两个实现,一个DefaultMQProducer不支持发送事务消息,另一个是TransactionMQProducer(继承DefaultMQProducer)专门处理事务发送

      3.2 RocketMQ消息

        生产端发送的消息:

    public class Message implements Serializable {
        private static final long serialVersionUID = 8445773977080406428L;
    
        private String topic;
        private int flag;
        //扩展属性
        private Map<String, String> properties;
        private byte[] body;
        private String transactionId;
    
        public Message() {
        }
    
        public Message(String topic, byte[] body) {
            this(topic, "", "", 0, body, true);
        }
    
        /**
         * @param topic          主题
         * @param tags           tag 用于消息过滤
         * @param keys           消息索引
         * @param flag           消息flag
         * @param body           消息体
         * @param waitStoreMsgOK 消息发送时是否等消息存储完毕
         */
        public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
            this.topic = topic;
            this.flag = flag;
            this.body = body;
    
            if (tags != null && tags.length() > 0)
                this.setTags(tags);
    
            if (keys != null && keys.length() > 0)
                this.setKeys(keys);
    
            this.setWaitStoreMsgOK(waitStoreMsgOK);
        }
    
        public void setDelayTimeLevel(int level) {
            this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
        }
        //...
    }

        隐藏属性keys、tags、waitStoreMsgOk、还有一个是DELAY设置在properties中      

        Broker返给消费者的消息做一层封装的消息:

    public class MessageExt extends Message {
        private static final long serialVersionUID = 5720810158625748049L;
        //消息发送到的queue编号
        private int queueId;
        //消息在Broker的存盘大小
        private int storeSize;
        //消息在queue中的偏移量
        private long queueOffset;
        //系统标志的开关
        private int sysFlag;
        //发送者发送消息时设置的创建时间
        private long bornTimestamp;
        private SocketAddress bornHost;
        private long storeTimestamp;
        private SocketAddress storeHost;
        //注意!是broker设置的,MessageDecoder#createMessageId。消费者拿不到生产者设置的msgId
        private String msgId;
        //消息在commitLog中的偏移量
        private long commitLogOffset;
        private int bodyCRC;
        //消息重试次数
        private int reconsumeTimes;
        private long preparedTransactionOffset;
        //...
    }

      3.3 发送者启动

        DefaultMQProducer#start方法,封装了DefaultMQProducerImpl#start    

        1. DefaultMQProducerImpl初始时状态是CREATE_JUST,进行一些初始化的配置

        2. 根据当前客户端ID先尝试获取MQClientInstance实例,否则创建一个

        3. 将当前发送者注册到MQClientInstance实例中(维护一个ConcurrentMap<String/* group */, MQProducerInner> producerTable)

        4. 启动MQClientInstance

        5. 将当前生产者状态设置为RUNNING

        6. 将心跳发送给broker(MQClientInstance#sendHeartbeatToAllBrokerWithLock方法)

      3.4 发送者发送

        DefaultMQProducer#send方法,封装了DefaultMQProducerImpl#send,这个方法默认是同步发送的。核心方法是DefaultMQProducerImpl#sendDefaultImpl

        1. 确认发送者状态是RUNNING(已经成功启动3.3/5)。

        2. 根据topic查找路由信息(TopicPublishInfo),先从本地缓存获取,没有则请求nameServer,并缓存到本地(DefaultMQProducerImpl#tryToFindTopicPublishInfo方法)

        3. 同步发送模式支持重试,默认重试两次,超时则不再重试

        4. 从路由中选择一个queue,DefaultMQProducerImpl#selectOneMessageQueue方法——>MQFaultStrategy#selectOneMessageQueue方法。

          

           这个方法有两种机制,开启容错和不开启容错。默认不开启

          不开启容错:直接忽略上次的broker

          

           开启容错

          

           开启容错的情况下,先按照正常轮询选择一个broker,如果broker判断可用则返回。否则,从之前有问题的broker中选择一个好一点的broker。这里解释下,一旦一个broker延迟时间过长,那么在一定时间内,这个broker的优先级很低。

          

          这里的排序操作:

          

          4.1 循环发送机制:TopicPublishInfo内部维护了一个ThreadLocal变量sendWhichQueue,每次选择一个queue发送消息则递增1。它对queue集合大小取模得到本次发送选用的queue。

          4.2 避错机制:如果4.1选取的queue被标记过有失败记录或响应时间太长,并且还没有到可以重试的设定时间(这个时间是根据响应时长配置的),那么会选择其他queue。

          

        5. 核心方法sendKernelImpl

          5.1 根据queue从本地缓存拿到broker地址,本地缓存没有则同3.4/2。优先使用broker的VIP地址

          5.2 非批消息设置消息ID,消息压缩(消息体大于4K),事务消息设置sysFlag

          5.3 按照发送模式(同步,异步和单向)发送消息     

    4. 消息存储   

      4.1 CommitLog#putMessage方法。

      1. 写入文件映射对象MappedFile。因为写的同一个文件,写文件时会上锁。新消息追加到文件中,当写到文件尾时,会自动创建下一个文件继续写入,写完释放锁。

      2. 刷盘CommitLog#handleDiskFlush。分为同步刷盘和两种异步刷盘

      3. 主从同步

      

      4.2 checkpoint文件

      checkpoint记录commitLog、ConsumeQueue、Index文件的刷盘时机,文件固定大小4K,只有前24个字节。分别记录了commitLog刷盘时间点(8字节)+消费队列文件刷盘时间点(8字节)+索引文件刷盘时间点(8字节)

      RocketMQ的文件都是顺序写的,旧的文件不会被更新。在启动时,会将全部文件加载到内存,所以必须保证文件的及时清理,清除标准一个是过期文件(没有再被操作)只保存3天,另一个是磁盘空间不足

      DefaultMessageStore类负责管理,该类启动时,开启定时任务,每10秒检查一次,清理commitLog和consumeQueue文件

    5. Consumer

      消费方式:PullConsumer和PushConsumer

      PullConsumer:用户主动调用pull方法获取消息

      PushConsumer:消费者循环发送pull请求到broker,没有消息,broker会将请求放入等待队列,消息到达后返回给消费者。

      消费模式:集群和广播

      集群:每个消费者组中只有一个消费成功,支持重试

      广播:所有消费者都收到,不支持重试

      5.1 消费者启动

        DefaultMQPushConsumer#start()——>DefaultMQPushConsumerImpl#start

        1. 初始状态CREATE_JUST,进行初始化配置

        2. 根据当前客户端ID先尝试获取MQClientInstance实例,否则创建一个。这个实例被同一个客户端的消费者和生产者共用

        3. 获取消费进度对象offsetStore,广播模式创建一个LocalFileOffsetStore对象,集群模式创建一个RemoteBrokerOffsetStore对象

        4. 广播模式下加载消费进度offsetStore#load,存储到LocalFileOffsetStore.offsetTable属性

        5. 根据顺序消费还是并发消费初始化消费逻辑,有并发和顺序两个实现

        6. 启动消费逻辑consumeMessageService#start方法

        7. 注册消费者(将自己注册到MQClientInstance维护的消费者Map中)

        8. 启动MQClientInstance,会开启PullMessageService(负责拉取消息)和rebalanceService(负责rebalance)两个线程

        9. 从nameServer更新topic路由信息

        10. 检查broker状态

        11. 向每个broker发送心跳

        12. 触发rebalance

      5.2 拉取消息

        DefaultMQPushConsumer#pullMessage

        1. 将消息拉到本地ProcessQueue中缓存起来等待消息。

        2. 当ProcessQueue存储的未消费的消息数大于阈值(默认1000个),延迟拉取

        3.  当ProcessQueue存储的未消费的消息体大于阈值(默认100M),延迟拉取

        4. 如果不是顺序消费,判断ProcessQueue存储的未消费消息中,最大消息偏移和最小消息偏移差值是否超过阈值(默认2000),延迟拉取

          2,3,4是对消费端的流量控制,防止积压的消息过多。

      5.3 消费消息

        ConsumeMessageService#consumeMessageDirectly方法,有并发ConsumeMessageConcurrentlyService和顺序ConsumeMessageOrderlyService两种实现

        并发模式下,维护了一个线程池,将任务交给线程池执行。

     

    6. 附录

      6.1 OffsetStore对象

        6.1.1 什么是OffsetStore?

        OffsetStore对象维护消费者消费队列的进度(属性offsetTable),每个消费者启动时创建这个对象。有两个实现分成广播模式和集群模式,广播模式下存储在本地,集群模式存储在服务器,但是每个消费者本地也会维护一份。

        

        

        6.1.2 何时载入消费进度?

        广播模式下,在启动时加载,见5.1/4

        集群模式下,在rebalance时更新。调用链很长,具体是在按照topic重新分配完queue,这个时候需要根据重分配结果更新ProcessQueueTable(消费者的消费队列的消费快照,ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable。key是消费队列,ProcessQueue包含了从这个队列拉取并缓存到本地的消息实体)。然后更新一下拉取的请求体集合(这个东西自动维护在PullMessageService里,LinkedBlockingQueue<PullRequest>,每次根据这个配置拉取消息),这个里会请求broker获取最新的消费进度,并存储到本地。

        RebalancePushImpl#updateProcessQueueTableInRebalance——>RebalancePushImpl#computePullFromWhere——>OffsetStore#readOffset

        

    人生就像蒲公英,看似自由,其实身不由己。
  • 相关阅读:
    LCS(最长公共子序列)
    如何利用MAXScript代码进行DNA双螺旋结构的创建
    如何在3ds MAX中进行宏脚本MacroScript的编写
    3dsmax:[5]maxscript是干什么的
    Visual MAXScript 工具
    3D MAXScript(1)
    如何写3DMAX的插件
    利用GitHub for Window 来进行项目的上传
    VS中的库
    软件测试作业
  • 原文地址:https://www.cnblogs.com/walker993/p/14574847.html
Copyright © 2011-2022 走看看