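  How Seata Implements Distributed Transactions

    Seata Distributed Transaction Solution

    Introduction

    Seata is a distributed transaction middleware open-sourced by Alibaba that is minimally intrusive to business code. Within an application, Seata's overall transaction logic follows the two-phase commit model, and its core concepts involve three roles:

    • TM (Transaction Manager): the transaction initiator. It tells the TC to begin, commit or roll back a global transaction.
    • RM (Resource Manager): a transactional resource. Each RM registers with the TC as a branch transaction.
    • TC (Transaction Coordinator): the coordinator, i.e. the independently running seata-server, which receives transaction registrations, commits and rollbacks.

    Seata runs in two modes, AT and MT. There are other modes such as SAGA, which I have not studied yet.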

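    AT (Auto Transaction) mode

    This mode requires the module to be written in Java and the database to support local transactions. A typical distributed transaction proceeds as follows:

    • The TM asks the TC to begin a global transaction; the TC creates it and generates a globally unique XID.
    • The XID is propagated in the context of the microservice call chain.
    • Each RM registers a branch transaction with the TC, placing it under the global transaction identified by the XID.
    • The TM asks the TC to commit or roll back the global transaction identified by the XID.
    • The TC drives all branch transactions under the XID to complete the commit or rollback.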
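
    The following is a minimal in-memory sketch of the coordinator side of the list above. It makes the XID lifecycle concrete. `MiniTc` and its `Branch` callback are hypothetical names. The real TC is the seata-server process, reached over RPC. That process also handles persistence, timeouts and retries, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory stand-in for a TC: it only tracks XIDs and branch callbacks.
class MiniTc {
    /** A branch registered by an RM; the TC calls back into it in phase two. */
    interface Branch {
        void commit();
        void rollback();
    }

    private final Map<String, List<Branch>> branchesByXid = new HashMap<>();

    /** TM asks the TC to begin a global transaction and receives a unique XID. */
    String begin() {
        String xid = "xid-" + UUID.randomUUID();
        branchesByXid.put(xid, new ArrayList<>());
        return xid;
    }

    /** RM registers a branch under the global transaction identified by xid. */
    void registerBranch(String xid, Branch branch) {
        List<Branch> branches = branchesByXid.get(xid);
        if (branches == null) {
            throw new IllegalStateException("unknown xid " + xid);
        }
        branches.add(branch);
    }

    /** TM's global decision; the TC drives every branch under the XID accordingly. */
    void end(String xid, boolean commit) {
        List<Branch> branches = branchesByXid.remove(xid);
        if (branches == null) {
            throw new IllegalStateException("unknown xid " + xid);
        }
        for (Branch b : branches) {
            if (commit) {
                b.commit();
            } else {
                b.rollback();
            }
        }
    }
}
```

    Each branch only learns the outcome when the TC calls it back. That is why the XID must travel with every downstream call.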

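    MT (Manual Transaction) mode

    This mode fits the remaining scenarios. The underlying storage may not support transactions, so you implement the prepare, commit and rollback logic yourself.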
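
    A manual branch can be pictured as three callbacks supplied by the business code. `ManualBranch` and `FrozenBalance` are hypothetical names for illustration, not Seata interfaces. The sketch freezes an amount in prepare, keeps it on commit and returns it on rollback. Rollback is idempotent because a coordinator may retry it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical manual branch: business code implements all three phases itself.
interface ManualBranch {
    boolean prepare(String xid);   // phase one: reserve resources
    void commit(String xid);       // phase two: confirm the reservation
    void rollback(String xid);     // phase two: release the reservation
}

// Example resource: an in-memory balance that freezes an amount during prepare.
class FrozenBalance implements ManualBranch {
    private long available;
    private final long amount;
    private final Map<String, Long> frozen = new HashMap<>();

    FrozenBalance(long available, long amount) {
        this.available = available;
        this.amount = amount;
    }

    @Override
    public boolean prepare(String xid) {
        if (available < amount) {
            return false;              // not enough balance: vote "no"
        }
        available -= amount;
        frozen.put(xid, amount);
        return true;
    }

    @Override
    public void commit(String xid) {
        frozen.remove(xid);            // the frozen amount is consumed for good
    }

    @Override
    public void rollback(String xid) {
        Long f = frozen.remove(xid);   // idempotent: nothing to do if not frozen
        if (f != null) {
            available += f;
        }
    }

    long available() {
        return available;
    }
}
```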

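    Source code analysis

    Reference: https://juejin.im/post/6844904148089962510

    Initialization

    The global two-phase commit is actually implemented by proxying the data source: Seata's proxy data source adds a layer of proxying on top of the Druid data source.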
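
    The core idea is a proxy in front of the data access object, so that every SQL statement passes through an interception point before reaching the real driver. A JDK dynamic proxy is enough to show this. `SimpleStatement` and `ProxySketch` are hypothetical. Seata itself wraps JDBC objects with dedicated classes such as DataSourceProxy, ConnectionProxy and StatementProxy, not with `java.lang.reflect.Proxy`.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.List;

// Hypothetical minimal "statement" interface standing in for java.sql.Statement.
interface SimpleStatement {
    int executeUpdate(String sql);
}

class ProxySketch {
    /** Wraps target so each SQL is seen (here: recorded) before it is delegated. */
    static SimpleStatement wrap(SimpleStatement target, List<String> audit) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getName().equals("executeUpdate")) {
                audit.add("intercepted: " + args[0]);  // e.g. parse SQL, build undo log
            }
            return method.invoke(target, args);        // then delegate to the real object
        };
        return (SimpleStatement) Proxy.newProxyInstance(
                SimpleStatement.class.getClassLoader(),
                new Class<?>[] {SimpleStatement.class},
                handler);
    }
}
```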

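    Two-phase commit

    Add the @GlobalTransactional annotation to any method that needs a global transaction. The interceptor Seata uses to intercept global transactions is GlobalTransactionalInterceptor: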

    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis())
            : null;
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
    
        final GlobalTransactional globalTransactionalAnnotation =
            getAnnotation(method, targetClass, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
        if (!disable && globalTransactionalAnnotation != null) {
            return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
        } else if (!disable && globalLockAnnotation != null) {
            return handleGlobalLock(methodInvocation);
        } else {
            return methodInvocation.proceed();
        }
    }
    

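    If the method carries @GlobalTransactional and the interceptor is not disabled, handleGlobalTransaction is called to start a global transaction. If it carries @GlobalLock instead, handleGlobalLock is called. Otherwise the method runs as an ordinary method. The handleGlobalTransaction method: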

    private Object handleGlobalTransaction(final MethodInvocation methodInvocation, final GlobalTransactional globalTrxAnno) throws Throwable {
        try {
            return transactionalTemplate.execute(new TransactionalExecutor() {
                @Override
                public Object execute() throws Throwable {
                    return methodInvocation.proceed();
                }
    
                public String name() {
                    String name = globalTrxAnno.name();
                    if (!StringUtils.isNullOrEmpty(name)) {
                        return name;
                    }
                    return formatMethod(methodInvocation.getMethod());
                }
    
                @Override
                public TransactionInfo getTransactionInfo() {
                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
                    transactionInfo.setName(name());
                    transactionInfo.setPropagation(globalTrxAnno.propagation());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            TransactionalExecutor.Code code = e.getCode();
            switch (code) {
                case RollbackDone:
                    throw e.getOriginalException();
                case BeginFailure:
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case CommitFailure:
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackFailure:
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackRetrying:
                    failureHandler.onRollbackRetrying(e.getTransaction(), e.getCause());
                    throw e.getCause();
                default:
                    throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
    
            }
        }
    }
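
    The rollbackFor and noRollbackFor attributes collected in getTransactionInfo decide whether a thrown exception rolls the global transaction back. The sketch below assumes Spring-style matching. Under that rule, the rule whose exception type is closest in the class hierarchy wins. With no matching rule, any exception rolls back. `RuleSketch` is a hypothetical name, and Seata's exact matching logic may differ, for example by matching on class names.

```java
import java.util.List;

// Hypothetical sketch of "closest matching rule wins" rollback-rule evaluation.
class RuleSketch {
    static final class Rule {
        final Class<? extends Throwable> type;
        final boolean rollback;   // true for rollbackFor, false for noRollbackFor

        Rule(Class<? extends Throwable> type, boolean rollback) {
            this.type = type;
            this.rollback = rollback;
        }
    }

    /** Superclass steps from exClass up to type, or -1 if type is not an ancestor. */
    static int depth(Class<?> exClass, Class<?> type) {
        int d = 0;
        for (Class<?> c = exClass; c != null; c = c.getSuperclass(), d++) {
            if (c == type) {
                return d;
            }
        }
        return -1;
    }

    /** Decides whether ex should trigger a global rollback. */
    static boolean rollbackOn(List<Rule> rules, Throwable ex) {
        Rule winner = null;
        int best = Integer.MAX_VALUE;
        for (Rule r : rules) {
            int d = depth(ex.getClass(), r.type);
            if (d >= 0 && d < best) {
                best = d;
                winner = r;
            }
        }
        // Assumption: with no matching rule, any exception rolls back.
        return winner == null || winner.rollback;
    }
}
```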
    

    handleGlobalTransaction calls the execute method of TransactionalTemplate:

    public Object execute(TransactionalExecutor business) throws Throwable {
        // 1 get transactionInfo
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        // 1.1 get or create a transaction
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
    
        // 1.2 Handle the Transaction propagation and the branchType
        Propagation propagation = txInfo.getPropagation();
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
            switch (propagation) {
                case NOT_SUPPORTED:
                    suspendedResourcesHolder = tx.suspend(true);
                    return business.execute();
                case REQUIRES_NEW:
                    suspendedResourcesHolder = tx.suspend(true);
                    break;
                case SUPPORTS:
                    if (!existingTransaction()) {
                        return business.execute();
                    }
                    break;
                case REQUIRED:
                    break;
                case NEVER:
                    if (existingTransaction()) {
                        throw new TransactionException(
                                String.format("Existing transaction found for transaction marked with propagation 'never',xid = %s"
                                        ,RootContext.getXID()));
                    } else {
                        return business.execute();
                    }
                case MANDATORY:
                    if (!existingTransaction()) {
                        throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                    }
                    break;
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }
    
    
            try {
    
                // 2. begin transaction
                beginTransaction(txInfo, tx);
    
                Object rs = null;
                try {
    
                    // Do Your Business
                    rs = business.execute();
    
                } catch (Throwable ex) {
    
                    // 3.the needed business exception to rollback.
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }
    
                // 4. everything is fine, commit.
                commitTransaction(tx);
    
                return rs;
            } finally {
                //5. clear
                triggerAfterCompletion();
                cleanUp();
            }
        } finally {
            tx.resume(suspendedResourcesHolder);
        }
    }
    

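    The method consists of the following main steps:

    1. Get the transaction info and handle the propagation behavior.
    2. Begin the transaction.
    3. Execute the business method.
    4. Commit the transaction (if no exception was thrown).
    5. Roll back (if an exception was thrown and the rollback rules call for it).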
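
    Without propagation handling and the TM calls, the template reduces to a simple skeleton: begin, run, commit or roll back, then clean up in a finally block. `TemplateSketch` is hypothetical. Its `events` list only stands in for calls to a real transaction manager. Unlike the real template, it rolls back on every RuntimeException instead of consulting rollback rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical skeleton of the begin / execute / commit-or-rollback template.
class TemplateSketch {
    final List<String> events = new ArrayList<>();

    <T> T execute(Supplier<T> business) {
        try {
            events.add("begin");                  // begin the global transaction
            T result;
            try {
                result = business.get();          // run the business method
            } catch (RuntimeException ex) {
                events.add("rollback");           // exception: roll back, then rethrow
                throw ex;
            }
            events.add("commit");                 // no exception: commit
            return result;
        } finally {
            events.add("cleanUp");                // always clear the context
        }
    }
}
```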

    beginTransaction eventually calls the begin method of DefaultGlobalTransaction:

    @Override
    public void begin(int timeout, String name) throws TransactionException {
        if (role != GlobalTransactionRole.Launcher) {
            assertXIDNotNull();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNull();
        if (RootContext.getXID() != null) {
            throw new IllegalStateException();
        }
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction [{}]", xid);
        }
    }
    

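    transactionManager.begin() communicates with the server through TmRpcClient to generate an XID, and the XID is then bound to RootContext. After the global transaction has been started, the original business method still runs. Because Seata proxies the data source, SQL parsing and undo log generation happen inside the proxy. Seata proxies not only the DataSource but also Connection and Statement, and the SQL parsing happens in StatementProxy: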

    @Override
    public ResultSet executeQuery(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery((String) args[0]), sql);
    }
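
    Binding the XID works because the XID lives in a per-thread context. Below is a minimal sketch, assuming a plain `ThreadLocal`. The class `XidContext` is hypothetical and only assumed to behave like RootContext's bind and getXID. Because the value is per thread, crossing a service boundary requires two steps: the caller passes the XID along, for example in an RPC header, and the callee binds it again.

```java
// Hypothetical ThreadLocal-based stand-in for an XID context (bind / get / unbind).
final class XidContext {
    private static final ThreadLocal<String> XID = new ThreadLocal<>();

    private XidContext() {}

    static void bind(String xid) {
        if (XID.get() != null) {
            // mirrors begin() refusing to start when an XID is already bound
            throw new IllegalStateException("XID already bound: " + XID.get());
        }
        XID.set(xid);
    }

    static String getXid() {
        return XID.get();
    }

    static boolean inGlobalTransaction() {
        return XID.get() != null;
    }

    static String unbind() {
        String xid = XID.get();
        XID.remove();
        return xid;
    }
}
```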
    

    StatementProxy.executeQuery eventually calls the execute method of ExecuteTemplate:

    public static <T, S extends Statement> T execute(
            List<SQLRecognizer> sqlRecognizers, StatementProxy<S> statementProxy, 
            StatementCallback<T, S> statementCallback, Object... args) throws SQLException {
        if (!shouldExecuteInATMode()) {
            // Just work as original statement
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }
    
        if (sqlRecognizers == null) {
            sqlRecognizers = SQLVisitorFactory.get(
                    statementProxy.getTargetSQL(),
                    statementProxy.getConnectionProxy().getDbType());
        }
        Executor<T> executor;
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            executor = new PlainExecutor<>(statementProxy, statementCallback);
        } else {
            if (sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                switch (sqlRecognizer.getSQLType()) {
                    case INSERT:
                        executor = new InsertExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case UPDATE:
                        executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case DELETE:
                        executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case SELECT_FOR_UPDATE:
                        executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    default:
                        executor = new PlainExecutor<>(statementProxy, statementCallback);
                        break;
                }
            } else {
                executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
            }
        }
        T rs;
        try {
            rs = executor.execute(args);
        } catch (Throwable ex) {
            if (!(ex instanceof SQLException)) {
                // Turn other exception into SQLException
                ex = new SQLException(ex);
            }
            throw (SQLException) ex;
        }
        return rs;
    }
    

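    The method first checks whether a global transaction is active. Outside a global transaction the statement runs as usual. Inside one, the SQL is parsed, each kind of DML statement gets its own handling, and then the statement is executed. The flow is:

    1. Check whether a global transaction (AT mode) is active. If not, skip the proxy logic and do not parse the SQL.
    2. Call SQLVisitorFactory to parse the target SQL.
    3. Pick a dedicated executor for the specific SQL types (INSERT, UPDATE, DELETE, SELECT_FOR_UPDATE). Other single statements use PlainExecutor, and multiple statements use MultiExecutor.
    4. Execute the SQL and return the result, wrapping any non-SQLException in an SQLException.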
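
    The dispatch in the list above amounts to a small decision function. Here the returned strings only name the executors from the code. `ExecutorChooser` and its `SqlType` enum are hypothetical. The input is assumed to be already parsed, so the SQLVisitorFactory step is left out.

```java
import java.util.List;

// Hypothetical sketch of choosing an executor by recognized SQL type.
class ExecutorChooser {
    enum SqlType { INSERT, UPDATE, DELETE, SELECT_FOR_UPDATE, SELECT, OTHER }

    static String choose(boolean inAtMode, List<SqlType> recognized) {
        if (!inAtMode) {
            return "original statement";      // no global tx: no proxying, no parsing
        }
        if (recognized == null || recognized.isEmpty()) {
            return "PlainExecutor";
        }
        if (recognized.size() > 1) {
            return "MultiExecutor";           // several statements in one SQL string
        }
        switch (recognized.get(0)) {
            case INSERT: return "InsertExecutor";
            case UPDATE: return "UpdateExecutor";
            case DELETE: return "DeleteExecutor";
            case SELECT_FOR_UPDATE: return "SelectForUpdateExecutor";
            default: return "PlainExecutor";  // e.g. a plain SELECT
        }
    }
}
```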

    The key lies in the execute method of the type-specific executors. Taking InsertExecutor as the example, it calls the execute method of its parent class BaseTransactionalExecutor:

    @Override
    public T execute(Object... args) throws Throwable {
        if (RootContext.inGlobalTransaction()) {
            String xid = RootContext.getXID();
            statementProxy.getConnectionProxy().bind(xid);
        }
        statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
        return doExecute(args);
    }
    

    This binds the XID to the connectionProxy and calls doExecute, which is implemented in the subclass AbstractDMLBaseExecutor:

    @Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else {
            return executeAutoCommitFalse(args);
        }
    }
    

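    executeAutoCommitTrue first sets AutoCommit to false. This lets the SQL be parsed and the undo log generated before anything is written to the database. It then runs the statement and commits under a lock-retry policy, and finally restores AutoCommit to true: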

    protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        try {
            connectionProxy.setAutoCommit(false);
            return new LockRetryPolicy(connectionProxy).execute(() -> {
                T result = executeAutoCommitFalse(args);
                connectionProxy.commit();
                return result;
            });
        } catch (Exception e) {
            // when exception occur in finally,this exception will lost, so just print it here
            LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
            if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                connectionProxy.getTargetConnection().rollback();
            }
            throw e;
        } finally {
            connectionProxy.getContext().reset();
            connectionProxy.setAutoCommit(true);
        }
    }
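
    LockRetryPolicy wraps the execute-and-commit step. When a global row lock conflicts, it retries instead of failing immediately. A minimal sketch of such a retry loop follows. `LockRetrySketch`, its `LockConflict` exception, the retry count and the sleep interval are all assumptions. They are not Seata's configuration keys or defaults.

```java
import java.util.function.Supplier;

// Hypothetical retry-on-lock-conflict helper in the spirit of LockRetryPolicy.
class LockRetrySketch {
    static class LockConflict extends RuntimeException {
        LockConflict(String msg) { super(msg); }
    }

    static <T> T execute(Supplier<T> action, int maxRetries, long intervalMillis) {
        int attempt = 0;
        while (true) {
            try {
                return action.get();
            } catch (LockConflict e) {
                if (++attempt > maxRetries) {
                    throw e;                          // give up: surface the conflict
                }
                try {
                    Thread.sleep(intervalMillis);     // back off before retrying
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }
}
```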
    

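    After AutoCommit has been set to false, executeAutoCommitTrue goes on to call executeAutoCommitFalse(args) in AbstractDMLBaseExecutor:

    protected T executeAutoCommitFalse(Object[] args) throws Exception {
        // 1. snapshot of the affected rows before the SQL runs
        TableRecords beforeImage = beforeImage();
        // 2. run the business SQL on the real statement
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        // 3. snapshot of the same rows after the SQL runs
        TableRecords afterImage = afterImage(beforeImage);
        // 4. build the undo log and lock keys into the connection context
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }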
    

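    This is a key method. executeAutoCommitFalse works in four steps:

    1. Take a snapshot of the affected records before the SQL runs (beforeImage).
    2. Execute the SQL.
    3. Take a snapshot of the records after the SQL runs (afterImage).
    4. Build an undo log record from beforeImage and afterImage and add it to the connectionProxy's context.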
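
    The four steps can be traced on an in-memory table. In this hypothetical `ImageSketch`, a table is just a map from primary key to a single value. The before and after images are single lookups, not the SELECT statements Seata generates from the parsed SQL.

```java
import java.util.Map;

// Hypothetical in-memory sketch: before image, execute, after image, undo record.
class ImageSketch {
    static final class UndoRecord {
        final int pk;
        final String before;  // null means the row did not exist
        final String after;

        UndoRecord(int pk, String before, String after) {
            this.pk = pk;
            this.before = before;
            this.after = after;
        }
    }

    /** Writes one row the way executeAutoCommitFalse does it, returning the undo record. */
    static UndoRecord update(Map<Integer, String> table, int pk, String newValue) {
        String before = table.get(pk);            // 1. before image
        table.put(pk, newValue);                  // 2. execute the business write
        String after = table.get(pk);             // 3. after image
        return new UndoRecord(pk, before, after); // 4. undo record from both images
    }

    /** Restores the before image, i.e. what a rollback would apply. */
    static void undo(Map<Integer, String> table, UndoRecord u) {
        if (u.before == null) {
            table.remove(u.pk);
        } else {
            table.put(u.pk, u.before);
        }
    }
}
```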

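    Preparing the undo log works in two steps. First it records the lock keys, built from the beforeImage for DELETE and from the afterImage otherwise. Then it records both the beforeImage and the afterImage: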

    protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
        if (!beforeImage.getRows().isEmpty() || !afterImage.getRows().isEmpty()) {
            ConnectionProxy connectionProxy = this.statementProxy.getConnectionProxy();
            TableRecords lockKeyRecords = this.sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
            String lockKeys = this.buildLockKey(lockKeyRecords);
            connectionProxy.appendLockKey(lockKeys);
            SQLUndoLog sqlUndoLog = this.buildUndoItem(beforeImage, afterImage);
            connectionProxy.appendUndoLog(sqlUndoLog);
        }
    }
    
    protected SQLUndoLog buildUndoItem(TableRecords beforeImage, TableRecords afterImage) {
        SQLType sqlType = this.sqlRecognizer.getSQLType();
        String tableName = this.sqlRecognizer.getTableName();
        SQLUndoLog sqlUndoLog = new SQLUndoLog();
        sqlUndoLog.setSqlType(sqlType);
        sqlUndoLog.setTableName(tableName);
        sqlUndoLog.setBeforeImage(beforeImage);
        sqlUndoLog.setAfterImage(afterImage);
        return sqlUndoLog;
    }
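
    The lock key identifies the rows a branch has touched. Below is a minimal sketch, assuming the `table:pk1,pk2` form. `LockKeySketch` is hypothetical, and the exact format may vary between Seata versions. The sketch also encodes the rule from prepareUndoLog: DELETE takes its keys from the before image, and other statements take them from the after image.

```java
import java.util.List;

// Hypothetical sketch of building a row lock key such as "account:1,2".
class LockKeySketch {
    enum SqlType { INSERT, UPDATE, DELETE }

    static String buildLockKey(String table, List<String> primaryKeys) {
        return table + ":" + String.join(",", primaryKeys);
    }

    /** DELETE locks the rows that existed before; INSERT/UPDATE lock the rows after. */
    static String lockKeyFor(SqlType type, String table,
                             List<String> beforePks, List<String> afterPks) {
        return buildLockKey(table, type == SqlType.DELETE ? beforePks : afterPks);
    }
}
```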
    

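    Eventually UndoLogManager processes the undo log records. When a branch is rolled back in phase two, the records are executed as undo operations, applied in reverse order. When the global transaction commits, they are deleted. The undo part: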

    try {
        // put serializer name to local
        setCurrentSerializer(parser.getName());
        List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
        if (sqlUndoLogs.size() > 1) {
            Collections.reverse(sqlUndoLogs);
        }
        for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
            TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
                conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
            sqlUndoLog.setTableMeta(tableMeta);
            AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                dataSourceProxy.getDbType(), sqlUndoLog);
            undoExecutor.executeOn(conn);
        }
    } finally {
        // remove serializer name
        removeCurrentSerializer();
    }
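
    Each undo record is turned into a compensating statement. The list is reversed first, so the most recent change is undone first. The hypothetical `UndoSqlSketch` below shows that mapping with simplified SQL: a single `id` primary key, a single `val` column, and no escaping or parameter binding. It illustrates the idea rather than the statements Seata's undo executors actually build.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical mapping from undo records to compensating SQL, applied newest-first.
class UndoSqlSketch {
    enum SqlType { INSERT, UPDATE, DELETE }

    static final class Undo {
        final SqlType type;
        final String table;
        final int id;
        final String beforeVal;

        Undo(SqlType type, String table, int id, String beforeVal) {
            this.type = type;
            this.table = table;
            this.id = id;
            this.beforeVal = beforeVal;
        }
    }

    static String compensate(Undo u) {
        switch (u.type) {
            case INSERT: // undo an insert: delete the inserted row
                return "DELETE FROM " + u.table + " WHERE id = " + u.id;
            case UPDATE: // undo an update: write the before image back
                return "UPDATE " + u.table + " SET val = '" + u.beforeVal + "' WHERE id = " + u.id;
            case DELETE: // undo a delete: re-insert the before image
                return "INSERT INTO " + u.table + " (id, val) VALUES (" + u.id + ", '" + u.beforeVal + "')";
            default:
                throw new IllegalArgumentException("unknown type " + u.type);
        }
    }

    static List<String> plan(List<Undo> undoLogs) {
        List<Undo> reversed = new ArrayList<>(undoLogs);
        Collections.reverse(reversed);          // the last change is undone first
        List<String> sql = new ArrayList<>();
        for (Undo u : reversed) {
            sql.add(compensate(u));
        }
        return sql;
    }
}
```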
    

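    Before an undo executor modifies any data, dataValidationAndGoOn compares the snapshots. If the beforeImage equals the afterImage, there is nothing to undo. Otherwise the undo proceeds only if the current database records equal the afterImage. If they equal the beforeImage instead, the undo is skipped, and anything else is treated as dirty data: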

    /**
     * Data validation.
     *
     * @param conn the conn
     * @return return true if data validation is ok and need continue undo, and return false if no need continue undo.
     * @throws SQLException the sql exception such as has dirty data
     */
    protected boolean dataValidationAndGoOn(Connection conn) throws SQLException {
    
        TableRecords beforeRecords = sqlUndoLog.getBeforeImage();
        TableRecords afterRecords = sqlUndoLog.getAfterImage();
    
        // Compare current data with before data
        // No need undo if the before data snapshot is equivalent to the after data snapshot.
        Result<Boolean> beforeEqualsAfterResult = DataCompareUtils.isRecordsEquals(beforeRecords, afterRecords);
        if (beforeEqualsAfterResult.getResult()) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Stop rollback because there is no data change " +
                        "between the before data snapshot and the after data snapshot.");
            }
            // no need continue undo.
            return false;
        }
    
        // Validate if data is dirty.
        TableRecords currentRecords = queryCurrentRecords(conn);
        // compare with current data and after image.
        Result<Boolean> afterEqualsCurrentResult = DataCompareUtils.isRecordsEquals(afterRecords, currentRecords);
        if (!afterEqualsCurrentResult.getResult()) {
    
            // If current data is not equivalent to the after data, then compare the current data with the before 
            // data, too. No need continue to undo if current data is equivalent to the before data snapshot
            Result<Boolean> beforeEqualsCurrentResult = DataCompareUtils.isRecordsEquals(beforeRecords, currentRecords);
            if (beforeEqualsCurrentResult.getResult()) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Stop rollback because there is no data change " +
                            "between the before data snapshot and the current data snapshot.");
                }
                // no need continue undo.
                return false;
            } else {
                if (LOGGER.isInfoEnabled()) {
                    if (StringUtils.isNotBlank(afterEqualsCurrentResult.getErrMsg())) {
                        LOGGER.info(afterEqualsCurrentResult.getErrMsg(), afterEqualsCurrentResult.getErrMsgParams());
                    }
                }
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("check dirty datas failed, old and new data are not equal," +
                            "tableName:[" + sqlUndoLog.getTableName() + "]," +
                            "oldRows:[" + JSON.toJSONString(afterRecords.getRows()) + "]," +
                            "newRows:[" + JSON.toJSONString(currentRecords.getRows()) + "].");
                }
                throw new SQLException("Has dirty records when undo.");
            }
        }
        return true;
    }
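
    The three comparisons in dataValidationAndGoOn boil down to a small decision table. In this hypothetical `UndoCheckSketch`, each snapshot is reduced to one value, and `Objects.equals` stands in for DataCompareUtils.isRecordsEquals. DIRTY corresponds to the SQLException the real method throws.

```java
import java.util.Objects;

// Hypothetical condensed version of the undo-time data validation.
class UndoCheckSketch {
    enum Decision { SKIP, UNDO, DIRTY }

    static Decision decide(Object before, Object after, Object current) {
        if (Objects.equals(before, after)) {
            return Decision.SKIP;    // the branch changed nothing
        }
        if (Objects.equals(after, current)) {
            return Decision.UNDO;    // untouched since phase one: safe to restore
        }
        if (Objects.equals(before, current)) {
            return Decision.SKIP;    // already back to the before image
        }
        return Decision.DIRTY;       // modified outside the global transaction
    }
}
```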
    

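    Combined with the global row locks registered through the lock keys, this means that Seata effectively serializes concurrent global transactions that touch the same rows. When hot resources are involved, that can cause performance problems.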
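
    Writes to the same row are serialized because the TC grants a row lock to only one global transaction at a time, until that transaction ends. A rough in-memory model follows. `GlobalLockSketch` is hypothetical and ignores the lock storage, timeouts and retry handling of the real TC.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical row-lock table keyed by "table:pk", owned by at most one XID each.
class GlobalLockSketch {
    private final Map<String, String> owners = new HashMap<>();  // row key -> xid

    /** Grants the lock if the row is free or already held by the same XID. */
    synchronized boolean tryLock(String xid, String rowKey) {
        String owner = owners.putIfAbsent(rowKey, xid);
        return owner == null || owner.equals(xid);
    }

    /** Releases every row held by xid, as happens when the global transaction ends. */
    synchronized void releaseAll(String xid) {
        owners.values().removeIf(xid::equals);
    }
}
```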

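    Branch transaction registration and commit

    Once the business SQL has run and its undo log has been prepared, the commit is performed on the proxied connection, ConnectionProxy: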

    @Override
    public void commit() throws SQLException {
        try {
            LOCK_RETRY_POLICY.execute(() -> {
                doCommit();
                return null;
            });
        } catch (SQLException e) {
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }
    
    private void doCommit() throws SQLException {
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit();
        }
    }
    

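    If the connection is in a global transaction, processGlobalTransactionCommit handles the commit.
    If a global lock is required (@GlobalLock), processLocalCommitWithGlobalLocks() checks the global locks and then commits locally.
    In all other cases the local transaction is committed directly.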

    private void processGlobalTransactionCommit() throws SQLException {
        try {
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            targetConnection.commit();
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            report(false);
            throw new SQLException(ex);
        }
        if (IS_REPORT_SUCCESS_ENABLE) {
            report(true);
        }
        context.reset();
    }
    

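    processGlobalTransactionCommit performs the following steps:

    1. Register the branch transaction with the TC and bind the returned branchId to the context. A lock conflict during registration is raised through recognizeLockKeyConflictException.
    2. If there are undo logs, write the undo logs previously bound to the context to the database.
    3. Commit the local transaction.
    4. Report the outcome through the RM using report(). If any of the steps fails, report() sends a phase-one failure (PhaseOne_Failed). On success, report() sends a phase-one success (PhaseOne_Done), provided success reporting (IS_REPORT_SUCCESS_ENABLE) is enabled.
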
    private void report(boolean commitDone) throws SQLException {
        if (context.getBranchId() == null) {
            return;
        }
        int retry = REPORT_RETRY_COUNT;
        while (retry > 0) {
            try {
                DefaultResourceManager.get().branchReport(BranchType.AT, context.getXid(), context.getBranchId(),
                    commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed, null);
                return;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report [" + context.getBranchId() + "/" + context.getXid() + "] commit done ["
                    + commitDone + "] Retry Countdown: " + retry);
                retry--;
    
                if (retry == 0) {
                    throw new SQLException("Failed to report branch status " + commitDone, ex);
                }
            }
        }
    }
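
    The ordering of phase one in processGlobalTransactionCommit can be traced step by step. `PhaseOneSketch` is hypothetical. One flag simulates a failure during the undo log flush or local commit. A second flag mirrors IS_REPORT_SUCCESS_ENABLE, which gates the success report.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trace of the phase-one commit order; nothing talks to a real TC.
class PhaseOneSketch {
    final List<String> trace = new ArrayList<>();

    void commit(boolean localCommitFails, boolean reportSuccessEnabled) {
        trace.add("register branch");             // 1. branchId bound to the context
        try {
            trace.add("flush undo log");          // 2. undo rows on the same connection
            if (localCommitFails) {
                throw new IllegalStateException("local commit failed");
            }
            trace.add("local commit");            // 3. business SQL + undo log together
        } catch (RuntimeException ex) {
            trace.add("report PhaseOne_Failed");  // 4a. tell the TC phase one failed
            throw ex;
        }
        if (reportSuccessEnabled) {
            trace.add("report PhaseOne_Done");    // 4b. optional success report
        }
    }
}
```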
    

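    The undo log insertion and the business SQL use the same connection and belong to the same local transaction. This guarantees that the business changes and their undo log are committed together or not at all.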

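    Summary

    Seata's AT mode implements a distributed transaction in the traditional sense. It achieves two-phase commit across resource nodes by generating undo logs automatically.
    The advantage of this approach is that existing applications built on MySQL, PostgreSQL and Oracle can adopt distributed transactions quickly without extensive changes to existing code. The drawbacks are that conflicting work is serialized overall, that undo log handling adds extra overhead, and that it does not solve the performance problems of hot resources.
    Seata adds an undo_log table to each participating service's database. It relies on that database's local transaction to make each branch's changes and its compensation data atomic, a technique worth borrowing when designing distributed transactions.

    Other notes: distributed transaction patterns include 2PC, TCC and SAGA. SAGA comes in two forms, centralized orchestration and decentralized choreography. Besides Seata, distributed transaction frameworks include Axon and ServiceComb.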

    Original post: https://www.cnblogs.com/milton/p/13755862.html