zoukankan      html  css  js  c++  java
  • RocketMq延迟消息实现原理

     

    这边博文介绍Rocketmq的延迟消息的实现管理。文章直接将不会介绍RocketMq的组件,后续将会补上。

    首先上图:

    定义用户topic为study_rocketmq_topic。流程如下:

    1.消息消费者将message投递到broker的commitLog服务

    2.commitLog服务判断message为延迟消息,将实际的topic和queueId保存到message的属性中(为了后面的流程用于消息的重新投递)。并将topic设置成延迟topic(SCHEDULE_TOPIC_XXXX),queueId对应的延迟级别。消息投递时间保存在tagCode中。

    3.消息延迟服务(ScheduleMessageService)从SCHEDULE_TOPIC_XXXX主题循环拉取消息。

    4.将达到发送要求的消息重新推向commitLog服务

    5.commitLog服务,将消息推到study_rocketmq_topic中

    6.消息消费者重study_rocketmq_topic拉取消息

    重要的类:

    1.org.apache.rocketmq.store.DefaultMessageStore.putMessage(MessageExtBrokerInner):消息进入mq文件系统的入口

    2.org.apache.rocketmq.store.CommitLog.putMessage(MessageExtBrokerInner):消息保存到Commit文件的入口(保存消息之后,包含刷满策略,ha处理)

    3.org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService:消息分发服务

    4.org.apache.rocketmq.store.schedule.ScheduleMessageService:延迟消息服务

    5.org.apache.rocketmq.store.DefaultMessageStore.doDispatch(DispatchRequest):消息服务入口(分发给consumerqueue、index)

    图片地址见:https://github.com/lrlxz1127/rocketmq/blob/%E6%BA%90%E7%A0%81%E9%98%85%E8%AF%BB%E7%89%88/learn/RocketMq%E5%BB%B6%E8%BF%9F%E6%B6%88%E6%81%AF%E5%AE%9E%E7%8E%B0.svg

  • 相关阅读:
    Binary Search Tree Iterator 解答
    Invert Binary Tree 解答
    Min Stack 解答
    Trapping Raining Water 解答
    Candy 解答
    Jump Game II 解答
    Implement Hash Map Using Primitive Types
    Gas Station 解答
    Bucket Sort
    HashMap 专题
  • 原文地址:https://www.cnblogs.com/shoshana-kong/p/14761118.html
Copyright © 2011-2022 走看看