zoukankan      html  css  js  c++  java
  • 分布式事务解决方案之TCC(三)

    一 什么是 TCC 事务

      TCC 是 Try、Confirm、Cancel 三个词语的缩写,TCC 要求每个分支事务实现三个操作:预处理 Try、确认 Confirm、撤销 Cancel。Try 操作做业务检查及资源预留,Confirm 做业务确认操作,Cancel 实现一个与 Try 相反的操作即回滚操作。TM 首先发起所有的分支事务的 Try 操作,任何一个分支事务的 Try 操作执行失败,TM 将会发起所有分支事务的 Cancel 操作,若 Try 操作全部成功,TM 将会发起所有分支事务的 Confirm 操作,其中 Confirm/Cancel 操作若执行失败,TM 会进行重试

      

    分支事务失败的情况: 

      

    TCC分为三个阶段:

      1)Try  阶段是做业务检查(一致性)及资源预留(隔离),此阶段仅是一个初步操作,它和后续的 Confirm   一起才能真正构成一个完整的业务逻辑;

      2)Confirm 阶段是做确认提交,Try 阶段所有分支事务执行成功后开始执行 Confirm。通常情况下,采用 TCC 则认为 Confirm 阶段是不会出错的。即:只要 Try 成功,Confirm 一定成功。若 Confirm 阶段真的出错了,需引入重试机制或人工处理;

      3)Cancel  阶段是在业务执行错误需要回滚的状态下执行分支事务的业务取消,预留资源释放。通常情况下,采用 TCC 则认为 Cancel 阶段也是一定成功的。若 Cancel 阶段真的出错了,需引入重试机制或人工处理;

      4)TM 事务管理器

      TM 事务管理器可以实现为独立的服务,也可以让全局事务发起方充当 TM 的角色,TM 独立出来是为了成为公用组件,是为了考虑系统结构和软件复用;

      TM 在发起全局事务时生成全局事务记录,全局事务 ID 贯穿整个分布式事务调用链条,用来记录事务上下文,追踪和记录状态,由于 Confirm 和 Cancel 失败需进行重试,因此需要实现幂等,幂等性是指同一个操作无论请求多少次,其结果都相同。

    二 TCC 解决方案

    目前市面上的TCC框架众多比如下面这几种:

    (以下数据采集日为2019年07月11日)

    框架名称

    Gitbub地址

    star数量

    tcc-transaction

    https://github.com/changmingxie/tcc-transaction

    3850

    Hmily

    https://github.com/yu199195/hmily

    2407

    ByteTCC

    https://github.com/liuyangming/ByteTCC

    1947

    EasyTransaction

    https://github.com/QNJR-GROUP/EasyTransaction

    1690

    上一节所讲的 Seata 也支持 TCC,但 Seata 的 TCC 模式对 SpringCloud 并没有提供支持。我们的目标是理解 TCC 的原理以及事务协调运作的过程,因此更请倾向于轻量级易于理解的框架,因此最终确定了 Hmily;

    Hmily 是一个高性能分布式事务 TCC 开源框架。基于Java语言来开发(JDK1.8),支持 Dubbo,Spring Cloud 等 RPC 框架进行分布式事务。它目前支持以下特性:

      支持嵌套事务(Nested transaction support);

      采用 Disruptor 框架进行事务日志的异步读写,与 RPC 框架的性能毫无差别。支持 SpringBoot-starter 项目启动,使用简单;

      RPC 框架支持 : dubbo,motan,springcloud;

      本地事务存储支持:redis,mongodb,zookeeper,file,mysql。事务日志序列化支持:java,hessian,kryo,protostuff;

      采用 Aspect AOP 切面思想与 Spring 无缝集成,天然支持集群;

      RPC 事务恢复,超时异常恢复等。

      Hmily 利用 AOP 对参与分布式事务的本地方法与远程方法进行拦截处理,通过多方拦截,事务参与者能透明的调用到另一方的 Try、Confirm、Cancel 方法;传递事务上下文;事务日志,进行补偿,重试等。

    Hmily 不需要事务协调服务,但需要提供一个数据库 (mysql/mongodb/zookeeper/redis/file) 来进行日志存储。

    Hmily 实现的 TCC 服务与普通的服务一样,只需要暴露一个接口,也就是它的 Try 业务。Confirm/Cancel 业务逻辑,只是因为全局事务提交/回滚的需要才提供的,因此 Confirm/Cancel 业务只需要被 Hmily TCC 事务框架发现即可,不需要被调用它的其他业务服务所感知。

    官网介绍:https://dromara.org/website/zh-cn/docs/hmily/index.html

    重点:TCC需要注意三种异常处理分别是空回滚、幂等、悬挂

    空回滚

    在没有调用 TCC 资源 Try 方法的情况下,调用了二阶段的 Cancel 方法,Cancel 方法需要识别出这是一个空回滚,然后直接返回成功;

    出现原因是当一个分支事务所在服务宕机或网络异常,分支事务调用记录为失败,这个时候其实是没有执行 Try 段,当故障恢复后,分布式事务进行回滚则会调用二阶段的 Cancel 方法,从而形成空回滚;

    解决思路:关键就是要识别出这个空回滚。思路很简单就是需要知道一阶段是否执行,如果执行了,那就是正常回滚;如果没执行,那就是空回滚。前面已经说过 TM 在发起全局事务时生成全局事务记录,全局事务 ID 贯穿整个分布式事务调用链条。再额外增加一张分支事务记录表,其中有全局事务 ID 和分支事务 ID,第一阶段 Try 方法里会插入一条记录,表示一阶段执行了。Cancel 接口里读取该记录,如果该记录存在,则正常回滚;如果该记录不存在,则是空回滚。

    幂等

    通过前面介绍已经了解到,为了保证 TCC 二阶段提交重试机制不会引发数据不一致,要求 TCC 的二阶段 Try、Confirm Cancel 接口保证幂等,这样不会重复使用或者释放资源。如果幂等控制没有做好,很有可能导致数据不一致等严重问题;

    解决思路在上述“分支事务记录”中增加执行状态,每次执行前都查询该状态。

    悬挂

    悬挂就是对于一个分布式事务,其二阶段 Cancel 接口比 Try 接口先执行;

    出现原因是在 RPC 调用分支事务 Try 时,先注册分支事务,再执行 RPC 调用,如果此时 RPC 调用的网络发生拥堵, 通常 RPC 调用是有超时时间的,RPC 超时以后,TM 就会通知 RM 回滚该分布式事务,可能回滚完成后,RPC 请求才到达参与者真正执行,而一个  Try  方法预留的业务资源,只有该分布式事务才能使用,该分布式事务第一阶段预留的业务资源就再也没有人能够处理了,对于这种情况,我们就称为悬挂,即业务资源预留后没法继续处理。

    解决思路:如果二阶段执行完成,那一阶段就不能再继续执行。在执行一阶段事务时判断在该全局事务下,“分支事务记录”表中是否已经有二阶段事务记录,如果有则不执行 Try。

     三 举例说明

    举例,场景为 A 转账 30 元给 BAB账户在不同的服务。方案1

    账户A

    try:
      检查余额是否够30元
      扣减30元 confirm:
      空 cancel:   增加30元

    账户B

    try:
      增加30元
    
    confirm: 
      空 cancel:   减少30元

    方案 说明:

    1) 账户A,这里的余额就是所谓的业务资源,按照前面提到的原则,在第一阶段需要检查并预留业务资源,因此,我们在扣钱 TCC 资源的 Try 接口里先检查账户余额是否足够,如果足够则扣除 30 元。 Confirm 接口表示正式提交,由于业务资源已经在 Try 接口里扣除掉了,那么在第二阶段的 Confirm 接口里可以什么都不用做。Cancel 接口的执行表示整个事务回滚,账户A回滚则需要把 Try 接口里扣除掉的 30 元还给账户。

    2) 账号B,在第一阶段 Try 接口里实现给账户 B 加钱,Cancel 接口的执行表示整个事务回滚,账户B回滚则需要把 Try 接口里加的 30 元再减去。

    方案的问题分析:

    1) 如果账户A的 Try 没有执行在 Cancel 则就多加了30元;

    2) 由于 Try,Cancel、Confirm 都是由单独的线程去调用,且会出现重复调用,所以都需要实现幂等;

    3) 账号 B 在 Try 中增加 30 元,当 Try 执行完成后可能会被其它线程给消费了;

    4) 如果账户 B 的 Try 没有执行在 Cancel 则就多减了 30 元。

    问题解决:

    1)账户A的 Cancel 方法需要判断 Try 方法是否执行,正常执行 Try 后方可执行 Cancel;

    2)Try,Cancel、Confirm 方法实现幂等。

    3) 账号B在 Try 方法中不允许更新账户金额,在 Confirm 中更新账户金额;

    4) 账户B的 Cancel 方法需要判断 Try 方法是否执行,正常执行 Try 后方可执行 Cancel。

     优化方案:

    账户A

    try:
        try幂等校验    
        try悬挂处理
        检查余额是否够30元
        扣减30元
    
    confirm: 
        空
    
    cancel:
        cancel幂等校验
        cancel空回滚处理
        增加可用余额30元

    账户B

    try:
        空
    
    confirm:
        confirm 幂等校验
        正式增加30元
    
    cancel:
        空

    四 Hmily实现TCC事务

    1)业务说明

    本实例通过 Hmily 实现 TCC 分布式事务,模拟两个账户的转账交易过程。

    两个账户分别在不同的银行(张三在bank1、李四在bank2),bank1、bank2 是两个微服务。交易过程是,张三给李四转账指定金额。

    上述交易步骤,要么一起成功,要么一起失败,必须是一个整体性的事务。

         

    2)dtx-tcc-demo-bank1

    dtx-tcc-demo-bank1 实现 Try 和 Cancel 方法,如下:

    try:
        try幂等校验
        try悬挂处理
        检查余额是够扣减金额
        扣减金额
    
    confirm:
        空
    
    cancel:
        cancel幂等校验
        cancel空回滚处理
        增加可用余额

    a)Dao

    @Mapper
    @Component
    public interface AccountInfoDao {
       @Update(
    "update account_info set account_balance=account_balance - #{amount} where account_balance >= #{amount} and account_no = #{accountNo} ") int subtractAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount); @Update("update account_info set account_balance = account_balance + #{amount} where account_no = #{accountNo} ") int addAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount); /** * 增加某分支事务try执行记录 * @param localTradeNo 全局事务编号 */ @Insert("insert into local_try_log values(#{txNo}, now());") int addTry(String localTradeNo); @Insert("insert into local_confirm_log values(#{txNo}, now());") int addConfirm(String localTradeNo); @Insert("insert into local_cancel_log values(#{txNo}, now());") int addCancel(String localTradeNo); /** * 查询分支事务try是否已执行 * @param localTradeNo 全局事务编号 */ @Select("select count(1) from local_try_log where tx_no = #{txNo} ") int isExistTry(String localTradeNo); /** * 查询分支事务confirm是否已执行 * @param localTradeNo 全局事务编号 */ @Select("select count(1) from local_confirm_log where tx_no = #{txNo} ") int isExistConfirm(String localTradeNo); /** * 查询分支事务cancel是否已执行 * @param localTradeNo 全局事务编号 */ @Select("select count(1) from local_cancel_log where tx_no = #{txNo} ") int isExistCancel(String localTradeNo); }

    b)Service 张三转账 try - confirm - cancel 方法编码

    @Service
    @Slf4j
    public class AccountInfoServiceImpl implements AccountInfoService {
    
        @Autowired
        private AccountInfoDao accountInfoDao;
    
        @Autowired
        private Bank2Client bank2Client;
    
        /**
         * 账号扣款逻辑   就是 TCC 的 try 方法
         * TCC 强调的是最终一致性【第一个分支事务执行完毕提交,释放锁资源;第二个分支事务执行成功后提交,执行 confirm ,执行失败回滚执行 cancel 】
         * @param accountNo 账户编号
         * @param amount    扣款金额
         *
         * 1. try幂等校验【避免重复执行 try 预处理操作,出现数据的不一致情况】
         * 2. try悬挂处理【避免 cancel 操作在 try 操作之前进行】
         * 3. 检查余额是否够30元
         * 4. 扣减金额
         */
        @Hmily(confirmMethod = "commit", cancelMethod = "rollback")    // 只要标记 @Hmily 注解就是 try 方法,在注解中指定 confirm cancel 两个方法名字
        @Transactional
        @Override
        public void updateAccountBalance(String accountNo, Double amount) {
            // 获取全局事务ID
            String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
            log.info("bank1 try begin 开始执行,XID = {}", transId);
    
            // 1. try 的幂等校验【判断 local_try_log 表中是否有 try 日志记录,如果有就不再执行】
            if (accountInfoDao.isExistTry(transId) > 0) {
                log.info("bank1 try 已经执行,无需重复执行, XID = {}", transId);
                return;
            }
            // 2. try 的悬挂处理【如果 cancel confirm 有一个已经执行了, try 不再执行】
            if (accountInfoDao.isExistConfirm(transId) > 0 || accountInfoDao.isExistCancel(transId) > 0) {
                log.info("bank1 try 悬挂处理,cancel 或 confirm 已经执行,不允许执行 try,XID = {}", transId);
                return;
            }
            // 3. 扣减金额,需要判断账户金额
            if (accountInfoDao.subtractAccountBalance(accountNo, amount) <= 0) {
                // 扣减失败
                throw new RuntimeException("bank1 try 扣减金额失败,XID = " + transId);
            }
            // 4. 插入一条 try 预处理的执行记录,实现 try 预处理的幂等性
            accountInfoDao.addTry(transId);
    
            // 远程调用李四,转账
            if (!bank2Client.transfer(amount)) {
                throw new RuntimeException("bank1 远程调用李四微服务失败,XID = " + transId);
            }
            // 人为制造异常
            if (amount == 2) {
                throw new RuntimeException("bank1 人为制造异常,XID = " + transId);
            }
        }
    
        /**
         * confirm 方法
         * 注意:方法参数需要和 try 方法参数保持一致
         */
        public void commit(String accountNo, Double amount) {
            String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
            log.info("bank1 commit begin 开始执行,XID = {}", transId);
        }
    
        /**
         * cancel 方法
         * 注意:方法参数需要和 try 方法参数保持一致
         * 1. cancel 幂等校验【避免重复执行 cancel 回滚操作,出现数据的不一致情况】
         * 2. cancel 空回滚处理【try 预处理执行完成,cancel 需要先判断 try 是否执行完成,然后再进行回滚处理,否则会出现数据不一致情况】
         * 3. 增加可用余额
         */
        @Transactional
        public void rollback(String accountNo, Double amount) {
            // 获取全局事务ID
            String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
            log.info("bank1 rollback begin 开始执行,XID = {}", transId);
    
            // 1. cancel 幂等校验【避免重复执行 cancel 回滚操作,出现数据的不一致情况】
            if (accountInfoDao.isExistCancel(transId) > 0 ) {
                log.info("bank1 cancel 已经执行,无需重复执行, XID = {}", transId);
            }
            // 2. cancel 空回滚处理【如果 try 没有执行,cancel 不能执行】
            if (accountInfoDao.isExistTry(transId) <= 0) {
                log.info("bank1 空回滚处理, try 没有执行,cancel 不允许执行,XID = {}", transId);
                return;
            }
            // 3. 增加可用余额
            accountInfoDao.addAccountBalance(accountNo, amount);
            // 4. 插入一条 cancel 的执行记录
            accountInfoDao.addCancel(transId);
        }
    }

    c)openFeign 远程调用李四微服务

    @FeignClient(value = "tcc-demo-bank2", fallback = Bank2ClientFallback.class)      // 指定调用微服务的服务名,以及服务熔断降级失败调用类
    public interface Bank2Client {
    
        /**
         * 远程调用李四的微服务
         */
        @GetMapping("/bank2/transfer")
        @Hmily      // 需要将张三转账的全局事务信息带到下游李四的微服务中
        Boolean transfer(@RequestParam("amount") Double amount);
    }
    
    @Component
    public class Bank2ClientFallback implements Bank2Client {
    
        // 降级方法
        @Override
        public Boolean transfer(Double amount) {
            return false;
        }
    }

    d)Controller

    @RestController
    @Slf4j
    public class Bank1Controller {
    
        @Autowired
        private AccountInfoService accountInfoService;
    
        // 张三转账
        @GetMapping("/transfer")
        public Boolean transfer(@RequestParam("amount") Double amount) {
            accountInfoService.updateAccountBalance("1", amount);
            return true;
        }
    
    }

    3)dtx-tcc-demo-bank2

    dtx-tcc-demo-bank2 实现如下功能:

    try:
        空
    
    confirm:
        confirm 幂等校验
        正式增加金额
    
    cancel:
        空

    a)Dao 

    @Component
    @Mapper
    public interface AccountInfoDao {
    
        @Update("update account_info set account_balance=account_balance + #{amount} where  account_no=#{accountNo} ")
        int addAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
        
        /**
         * 增加某分支事务try执行记录
         * @param localTradeNo 全局事务编号
         * @return
         */
        @Insert("insert into local_try_log values(#{txNo},now());")
        int addTry(String localTradeNo);
    
        @Insert("insert into local_confirm_log values(#{txNo},now());")
        int addConfirm(String localTradeNo);
    
        @Insert("insert into local_cancel_log values(#{txNo},now());")
        int addCancel(String localTradeNo);
    
        /**
         * 查询分支事务try是否已执行
         * @param localTradeNo 全局事务编号
         * @return
         */
        @Select("select count(1) from local_try_log where tx_no = #{txNo} ")
        int isExistTry(String localTradeNo);
        /**
         * 查询分支事务confirm是否已执行
         * @param localTradeNo 全局事务编号
         * @return
         */
        @Select("select count(1) from local_confirm_log where tx_no = #{txNo} ")
        int isExistConfirm(String localTradeNo);
    
        /**
         * 查询分支事务cancel是否已执行
         * @param localTradeNo 全局事务编号
         * @return
         */
        @Select("select count(1) from local_cancel_log where tx_no = #{txNo} ")
        int isExistCancel(String localTradeNo);
    
    }

    b)Service 李四收账 try - confirm - try 方法代码编写

    @Service
    @Slf4j
    public class AccountInfoServiceImpl implements AccountInfoService {
    
        @Autowired
        private AccountInfoDao accountInfoDao;
    
        @Hmily(confirmMethod = "commit", cancelMethod = "rollback")    // 只要标记 @Hmily 注解就是 try 方法,在注解中指定 confirm cancel 两个方法名字
        @Override
        public void updateAccountBalance(String accountNo, Double amount) {
            // 获取全局事务ID
            String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
            log.info("bank2 try begin 开始执行,XID = {}", transId);
        }
    
        /**
         * confirm 方法
         * 注意:方法参数需要和 try 方法参数保持一致
         * 1. confirm 幂等校验【confirm 执行失败时,会重复执行,需要幂等校验操作,避免数据出现不一致】
         * 2. 正式增加30元
         */
        @Transactional
        public void commit(String accountNo, Double amount) {
            // 获取全局事务ID
            String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
            log.info("bank2 commit begin 开始执行,XID = {}", transId);
            // 1. confirm 幂等性校验
            if(accountInfoDao.isExistConfirm(transId) > 0) {
                log.info("bank2 commit 已经执行,无需重复执行,XID = {}", transId);
                return;
            }
            // 2. 增加金额
            accountInfoDao.addAccountBalance(accountNo, amount);
    
            // 3. 插入一条 confirm 确认的执行记录,实现 confirm 确认执行的幂等性
            accountInfoDao.addConfirm(transId);
        }
    
        /**
         * cancel 方法
         * 注意:方法参数需要和 try 方法参数保持一致
         */
        public void rollback(String accountNo, Double amount) {
            // 获取全局事务ID
            String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
            log.info("bank2 rollback begin 开始执行,XID = {}", transId);
        }
    }

    c)controller

    @RestController
    @Slf4j
    public class Bank2Controller {
    
        @Autowired
        private AccountInfoService accountInfoService;
    
        // 接收张三转账
        @GetMapping("/transfer")
        public Boolean transfer(@RequestParam("amount") Double amount) {
            accountInfoService.updateAccountBalance("2", amount);
            return true;
        }
    
    }

    五 总结

      如果拿 TCC 事务的处理流程与 2PC 两阶段提交做比较,2PC 通常都是在跨库的 DB 层面,而 TCC 则在应用层面的处理,需要通过业务逻辑来实现。这种分布式事务的实现方式的优势在于,可以让应用自己定义数据操作的粒度,使降低锁冲突、提高吞吐量成为可能

      而不足之处则在于对应用的侵入性非常强,业务逻辑的每个分支都需要实现 try、confirm、cancel 三个操作。此外,其实现难度也比较大,需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略。

      每天进步一点点......

  • 相关阅读:
    2019.8.15刷题统计
    2019.8.12刷题统计
    2019.8.11刷题统计
    2019.8.10刷题统计
    2019.8.9刷题统计
    2019.8.8刷题统计
    2019.8.7刷题统计
    2019.8.6刷题统计
    xuezhan.org 6.28
    xuezhan.org 6.27
  • 原文地址:https://www.cnblogs.com/blogtech/p/14504560.html
Copyright © 2011-2022 走看看