zoukankan      html  css  js  c++  java
  • EQueue文件持久化消息关键点设计思路

    要持久化的关键数据有三种

    1. 消息;
    2. 队列,队列中存放的是消息索引信息,即消息在文件中的物理位置(messageOffset)和在队列中的逻辑位置(queueOffset)的映射信息;
    3. 队列消费进度,表示当前队列中的消息消费到第几个了;

    发送消息的设计

    1. producer将消息的二进制数据发送到broker;
    2. broker做的事情:
      • 单线程持久化消息到内存映射文件;
      • 将当前消息的索引信息放入缓冲区,可以使用disruptor的ringbuffer实现,单线程写,无锁。
      • 单线程从缓冲区读取消息索引信息,并将索引信息写入内存映射文件;
      • 消息的内存映射文件、消息索引的内存映射文件都定时刷新到磁盘,比如每隔1s刷新一次,可配置;
    3. broker将当前消息的索引信息放入缓冲区后,就立即返回了,然后producer就收到了消息发送的结果;

    其他说明:

    1. 因为不可能用一个文件来保存所有的消息,所以肯定是用多个文件的方式。也就是说,无论是保存消息还是保存消息索引,都用多个文件。另外,由于队列有多个,所以每个队列都对应多个内存映射文件。队列文件的目录命名规则:rootPath / topic / queueId / queue mapped files
    2. broker在将消息的索引信息放入缓冲区时,要检查缓冲区是否到达一定的水位,比如ringbuffer总大小100W个槽,假如水位是80%,那就是当现在ringbuffer中可用的槽不到20%时,应该要做流控,比如sleep 100s;理论上应该不会到达水位,因为写消息索引肯定比写消息本身要快;

    消费消息的设计

    1. consumer告诉broker当前需要拉取哪个topic下的哪个队列里的第几个位置(queueOffset)开始的消息,并告诉要最多拉取多少个消息;
    2. broker根据topic和queueId找到对应的队列;
    3. 根据queueOffset从队列拿到消息在文件中的物理位置,即messageOffset;
    4. 根据messageOffset从消息的内存映射文件获取消息二进制数据;
    5. 将消息二进制数据写入临时的内存流里,该内存流里包含了所有要返回的消息;
    6. 消息拉取数量达到要求或没有新的消息可以拉取后,将内存流对应的二进制数据返回给consumer;
    7. consumer解析二进制数据,得到所有的消息对象;

    broker定时清理过期的消息和消息索引

    1. 每隔10s扫描是否有过期的消息文件,过期时间可配置,比如三天;扫描时,发现文件的最后修改时间是3天前,则删除;
    2. 每隔10s扫描是否有过期的消息索引文件,判断是否过期的依据是扫描每个消息索引文件,判断该文件中的最后一个消息索引的messageOffset是否比最小的messageOffset还要小;如果小,就说明这个消息索引文件已经无意义了,可以删除;

    broker启动时的逻辑

    1. 扫描磁盘上所有的消息的存储文件,为每个文件建立内存映射;
    2. 扫描磁盘上所有的队列(消息索引)的存储文件,为每个文件建立内存映射;
    3. 对每个队列,预恢复几个文件(比如最后的3个文件)的数据到内存,剩余的用到时再恢复;
    4. 同理,对于存储消息的文件,也预恢复几个(比如最后的3个文件)到内存;一般大部分消息者只要消费进度不是太慢,总是应该已经赶上了最后那三个文件了;
    5. 关于异常关闭broker时的逻辑,暂时还没想清楚,还需要再细思;
  • 相关阅读:
    dotnet 新项目格式与对应框架预定义的宏
    dotnet 线程静态字段
    dotnet 线程静态字段
    dotnet 通过 WMI 拿到显卡信息
    dotnet 通过 WMI 拿到显卡信息
    dotnet 通过 WMI 获取指定进程的输入命令行
    dotnet 通过 WMI 获取指定进程的输入命令行
    dotnet 通过 WMI 获取系统信息
    dotnet 通过 WMI 获取系统信息
    PHP show_source() 函数
  • 原文地址:https://www.cnblogs.com/netfocus/p/4802748.html
Copyright © 2011-2022 走看看