zoukankan      html  css  js  c++  java
  • Rocket消息存储原理

    1. 何时存储消息

    分布式队列因为有高可靠性的要求,所以数据要进行持久化存储。

    1. MQ收到一条消息后,需要向生产者返回一个ACK响应,并将消息存储起来。
    2. MQ Push一条消息给消费者后,等待消费者的ACK响应,需要将消息标记为已消费。如果没有标记为消费,MQ会不断的尝试往消费者推送这条消息。
    3. MQ需要定期删除一些过期的消息,这样才能保证服务一直可用

    2.消息存储介质

    RocketMQ采用的是类似于Kafka的文件存储机制,即直接用磁盘文件来保存消息,而不需要借助MySQL这一类索引工具。

    2.1 磁盘保存文件慢吗?

    磁盘如果使用得当,磁盘的速度完全可以匹配上网络 的数据传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s, 超过了一般网卡的传输速度。但是磁盘随机写的速度只有大概100KB/s,和顺序写的性能相差6000倍!因为有如此巨大的速度差别,好的消息队列系统会比普通的消息队列系统速度快多个数量级。RocketMQ的消息用顺序写,保证了消息存储的速度。

    2.2 零拷贝技术加速文件读写

    Linux操作系统分为【用户态】和【内核态】,文件操作、网络操作需要涉及这两种形态的切换,免不了进行数据复制

    一台服务器 把本机磁盘文件的内容发送到客户端,一般分为两个步骤:

    • read;读取本地文件内容
    • write;将读取的内容通过网络发送出去

    这两个看似简单的操作,实际进行了4 次数据复制,分别是:

    • 从磁盘复制数据到内核态内存;
    • 从内核态内存复 制到用户态内存;
    • 然后从用户态 内存复制到网络驱动的内核态内存;
    • 最后是从网络驱动的内核态内存复 制到网卡中进行传输。

    而通过使用mmap的方式,可以省去向用户态的内存复制,提高速度。这种机制在Java中是通过NIO包中的MappedByteBuffffer实现的。RocketMQ充分利用了上述特性,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速度。

    3. 消息存储结构

    RocketMQ消息的存储分为三个部分:

    • CommitLog:存储消息的元数据。所有消息都会顺序存入到CommitLog文件当中。CommitLog由多个文件组成,每个文件固定大小1G。以第一条消息的偏移量为文件名。
    • ConsumerQueue:存储消息在CommitLog的索引。一个MessageQueue一个文件,记录当前MessageQueue被哪些消费者组消费到了哪一条CommitLog。
    • IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程

    4. 刷盘机制

    RocketMQ需要将消息存储到磁盘上,这样才能保证断电后消息不会丢失。同时这样才可以让存储的消息量可以超出内存的限制。RocketMQ为了提高性能,会尽量保证磁盘的顺序写。消息在写入磁盘时,有两种写磁盘的方式,同步刷盘和异步刷盘

    • 同步刷盘:在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘, 然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写 成功的状态
    • 异步刷盘:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
    • 配置方式:刷盘方式是通过Broker配置文件里的flflushDiskType 参数设置的,这个参数被配置成SYNC_FLUSH、ASYNC_FLUSH中的 一个。

    5. 消息主从复制

    如果Broker以一个集群的方式部署,会有一个master节点和多个slave节点,消息需要从Master复制到Slave上。而消息复制的方式分为同步复制和异步复制。

    • 同步复制:同步复制是等Master和Slave都写入消息成功后才反馈给客户端写入成功的状态。
    • 异步复制:异步复制是只要master写入消息成功,就反馈给客户端写入成功的状态。然后再异步的将消息复制给Slave节点。
    • 配置方式:消息复制方式是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成ASYNC_MASTER、 SYNC_MASTER、SLAVE三个值中的一个

    6. 负载均衡

    6.1Producer负载均衡

    Producer发送消息时,默认会轮询目标Topic下的所有MessageQueue,并采用递增取模的方式往不同的MessageQueue上发送消息,以达到让消息平均落在不同的queue上的目的。而由于MessageQueue是分布在不同的Broker上的,所以消息也会发送到不同的broker上。

    同时生产者在发送消息时,可以指定一个MessageQueueSelector。通过这个对象来将消息发送到自己指定的MessageQueue上。这样可以保证消息局部有序。

    6.2 Consumer负载均衡

    Consumer也是以MessageQueue为单位来进行负载均衡。分为集群模式和广播模式。

    集群模式:在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条messagequeue。

    广播模式:广播模式下,每一条消息都会投递给订阅了Topic的所有消费者实例,所以也就没有消息分配这一说。而在实现上,就是在Consumer分配Queue时,所有Consumer都分到所有的Queue。

    7.消息重试

    对于广播模式的消息, 是不存在消息重试的机制的,即消息消费失败后,不会再重新进行发送,而只是继续消费新的消息。

    而对于普通的消息,当消费者消费消息失败后,你可以通过设置返回状态达到消息重试的结果

    7.1 如何让消息进行重试

    集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置。可以有三种配置方式:

    • 返回Action.ReconsumeLater-推荐
    • 返回null
    • 抛出异常

    7.2 重试消息如何处理

    重试的消息会进入一个 “%RETRY%”+ConsumeGroup 的队列中。

    然后RocketMQ默认允许每条消息最多重试16次,每次重试的间隔时间如下:

    如果消息重试16次后仍然失败,消息将不再投递。转为进入死信队列

    8. 死信队列

    当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列。死信队列的名称是%DLQ%+ConsumGroup

    死信队列的特征

    • 一个死信队列对应一个ConsumGroup,而不是对应某个消费者实例
    • 如果一个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。
    • 一个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic。
    • 死信队列中的消息不会再被消费者正常消费。
    • 死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fifileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。

    9. 消息幂等

    9.1 幂等的概念

    在MQ系统中,对于消息幂等有三种实现语义:

    • at most once 最多一次:每条消息最多只会被消费一次
    • at least once 至少一次:每条消息至少会被消费一次
    • exactly once 刚刚好一次:每条消息都只会确定的消费一次

    9.2 消息幂等的必要性

    在互联网应用中,尤其在网络不稳定的情况下,消息队列 RocketMQ 的消息有可能会出现重复,这个重复简单可以概括为以下情况:

    • 发送时消息重复
    • 投递时消息重复
    • 负载均衡时消息重复

    9.3 处理方式

    要在业务上自行来保证消息消费的幂等性。

  • 相关阅读:
    kafka_consumer3->spark_master
    为什么.NET感觉上比Java差一点
    学习Scala: 初学者应该了解的知识
    函数式编程 : 一个程序猿进化的故事
    Scala underscore的用途
    不变(Invariant), 协变(Covarinat), 逆变(Contravariant) : 一个程序猿进化的故事
    Scala Collection简介
    C# on Visual Studio Code
    我的Machine Learning学习之路
    Scala on Visual Studio Code
  • 原文地址:https://www.cnblogs.com/steven158/p/15688426.html
Copyright © 2011-2022 走看看