zoukankan      html  css  js  c++  java
  • 五、connectionProxy分支事务注册

    所有文章

    https://www.cnblogs.com/lay2017/p/12485081.html

    正文

    上一篇关于DataSourceProxy的文章中,我们看到了一个DataSourceProxy在构造的时候将作为Resource注册到ResourceManager并通过RPC注册到Server端。

    而我们知道Connection表示的是与DataSource的通信连接,seata对Connection也进行了代理。DataSourceProxy的getConnection方法中将获取到该代理类。我们看看DataSourceProxy的getConnection方法

    @Override
    public ConnectionProxy getConnection() throws SQLException {
        Connection targetConnection = targetDataSource.getConnection();
        // 实例化一个代理类
        return new ConnectionProxy(this, targetConnection);
    }

    可以看到,从DataSourceProxy中get出来的Connection并不是原始对象,而是经过ConnectionProxy代理包装以后的对象。

    ConnectionProxy不仅包含了Connection,也包含了当前的DataSourceProxy对象,从而间接包含DataSource。

    ConnectionProxy的类图

    了解ConnectionProxy的方法之前,我们先来看看它的UML类图结构

    类图比较简单,AbstractConnectionProxy直接实现Connection接口,且包含了DataSourceProxy和原始Connection的成员变量

    public abstract class AbstractConnectionProxy implements Connection {
        /**
         * 数据源代理对象
         */
        protected DataSourceProxy dataSourceProxy;
        /**
         * 原始Connection对象
         */
        protected Connection targetConnection;
    
        // 省略
    }   

    ConnectionProxy直接继承于AbstractConnectionProxy,主要扩展了commit方法和rollback方法。

    commit注册分支事务

    我们跟进ConnectionProxy的commit方法

    @Override
    public void commit() throws SQLException {
        try {
            // 重试
            LOCK_RETRY_POLICY.execute(() -> {
                // 提交
                doCommit();
                return null;
            });
        } catch (SQLException e) {
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }

    commit方法加上了一个重试执行,跟进doCommit

    private void doCommit() throws SQLException {
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit();
        }
    }

    这里区分了三种情况,如果没有全局事务,没有全局锁,那么就直接调用原始的commit,提交本地事务即可。

    我们这里以全局事务为例,进入processGlobalTransactionCommit看看

    private void processGlobalTransactionCommit() throws SQLException {
        try {
            // 注册分支事务
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
    
        try {
            // 如果由undoLog
            if (context.hasUndoLog()) {
                // 插入到undo_log表中
                UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            }
            // 提交本地事务
            targetConnection.commit();
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            // 广播本地事务提交异常
            report(false);
            throw new SQLException(ex);
        }
        // 如果分支事务正常注册、且本地事务提交,同时开启了通知
        if (IS_REPORT_SUCCESS_ENABLE) {
            // 广播分支事务提交正常
            report(true);
        }
        // 清除当前ConnectionProxy中的xid、branchId、undoLog buffer之类的,本次提交结束
        context.reset();
    }

    processGlobalTransactionCommit方法首先将RPC到Server端注册分支事务,Server端返回一个branchId到当前ConnectionContext当中。

    如果注册分支事务失败?直接向上抛出异常,由发起全局事务的地方捕获,并进行全局事务的回滚。

    如果分支事务顺利注册成功,那么就将undo_log插入到本地的undo_log表中,且提交本地事务。

    如果本地事务提交失败,意味着一阶段执行失败,那么就通过report方法通知Server端。

    如果成功呢?若开启通知的话,也通过report方法通知一阶段成功。

    最后清除当前Connection的上下文事务相关数据,本次结束。

    最后,再看看report方法吧

    private void report(boolean commitDone) throws SQLException {
        int retry = REPORT_RETRY_COUNT;
        while (retry > 0) {
            try {
                DefaultResourceManager.get().branchReport(BranchType.AT, context.getXid(), context.getBranchId(), commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed, null);
                return;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report [" + context.getBranchId() + "/" + context.getXid() + "] commit done ["
                    + commitDone + "] Retry Countdown: " + retry);
                retry--;
    
                if (retry == 0) {
                    throw new SQLException("Failed to report branch status " + commitDone, ex);
                }
            }
        }
    }

    report中通过ResourceManager把分支事务的执行情况上报,如果本地事务提交,那么就报一阶段完成,如果没有提交,那么就报一阶段提交失败。

    小结:到这里我们回顾一下ConnectionProxy的commit方法干了啥?其实就是注册branch到global Transaction当中,作为其中一个分支存在。

    且本地事务将会直接提交,而不用等待全局事务提交才提交,这样对于资源来说减少了锁定时间。

    rollback通知

    前面我们说commit注册分支事务,并提交本地事务。那么如果执行失败呢?将会调用rollback方法,我们再看看rollback做了啥

    @Override
    public void rollback() throws SQLException {
        // 回滚本地事务
        targetConnection.rollback();
        // 如果在全局事务当中
        if (context.inGlobalTransaction()) {
            // 如果分支事务已经注册
            if (context.isBranchRegistered()) {
                // 发送一阶段失败通知
                report(false);
            }
        }
        // 清理上下文
        context.reset();
    }

    rollback中,先是回滚了本地事务。然后判断是否在全局事务当中,且当前分支事务已经提交,如果是的话发送一阶段的回滚到Server端,最后清理上下文。

    总结

    我们看到commit和rollback的实现逻辑里,本地的事务都会预先commit或者rollback,不会等待全局事务一起commit或者rollback。

    分支事务如果已经注册成功,那么将会通过ResourceManager来report给Server端一阶段的成功或者失败的结果。那如果分支事务注册不成功呢?将会向上抛出异常,由全局事务的发起者捕获到,并发起全局事务回滚

    总得来说,ConnectionProxy里面的commit和rollback尽量在程序正常的情况下进行分支事务的处理,而要保证最终一致性主要还是得由全局事务在Server端的二阶段处理逻辑为主。

  • 相关阅读:
    antd按需加载
    解决vscode开发react项目没有代码提示问题
    在react中配置less
    flutter之fluro导航传参数
    Flutter游戏:简单规则与结束页
    zsh: command not found:XXX
    React的安装与使用
    git stash 用法总结和注意点
    【OSS】工具类
    ajax将数组或list集合传到后台 的 【坑】
  • 原文地址:https://www.cnblogs.com/lay2017/p/12483201.html
Copyright © 2011-2022 走看看