zoukankan      html  css  js  c++  java
  • RocketMQ 的事务消息

    rocketMQ解决分布式事务的思路:1、a事务成功和mq收到消息保持一致。2、保证这条消息一定会被消费,从而完成b事务。时效性可能差了点,但是能达到最终的一致,优点是不会阻塞。

    其中第二步保证消息一定会被消费可以看之前的博文,消费端用集群模式可以做到这一点。

    下面来看怎样使得 事务a成功 和 mq收到消息 保持一致:

    sendMessageInTransaction中,往header中放PROPERTY_TRANSACTION_PREPARED属性,值为true,请求来到了broker这边的SendMessageProcessor.sendMessage方法中,判断刚才的值为true的话,调用getTransactionalMessageService().prepareMessage,把message原来的topic和queueId隐藏起来,替换为topic: RMQ_SYS_TRANS_HALF_TOPIC queueId : 0,然后存入commitLog中,这个topic是没人订阅的所以不会被消费,发送成功后,执行a事务(这里执行a事务是在transactionListener.executeLocalTransaction方法中,transactionListener还有一个方法是checkLocalTransaction,这个方法是查看事务是否完成,逻辑都是需要用户自己写,两者之间要一致)。

    正常情况:如果事务a成功,会发送endTransaction的消息,broker收到之后会把RMQ_SYS_TRANS_HALF_TOPIC 中的消息取出来放到事务消息真实对应的topic中,被b消费。

    异常情况:

    1、half请求broker没收到,broker将不会存储half消息,producer也不会执行下面的事务。

    2、half请求broker收到,但是producer没有成功收到broker的响应,那么producer暂时不会执行自己的事务,broker会有回查机制,发现producer没有执行事务,就不会把half消息放到真实topic中。

    3、成功发送half消息,但是提交事务或者回滚事务的消息broker没有收到,还是会被回查。

    回查的代码逻辑:broker的处理方式是间隔一定时间扫描half消息,然后发出请求向producer回查(间隔1分钟,最多15次),逻辑是BrokerController.start--->startProcessorByHa--->transactionalMessageCheckService.start()--->waitForRunning--->onWaitEnd--->getTransactionalMessageService().check--->listener.resolveHalfMsg--->sendCheckMessage给consumer,consumer收到CHECK_TRANSACTION_STATE消息,checkTransactionState--->producer.checkTransactionState--->transactionListener.checkLocalTransaction(如上文所说,检查本地a事务的情况)--->processTransactionState--->endTransactionOneway(再次提交事务) 

  • 相关阅读:
    #1071 : 小玩具
    #1063 : 缩地
    #1124 : 好矩阵
    hiho#1145 : 幻想乡的日常
    hiho#14
    hiho 毁灭者问题
    西南民大oj(递推)
    西南民大oj(矩阵快速幂)
    西南民大oj(两园交求面积)
    hdu2844(多重背包)
  • 原文地址:https://www.cnblogs.com/chuliang/p/13141913.html
Copyright © 2011-2022 走看看