zoukankan      html  css  js  c++  java
  • RocketMQ之四:RocketMq事务消息

    事务消息

    通过消息的异步事务,可以保证本地事务和消息发送同时执行成功或失败,从而保证了数据的最终一致性。

    发送端执行如下几步:

    1. 发送prepare消息,该消息对Consumer不可见
    2. 执行本地事务(如 update DB)
    3. 若本地事务执行成功,则向MQ提交消息确认发送指令;若本地事务执行失败,则向MQ发送取消指令(取消prepared消息)
    • 若MQ长时间未收到确认发送或取消发送的指令,则向业务系统询问本地事务状态,并做补偿处理。(

      RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。细心的你可能又发现问题了,如果确认消息发送失败了怎么办?RocketMQ会定期扫描消息集群中的事物消息,这时候发现了Prepared消息,它会向消息发送者确认,Bob的钱到底是减了还是没减呢?如果减了是回滚还是继续发送确认消息呢?RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

      如果endTransaction方法执行失败,导致数据没有发送到brokerbroker会有回查线程定时(默认1分钟)扫描每个存储事务状态的表格文件,如果是已经提交或者回滚的消息直接跳过,如果是prepared状态则会向Producer发起CheckTransaction请求,Producer会调用DefaultMQProducerImpl.checkTransactionState()方法来处理broker的定时回调请求,而checkTransactionState会调用我们的事务设置的决断方法,最后调用endTransactionOnewaybroker来更新消息的最终状态。

    RocketMq事务消息

    客户端事务消息发送


    发送prepare消息复用了普通消息发送,只是给消息增加了TRAN_MSG=true的属性,该属性决定prepare消息对Consumer不可见

    消息写入CommitLog


    事务消息的CommitLog写入和普通消息一致,它利用文件的顺序写来提升吞吐量,并采用文件映射的方式降低系统开销。

    消息写入ConsumeQueue


    ConsumeQueue的写入是采用异步方式完成的,ReputMessageSerivce作为一个长驻线程负责查询索引的构造和ConsumeQueue的写入,对于Prepare/Rollback消息不会写ConsumeQueue,从而保证它们对Consumer不可见

    Broker端事务提交/回滚


    Broker收到提交/回滚指令后,首先从根据offset从CommitLog读出原有的prepare消息,构造新的消息(修改事务状态标识)并写入Broker。对于一条事务消息,RocketMq会存储两条消息,存在一定资源浪费。其实它是为了保证随后的消费者能尽可能从PageCache中读到该消息,而不是读取较早的prepare消息(可能导致缺页中断),以提升系统吞吐量。

    此外,rocketmq的最新版本(4.2.0)尚未支持本地事务的状态回查,这样可能存在由于网络抖动,导致commit/rollback未提交到broker导致prepare消息长期悬挂的风险。

    在RocketMq的设计文档中,为事务消息增加了一张事务状态表,它维护了消息的Offset、事务状态(P/C/R)信息。可以采用如下思路实现事务消息的回查机制:


    • 在prepare消息写入commitLog后,可以通过CommitLogDispatcher写入一条事务状态记录(state=P)
    • 在提交/回滚事务时,根据transactionId找到对应的事务状态记录,并修改对应的事务状态
    • 通过长驻线程扫描事务状态表,对于超过一定时间的Prepare事务,发起对业务方的事务状态回查,根据回查结果修改事务状态,并向brokder发送相应的Commit/Rollback消息。


    作者:Andy的书斋
    链接:https://www.jianshu.com/p/c26b3af5880f
    来源:简书
    简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
  • 相关阅读:
    精彩回顾 | Serverless Developer Meetup 12.04 深圳站
    Dubbo3 Triple 协议简介与选型思考
    阿里云 FaaS 架构设计与创新实践
    KubeDL 0.4.0 Kubernetes AI 模型版本管理与追踪
    shell脚本awk的用法
    case用法ping命令脚本(工作中常用的)
    linux时间与internet时间同步
    bootStrap表单验证插件的使用
    状态模式之观察者模式
    20211125
  • 原文地址:https://www.cnblogs.com/duanxz/p/5063377.html
Copyright © 2011-2022 走看看