  How Seata Implements Distributed Transactions

    The Seata Distributed Transaction Solution

    Introduction

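    Seata is an open-source distributed transaction middleware from Alibaba that is minimally intrusive to business code. Inside an application, Seata's overall transaction logic follows the two-phase commit model, built around three roles:

    • TM (Transaction Manager): the transaction initiator. It tells the TC to begin, commit, or roll back a global transaction.
    • RM (Resource Manager): manages a transactional resource. Each RM registers with the TC as a branch transaction.
    • TC (Transaction Coordinator): the standalone seata-server, which accepts branch registrations and drives commit and rollback.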

    Seata can run in AT mode or MT mode. Other modes such as SAGA also exist, but I have not studied them yet.

    AT (Auto Transaction) mode

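    This mode requires the service to be written in Java (accessing the database through JDBC) and the database to support local transactions. A typical distributed transaction proceeds as follows: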

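    • The TM asks the TC to begin a global transaction. The TC creates it and generates a globally unique XID.
    • The XID is propagated through the context of the microservice call chain.
    • Each RM registers a branch transaction with the TC, placing it under the global transaction identified by the XID.
    • The TM asks the TC to commit or roll back the global transaction identified by the XID.
    • The TC drives every branch transaction under that XID to finish the commit or rollback.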
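    The five steps above can be modelled as a toy, in-memory coordinator. This is only a sketch of the message flow, assuming branches expose plain commit/rollback callbacks. ToyCoordinator and its Branch interface are hypothetical names, not Seata's API, and there is no networking, persistence, or failure handling.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory stand-in for the TC; not Seata's API.
class ToyCoordinator {
    // Callbacks an RM registers for its branch
    interface Branch {
        void commit();
        void rollback();
    }

    private final Map<String, List<Branch>> branches = new HashMap<>();

    // TM: begin a global transaction and receive a globally unique XID
    String begin() {
        String xid = UUID.randomUUID().toString();
        branches.put(xid, new ArrayList<>());
        return xid;
    }

    // RM: register a branch under the global transaction identified by xid
    void registerBranch(String xid, Branch branch) {
        List<Branch> list = branches.get(xid);
        if (list == null) {
            throw new IllegalStateException("Unknown xid: " + xid);
        }
        list.add(branch);
    }

    // TM: global commit/rollback resolution; the TC then drives every branch
    void complete(String xid, boolean commit) {
        List<Branch> list = branches.remove(xid);
        if (list == null) {
            throw new IllegalStateException("Unknown xid: " + xid);
        }
        for (Branch b : list) {
            if (commit) {
                b.commit();
            } else {
                b.rollback();
            }
        }
    }
}
```

    In real Seata the XID travels with each RPC and the TC persists the global and branch state. The sketch shows only who calls whom.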

    MT (Manual Transaction) mode

    This mode suits other scenarios where the underlying storage may not support transactions. You implement the prepare, commit, and rollback logic yourself.
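    To make "implement prepare, commit and rollback yourself" concrete, here is a hypothetical manual branch over a store with no transactions (two HashMaps). prepare reserves stock, commit consumes the reservation, and rollback releases it. StockBranch and its methods are illustrative names only, not Seata's MT/TCC interfaces.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical manual (MT-style) branch over a non-transactional store.
class StockBranch {
    private final Map<String, Integer> available = new HashMap<>();
    private final Map<String, Integer> frozen = new HashMap<>();

    StockBranch(String sku, int qty) {
        available.put(sku, qty);
        frozen.put(sku, 0);
    }

    // Phase one: move qty from available to frozen, or refuse if there is not enough
    boolean prepare(String sku, int qty) {
        int avail = available.getOrDefault(sku, 0);
        if (avail < qty) {
            return false;
        }
        available.put(sku, avail - qty);
        frozen.merge(sku, qty, Integer::sum);
        return true;
    }

    // Phase two, commit: the frozen amount is consumed for good
    void commit(String sku, int qty) {
        frozen.merge(sku, -qty, Integer::sum);
    }

    // Phase two, rollback: give the frozen amount back
    void rollback(String sku, int qty) {
        frozen.merge(sku, -qty, Integer::sum);
        available.merge(sku, qty, Integer::sum);
    }

    int availableOf(String sku) { return available.getOrDefault(sku, 0); }

    int frozenOf(String sku) { return frozen.getOrDefault(sku, 0); }
}
```

    A production manual branch would also need idempotent commit/rollback and protection against a rollback that arrives before prepare. The sketch leaves those out.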

    Source code analysis

    Reference: https://juejin.im/post/6844904148089962510

    Initialization

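    The global two-phase commit is implemented by proxying the data source. Seata's proxy data source (DataSourceProxy) adds a layer on top of the underlying Druid data source.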
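    Proxying is what lets Seata see every SQL statement without changing business code. The idea can be sketched with a JDK dynamic proxy. SqlRunner and ProxyDemo are hypothetical, and Seata's own proxies are ordinary wrapper classes such as StatementProxy rather than java.lang.reflect.Proxy. The point is only that the wrapper sees each SQL string before the real object runs it.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.List;

// Hypothetical stand-in for a JDBC statement: runs SQL and returns an update count
interface SqlRunner {
    int executeUpdate(String sql);
}

class ProxyDemo {
    // Wrap any SqlRunner so every SQL string passes through the handler first,
    // the way a proxy data source sees SQL before the real driver does
    static SqlRunner wrap(SqlRunner target, List<String> seen) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getName().equals("executeUpdate")) {
                seen.add((String) args[0]); // hook point: parse SQL, take a before image, ...
            }
            return method.invoke(target, args); // then delegate to the real object
        };
        return (SqlRunner) Proxy.newProxyInstance(
                SqlRunner.class.getClassLoader(),
                new Class<?>[] {SqlRunner.class},
                handler);
    }
}
```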

    Two-phase commit

    Put the @GlobalTransactional annotation on any method that needs a global transaction. In Seata, GlobalTransactionalInterceptor is the interceptor that handles global transactions:

    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis())
            : null;
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
    
        final GlobalTransactional globalTransactionalAnnotation =
            getAnnotation(method, targetClass, GlobalTransactional.class);
        final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
        if (!disable && globalTransactionalAnnotation != null) {
            return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
        } else if (!disable && globalLockAnnotation != null) {
            return handleGlobalLock(methodInvocation);
        } else {
            return methodInvocation.proceed();
        }
    }
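    The routing in invoke() comes down to checking which annotation is present, in a fixed order. Here is a minimal reflection-based sketch. DemoGlobalTransactional, DemoGlobalLock, DemoInterceptor and DemoService are hypothetical stand-ins for Seata's annotations and interceptor. The real interceptor also resolves the most specific and bridged method and looks at the target class, which this sketch skips.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical annotations standing in for @GlobalTransactional and @GlobalLock
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoGlobalTransactional {}

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoGlobalLock {}

class DemoInterceptor {
    private final boolean disable;

    DemoInterceptor(boolean disable) {
        this.disable = disable;
    }

    // Same precedence as invoke(): global transaction first, then global lock, else plain call
    String route(Method method) {
        if (!disable && method.isAnnotationPresent(DemoGlobalTransactional.class)) {
            return "handleGlobalTransaction";
        } else if (!disable && method.isAnnotationPresent(DemoGlobalLock.class)) {
            return "handleGlobalLock";
        } else {
            return "proceed";
        }
    }
}

class DemoService {
    @DemoGlobalTransactional
    public void placeOrder() {}

    @DemoGlobalLock
    public void readBalance() {}

    public void ping() {}

    // Look up a public method by name without forcing callers to handle a checked exception
    static Method byName(String name) {
        try {
            return DemoService.class.getMethod(name);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```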
    

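    If the method carries @GlobalTransactional, handleGlobalTransaction is called to start a global transaction. If it carries only @GlobalLock, handleGlobalLock runs it under a global lock. Otherwise it runs as an ordinary method. The handleGlobalTransaction method: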

    private Object handleGlobalTransaction(final MethodInvocation methodInvocation, final GlobalTransactional globalTrxAnno) throws Throwable {
        try {
            return transactionalTemplate.execute(new TransactionalExecutor() {
                @Override
                public Object execute() throws Throwable {
                    return methodInvocation.proceed();
                }
    
                public String name() {
                    String name = globalTrxAnno.name();
                    if (!StringUtils.isNullOrEmpty(name)) {
                        return name;
                    }
                    return formatMethod(methodInvocation.getMethod());
                }
    
                @Override
                public TransactionInfo getTransactionInfo() {
                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
                    transactionInfo.setName(name());
                    transactionInfo.setPropagation(globalTrxAnno.propagation());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
            TransactionalExecutor.Code code = e.getCode();
            switch (code) {
                case RollbackDone:
                    throw e.getOriginalException();
                case BeginFailure:
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case CommitFailure:
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackFailure:
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackRetrying:
                    failureHandler.onRollbackRetrying(e.getTransaction(), e.getCause());
                    throw e.getCause();
                default:
                    throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
    
            }
        }
    }
    

    This method delegates to TransactionalTemplate.execute:

    public Object execute(TransactionalExecutor business) throws Throwable {
        // 1 get transactionInfo
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        // 1.1 get or create a transaction
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
    
        // 1.2 Handle the Transaction propagation and the branchType
        Propagation propagation = txInfo.getPropagation();
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
            switch (propagation) {
                case NOT_SUPPORTED:
                    suspendedResourcesHolder = tx.suspend(true);
                    return business.execute();
                case REQUIRES_NEW:
                    suspendedResourcesHolder = tx.suspend(true);
                    break;
                case SUPPORTS:
                    if (!existingTransaction()) {
                        return business.execute();
                    }
                    break;
                case REQUIRED:
                    break;
                case NEVER:
                    if (existingTransaction()) {
                        throw new TransactionException(
                                String.format("Existing transaction found for transaction marked with propagation 'never',xid = %s"
                                        ,RootContext.getXID()));
                    } else {
                        return business.execute();
                    }
                case MANDATORY:
                    if (!existingTransaction()) {
                        throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                    }
                    break;
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }
    
    
            try {
    
                // 2. begin transaction
                beginTransaction(txInfo, tx);
    
                Object rs = null;
                try {
    
                    // Do Your Business
                    rs = business.execute();
    
                } catch (Throwable ex) {
    
                    // 3.the needed business exception to rollback.
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }
    
                // 4. everything is fine, commit.
                commitTransaction(tx);
    
                return rs;
            } finally {
                //5. clear
                triggerAfterCompletion();
                cleanUp();
            }
        } finally {
            tx.resume(suspendedResourcesHolder);
        }
    }
    

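    The method performs these main steps:

    1. Get the transaction info and apply the propagation behavior.
    2. Begin the transaction.
    3. Execute the business method.
    4. Commit the transaction (if no exception was thrown).
    5. Roll back (if an exception was thrown; completeTransactionAfterThrowing consults the rollback rules built from rollbackFor/noRollbackFor).
    6. Clean up, then resume any suspended transaction.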
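    The control flow of these steps can be sketched as a small template that records what it does. DemoTransactionalTemplate is hypothetical. It ignores propagation and rollback rules and always rolls back on a runtime exception, whereas the real template may commit when the exception does not match the rollback rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical template mirroring the begin / business / commit-or-rollback / cleanup order
class DemoTransactionalTemplate {
    final List<String> events = new ArrayList<>();

    Object execute(Supplier<Object> business) {
        events.add("begin");                 // begin the global transaction
        try {
            Object rs;
            try {
                rs = business.get();         // run the business method
            } catch (RuntimeException ex) {
                events.add("rollback");      // business failed: roll back, then rethrow
                throw ex;
            }
            events.add("commit");            // no exception: commit
            return rs;
        } finally {
            events.add("cleanup");           // always clean up
        }
    }
}
```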

    beginTransaction eventually calls DefaultGlobalTransaction.begin:

    @Override
    public void begin(int timeout, String name) throws TransactionException {
        if (role != GlobalTransactionRole.Launcher) {
            assertXIDNotNull();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNull();
        if (RootContext.getXID() != null) {
            throw new IllegalStateException();
        }
        xid = transactionManager.begin(null, null, name, timeout);
        status = GlobalStatus.Begin;
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Begin new global transaction [{}]", xid);
        }
    }
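    begin() finishes with RootContext.bind(xid), which makes the XID visible to later code on the same thread. Below is a minimal sketch of such a holder, assuming a ThreadLocal-backed design. DemoRootContext and the "demo-xid" header key are hypothetical. The two header helpers show how an XID could be carried across an RPC so the callee can bind it again.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical ThreadLocal-backed holder, similar in spirit to RootContext's bind/getXID/unbind
final class DemoRootContext {
    static final String XID_HEADER = "demo-xid"; // hypothetical header key
    private static final ThreadLocal<String> XID = new ThreadLocal<>();

    private DemoRootContext() {}

    static void bind(String xid) { XID.set(xid); }

    static String getXID() { return XID.get(); }

    static boolean inGlobalTransaction() { return XID.get() != null; }

    static String unbind() {
        String xid = XID.get();
        XID.remove();
        return xid;
    }

    // Caller side: copy the current XID into outgoing request headers
    static Map<String, String> outgoingHeaders() {
        Map<String, String> headers = new HashMap<>();
        String xid = getXID();
        if (xid != null) {
            headers.put(XID_HEADER, xid);
        }
        return headers;
    }

    // Callee side: bind the XID received with the request
    static void bindFromHeaders(Map<String, String> headers) {
        String xid = headers.get(XID_HEADER);
        if (xid != null) {
            bind(xid);
        }
    }

    // ThreadLocal values are not visible to a newly started thread
    static String xidSeenByNewThread() {
        String[] seen = new String[1];
        Thread t = new Thread(() -> seen[0] = getXID());
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }
}
```

    The last helper shows a common pitfall: work handed off to another thread does not see the XID unless it is passed along explicitly.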
    

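    transactionManager.begin() talks to the server through TmRpcClient and receives a newly generated XID, which is then bound to RootContext. Only the Launcher role does this. A participant that joins an existing global transaction just asserts that an XID is present.

    Once the global transaction is set up, the original business method still runs. Because Seata proxies the data source, SQL parsing and undo log generation happen inside the proxy layer. Seata wraps not only the DataSource but also Connection and Statement, and the SQL is parsed in StatementProxy: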

    @Override
    public ResultSet executeQuery(String sql) throws SQLException {
        this.targetSQL = sql;
        return ExecuteTemplate.execute(this, (statement, args) -> statement.executeQuery((String) args[0]), sql);
    }
    

    This goes through an ExecuteTemplate.execute overload that passes null for sqlRecognizers, ending up in:

    public static <T, S extends Statement> T execute(
            List<SQLRecognizer> sqlRecognizers, StatementProxy<S> statementProxy, 
            StatementCallback<T, S> statementCallback, Object... args) throws SQLException {
        if (!shouldExecuteInATMode()) {
            // Just work as original statement
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }
    
        if (sqlRecognizers == null) {
            sqlRecognizers = SQLVisitorFactory.get(
                    statementProxy.getTargetSQL(),
                    statementProxy.getConnectionProxy().getDbType());
        }
        Executor<T> executor;
        if (CollectionUtils.isEmpty(sqlRecognizers)) {
            executor = new PlainExecutor<>(statementProxy, statementCallback);
        } else {
            if (sqlRecognizers.size() == 1) {
                SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                switch (sqlRecognizer.getSQLType()) {
                    case INSERT:
                        executor = new InsertExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case UPDATE:
                        executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case DELETE:
                        executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    case SELECT_FOR_UPDATE:
                        executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                        break;
                    default:
                        executor = new PlainExecutor<>(statementProxy, statementCallback);
                        break;
                }
            } else {
                executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
            }
        }
        T rs;
        try {
            rs = executor.execute(args);
        } catch (Throwable ex) {
            if (!(ex instanceof SQLException)) {
                // Turn other exception into SQLException
                ex = new SQLException(ex);
            }
            throw (SQLException) ex;
        }
        return rs;
    }
    

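    The method first checks whether the statement should run in AT mode. If not, the statement runs as usual. Otherwise the SQL is parsed, an executor matching the DML type is chosen, and that executor runs it. In detail:

    1. If AT mode does not apply (roughly: no global transaction and no global lock requirement), skip the proxy logic and SQL parsing and run the original statement.
    2. Parse the target SQL with SQLVisitorFactory.
    3. Pick a dedicated executor for INSERT, UPDATE, DELETE, and SELECT_FOR_UPDATE. Other statement types get PlainExecutor, and several recognized statements get MultiExecutor.
    4. Execute the SQL and return the result, wrapping any non-SQLException in a SQLException.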
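    The selection logic can be sketched with a deliberately naive, keyword-based recognizer. DemoExecutorChooser is hypothetical. Seata's recognizers parse SQL properly per database dialect, and the returned strings merely name the executor the real switch would create.

```java
import java.util.Locale;

// Hypothetical, naive stand-in for SQLVisitorFactory plus the executor switch
class DemoExecutorChooser {
    enum SqlType { INSERT, UPDATE, DELETE, SELECT_FOR_UPDATE, OTHER }

    // Keyword-only recognition; real recognizers build a full SQL AST
    static SqlType recognize(String sql) {
        String s = sql.trim().toUpperCase(Locale.ROOT);
        if (s.startsWith("INSERT")) return SqlType.INSERT;
        if (s.startsWith("UPDATE")) return SqlType.UPDATE;
        if (s.startsWith("DELETE")) return SqlType.DELETE;
        if (s.startsWith("SELECT") && s.endsWith("FOR UPDATE")) return SqlType.SELECT_FOR_UPDATE;
        return SqlType.OTHER;
    }

    // Not in AT mode: original statement; several statements: MultiExecutor; unknown: PlainExecutor
    static String choose(boolean atMode, String... statements) {
        if (!atMode) return "original statement";
        if (statements.length > 1) return "MultiExecutor";
        switch (recognize(statements[0])) {
            case INSERT: return "InsertExecutor";
            case UPDATE: return "UpdateExecutor";
            case DELETE: return "DeleteExecutor";
            case SELECT_FOR_UPDATE: return "SelectForUpdateExecutor";
            default: return "PlainExecutor";
        }
    }
}
```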

    The key lies in the execute method of each specific executor. Taking InsertExecutor as an example, execute is inherited from the parent class BaseTransactionalExecutor:

    @Override
    public T execute(Object... args) throws Throwable {
        if (RootContext.inGlobalTransaction()) {
            String xid = RootContext.getXID();
            statementProxy.getConnectionProxy().bind(xid);
        }
        statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
        return doExecute(args);
    }
    

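    execute binds the XID to the ConnectionProxy and calls doExecute. That method is implemented in AbstractDMLBaseExecutor, which extends BaseTransactionalExecutor and is in turn the parent of InsertExecutor: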

    @Override
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else {
            return executeAutoCommitFalse(args);
        }
    }
    

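    executeAutoCommitTrue also switches AutoCommit to false first, so nothing is committed before the SQL has been parsed and its undo log generated. It then commits through connectionProxy.commit() under a LockRetryPolicy and restores AutoCommit to true in the finally block: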

    protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        try {
            connectionProxy.setAutoCommit(false);
            return new LockRetryPolicy(connectionProxy).execute(() -> {
                T result = executeAutoCommitFalse(args);
                connectionProxy.commit();
                return result;
            });
        } catch (Exception e) {
            // when exception occur in finally,this exception will lost, so just print it here
            LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
            if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                connectionProxy.getTargetConnection().rollback();
            }
            throw e;
        } finally {
            connectionProxy.getContext().reset();
            connectionProxy.setAutoCommit(true);
        }
    }
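    The shape of executeAutoCommitTrue can be sketched without a database: turn autocommit off, run the body, commit, retry on lock conflicts, and always restore autocommit. DemoConnection, DemoLockConflictException and DemoAutoCommitTrue are hypothetical. The assumption that each failed attempt rolls back its local work before retrying is a simplification, and a real retry policy also waits between attempts according to its configuration.

```java
import java.util.function.Supplier;

// Hypothetical connection that only tracks its autocommit flag, commits and rollbacks
class DemoConnection {
    boolean autoCommit = true;
    int commits = 0;
    int rollbacks = 0;
}

// Hypothetical signal standing in for a global-lock conflict
class DemoLockConflictException extends RuntimeException {
    DemoLockConflictException(String msg) {
        super(msg);
    }
}

class DemoAutoCommitTrue {
    // Autocommit off so the business SQL and its undo log share one local transaction;
    // retry up to maxAttempts on lock conflicts; restore autocommit no matter what
    static <T> T run(DemoConnection conn, int maxAttempts, Supplier<T> autoCommitFalseBody) {
        conn.autoCommit = false;
        try {
            for (int attempt = 1; ; attempt++) {
                try {
                    T result = autoCommitFalseBody.get(); // before image, SQL, after image, undo log
                    conn.commits++;                       // stands in for connectionProxy.commit()
                    return result;
                } catch (DemoLockConflictException e) {
                    conn.rollbacks++;                     // discard this attempt's local work
                    if (attempt >= maxAttempts) {
                        throw e;
                    }
                }
            }
        } finally {
            conn.autoCommit = true;                       // restore the caller's mode
        }
    }
}
```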
    

    With AutoCommit set to false, execution continues in AbstractDMLBaseExecutor.executeAutoCommitFalse(args):

    protected T executeAutoCommitFalse(Object[] args) throws Exception {
        TableRecords beforeImage = beforeImage();
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        TableRecords afterImage = afterImage(beforeImage);
        prepareUndoLog(beforeImage, afterImage);
        return result;
    }
    

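    This is a key method. executeAutoCommitFalse runs in four steps:

    1. Take a snapshot of the affected records before the SQL runs (beforeImage).
    2. Execute the SQL.
    3. Take a snapshot of the records after the SQL runs (afterImage).
    4. Build an undo log record from beforeImage and afterImage and add it to the ConnectionProxy's context.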
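    The four steps can be shown on a one-column, in-memory "table". DemoTable and DemoUndoLog are hypothetical, and a single Integer per primary key stands in for a row. Like prepareUndoLog, the sketch produces no undo record when the statement matched no row.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical undo record: which row, and its value before and after the SQL
class DemoUndoLog {
    final String table;
    final long pk;
    final Integer beforeImage;
    final Integer afterImage;

    DemoUndoLog(String table, long pk, Integer beforeImage, Integer afterImage) {
        this.table = table;
        this.pk = pk;
        this.beforeImage = beforeImage;
        this.afterImage = afterImage;
    }
}

class DemoTable {
    final String name;
    final Map<Long, Integer> rows = new HashMap<>();

    DemoTable(String name) {
        this.name = name;
    }

    // Models "UPDATE name SET value = newValue WHERE id = pk", recorded AT-style
    DemoUndoLog update(long pk, int newValue) {
        Integer before = rows.get(pk);                   // 1. before image
        if (before == null) {
            return null;                                 // no row matched: nothing to undo
        }
        rows.put(pk, newValue);                          // 2. execute the SQL
        Integer after = rows.get(pk);                    // 3. after image
        return new DemoUndoLog(name, pk, before, after); // 4. build the undo record
    }
}
```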

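    prepareUndoLog first records the lock keys, built from beforeImage for DELETE (the rows no longer exist afterwards) and from afterImage otherwise. It then stores both beforeImage and afterImage in the undo log: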

    protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
        if (!beforeImage.getRows().isEmpty() || !afterImage.getRows().isEmpty()) {
            ConnectionProxy connectionProxy = this.statementProxy.getConnectionProxy();
            TableRecords lockKeyRecords = this.sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
            String lockKeys = this.buildLockKey(lockKeyRecords);
            connectionProxy.appendLockKey(lockKeys);
            SQLUndoLog sqlUndoLog = this.buildUndoItem(beforeImage, afterImage);
            connectionProxy.appendUndoLog(sqlUndoLog);
        }
    }
    
    protected SQLUndoLog buildUndoItem(TableRecords beforeImage, TableRecords afterImage) {
        SQLType sqlType = this.sqlRecognizer.getSQLType();
        String tableName = this.sqlRecognizer.getTableName();
        SQLUndoLog sqlUndoLog = new SQLUndoLog();
        sqlUndoLog.setSqlType(sqlType);
        sqlUndoLog.setTableName(tableName);
        sqlUndoLog.setBeforeImage(beforeImage);
        sqlUndoLog.setAfterImage(afterImage);
        return sqlUndoLog;
    }
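    The lock-key choice in prepareUndoLog can be sketched as follows. DemoLockKeys is hypothetical, and the "table:pk1,pk2" string shape is an assumption modelled on Seata's lock keys rather than a documented contract.

```java
import java.util.List;
import java.util.StringJoiner;

// Hypothetical lock-key builder; the "table:pk1,pk2" format is an assumption
class DemoLockKeys {
    enum SqlType { INSERT, UPDATE, DELETE }

    // DELETE: the rows only exist in the before image; INSERT/UPDATE: use the after image
    static List<Long> lockRows(SqlType type, List<Long> beforePks, List<Long> afterPks) {
        return type == SqlType.DELETE ? beforePks : afterPks;
    }

    // Join the primary keys after a "table:" prefix
    static String buildLockKey(String table, List<Long> pks) {
        StringJoiner joiner = new StringJoiner(",", table + ":", "");
        for (Long pk : pks) {
            joiner.add(String.valueOf(pk));
        }
        return joiner.toString();
    }
}
```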
    

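    In phase two, UndoLogManager either applies the undo log records (rollback) or deletes them (commit). In the rollback path below, a branch's undo logs are replayed in reverse order: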

    try {
        // put serializer name to local
        setCurrentSerializer(parser.getName());
        List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
        if (sqlUndoLogs.size() > 1) {
            Collections.reverse(sqlUndoLogs);
        }
        for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
            TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
                conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
            sqlUndoLog.setTableMeta(tableMeta);
            AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                dataSourceProxy.getDbType(), sqlUndoLog);
            undoExecutor.executeOn(conn);
        }
    } finally {
        // remove serializer name
        removeCurrentSerializer();
    }
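    The reason for Collections.reverse is easiest to see when one row is changed twice in a branch. DemoUndoReplay is hypothetical and models each undo record as "restore this key to its before value, or delete it if it did not exist". Replaying newest-first leaves the row at the value it had before the first statement. Replaying oldest-first would leave it at the intermediate value.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical undo replay over an in-memory table (pk -> value)
class DemoUndoReplay {
    static final class Undo {
        final long pk;
        final Integer before; // null: the row did not exist before (an INSERT), so delete it

        Undo(long pk, Integer before) {
            this.pk = pk;
            this.before = before;
        }
    }

    // Apply undo records newest-first, like Collections.reverse in the excerpt
    static void rollback(Map<Long, Integer> table, List<Undo> undoLogs) {
        List<Undo> copy = new ArrayList<>(undoLogs);
        Collections.reverse(copy);
        for (Undo u : copy) {
            if (u.before == null) {
                table.remove(u.pk);
            } else {
                table.put(u.pk, u.before);
            }
        }
    }
}
```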
    

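    Before an undo executor restores anything, dataValidationAndGoOn compares the images in the undo log with the current database records. The undo is skipped if the SQL changed nothing or the data is already back at the before image. It proceeds only if the current data still equals afterImage, and otherwise the records are treated as dirty and an exception is thrown: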

    /**
     * Data validation.
     *
     * @param conn the conn
     * @return return true if data validation is ok and need continue undo, and return false if no need continue undo.
     * @throws SQLException the sql exception such as has dirty data
     */
    protected boolean dataValidationAndGoOn(Connection conn) throws SQLException {
    
        TableRecords beforeRecords = sqlUndoLog.getBeforeImage();
        TableRecords afterRecords = sqlUndoLog.getAfterImage();
    
        // Compare current data with before data
        // No need undo if the before data snapshot is equivalent to the after data snapshot.
        Result<Boolean> beforeEqualsAfterResult = DataCompareUtils.isRecordsEquals(beforeRecords, afterRecords);
        if (beforeEqualsAfterResult.getResult()) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Stop rollback because there is no data change " +
                        "between the before data snapshot and the after data snapshot.");
            }
            // no need continue undo.
            return false;
        }
    
        // Validate if data is dirty.
        TableRecords currentRecords = queryCurrentRecords(conn);
        // compare with current data and after image.
        Result<Boolean> afterEqualsCurrentResult = DataCompareUtils.isRecordsEquals(afterRecords, currentRecords);
        if (!afterEqualsCurrentResult.getResult()) {
    
            // If current data is not equivalent to the after data, then compare the current data with the before 
            // data, too. No need continue to undo if current data is equivalent to the before data snapshot
            Result<Boolean> beforeEqualsCurrentResult = DataCompareUtils.isRecordsEquals(beforeRecords, currentRecords);
            if (beforeEqualsCurrentResult.getResult()) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("Stop rollback because there is no data change " +
                            "between the before data snapshot and the current data snapshot.");
                }
                // no need continue undo.
                return false;
            } else {
                if (LOGGER.isInfoEnabled()) {
                    if (StringUtils.isNotBlank(afterEqualsCurrentResult.getErrMsg())) {
                        LOGGER.info(afterEqualsCurrentResult.getErrMsg(), afterEqualsCurrentResult.getErrMsgParams());
                    }
                }
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("check dirty datas failed, old and new data are not equal," +
                            "tableName:[" + sqlUndoLog.getTableName() + "]," +
                            "oldRows:[" + JSON.toJSONString(afterRecords.getRows()) + "]," +
                            "newRows:[" + JSON.toJSONString(currentRecords.getRows()) + "].");
                }
                throw new SQLException("Has dirty records when undo.");
            }
        }
        return true;
    }
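    The branching in dataValidationAndGoOn reduces to a small decision table over three snapshots. Here it is with rows simplified to plain values compared by equals(). DemoUndoCheck is hypothetical, and DIRTY corresponds to the SQLException("Has dirty records when undo.") in the original.

```java
import java.util.Objects;

// Hypothetical restatement of dataValidationAndGoOn's decisions
class DemoUndoCheck {
    enum Decision { SKIP, UNDO, DIRTY }

    static Decision decide(Object before, Object after, Object current) {
        if (Objects.equals(before, after)) {
            return Decision.SKIP;   // the SQL changed nothing
        }
        if (Objects.equals(after, current)) {
            return Decision.UNDO;   // untouched since the branch wrote it: safe to restore
        }
        if (Objects.equals(before, current)) {
            return Decision.SKIP;   // data is already back at the before image
        }
        return Decision.DIRTY;      // someone else changed it: the real code throws
    }
}
```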
    

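    This check shows that AT mode relies on global row locks to keep other writers off the modified rows until phase two finishes. Writes to the same rows from different global transactions are therefore effectively serialized, which causes performance problems for hot resources.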
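    Why hot rows serialize can be seen with a toy lock table keyed by row. DemoGlobalLockTable is hypothetical and is not how the TC stores locks. The assumption here is all-or-nothing acquisition per branch, with release when the owning global transaction finishes phase two. A second global transaction touching the same row cannot proceed until then.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical TC-side row-lock table keyed by "table:pk"; not Seata's implementation
class DemoGlobalLockTable {
    private final Map<String, String> owner = new HashMap<>(); // row key -> owning xid

    // All-or-nothing: fail if any row is held by a different global transaction
    synchronized boolean acquire(String xid, List<String> rowKeys) {
        for (String key : rowKeys) {
            String holder = owner.get(key);
            if (holder != null && !holder.equals(xid)) {
                return false;
            }
        }
        for (String key : rowKeys) {
            owner.put(key, xid);
        }
        return true;
    }

    // Called once the global transaction has finished phase two
    synchronized void releaseAll(String xid) {
        owner.values().removeIf(xid::equals);
    }
}
```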

    Branch registration and commit

    After the business SQL and the undo log have been executed, the commit is performed in the proxy connection, ConnectionProxy:

    @Override
    public void commit() throws SQLException {
        try {
            LOCK_RETRY_POLICY.execute(() -> {
                doCommit();
                return null;
            });
        } catch (SQLException e) {
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }
    
    private void doCommit() throws SQLException {
        if (context.inGlobalTransaction()) {
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
            processLocalCommitWithGlobalLocks();
        } else {
            targetConnection.commit();
        }
    }
    

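    If the connection is in a global transaction, processGlobalTransactionCommit handles the commit.
    If a global lock is required (@GlobalLock), processLocalCommitWithGlobalLocks checks the global locks and then commits.
    In all other cases the local transaction is committed directly.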

    private void processGlobalTransactionCommit() throws SQLException {
        try {
            register();
        } catch (TransactionException e) {
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            targetConnection.commit();
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            report(false);
            throw new SQLException(ex);
        }
        if (IS_REPORT_SUCCESS_ENABLE) {
            report(true);
        }
        context.reset();
    }
    

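    processGlobalTransactionCommit performs these steps:

    1. Register the branch transaction with the TC and bind the returned branchId to the context. A lock conflict at this point is turned into a lock-conflict exception by recognizeLockKeyConflictException.
    2. If there are undo logs, persist the undo logs previously collected in the context.
    3. Commit the local transaction.
    4. If step 2 or 3 fails, report() sends a phase-one failure status (PhaseOne_Failed) to the TC through the RM. On success, report() sends PhaseOne_Done, but only when IS_REPORT_SUCCESS_ENABLE is on.
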
    private void report(boolean commitDone) throws SQLException {
        if (context.getBranchId() == null) {
            return;
        }
        int retry = REPORT_RETRY_COUNT;
        while (retry > 0) {
            try {
                DefaultResourceManager.get().branchReport(BranchType.AT, context.getXid(), context.getBranchId(),
                    commitDone ? BranchStatus.PhaseOne_Done : BranchStatus.PhaseOne_Failed, null);
                return;
            } catch (Throwable ex) {
                LOGGER.error("Failed to report [" + context.getBranchId() + "/" + context.getXid() + "] commit done ["
                    + commitDone + "] Retry Countdown: " + retry);
                retry--;
    
                if (retry == 0) {
                    throw new SQLException("Failed to report branch status " + commitDone, ex);
                }
            }
        }
    }
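    The order of these steps and the retry countdown in report() can be traced with a recorder. DemoBranchCommit is hypothetical. Registration always succeeds here, a boolean stands in for whether the undo-log flush and local commit succeed, and a BooleanSupplier stands in for the call to the TC.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical trace of processGlobalTransactionCommit plus report()'s retry countdown
class DemoBranchCommit {
    final List<String> steps = new ArrayList<>();

    void commit(boolean localCommitOk, boolean reportSuccessEnabled, BooleanSupplier sendToTc) {
        steps.add("register");                      // 1. register the branch (lock keys included)
        if (!localCommitOk) {                       // 2-3 failed: undo-log flush or local commit threw
            report(3, sendToTc, "PhaseOne_Failed");
            throw new IllegalStateException("local commit failed");
        }
        steps.add("flush undo log + commit");       // 2-3. same local transaction as the business SQL
        if (reportSuccessEnabled) {
            report(3, sendToTc, "PhaseOne_Done");   // 4. success is reported only when enabled
        }
    }

    // Same shape as report(): try up to retryCount times, give up when the countdown hits zero
    void report(int retryCount, BooleanSupplier sendToTc, String status) {
        int retry = retryCount;
        while (retry > 0) {
            if (sendToTc.getAsBoolean()) {
                steps.add("report " + status);
                return;
            }
            retry--;
            if (retry == 0) {
                throw new IllegalStateException("Failed to report branch status " + status);
            }
        }
    }
}
```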
    

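    The undo log insert and the business SQL use the same connection, so they belong to the same local transaction. This guarantees that business changes and their undo log always exist as a pair.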

    Summary

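    Seata's AT mode implements a distributed transaction in the traditional sense, achieving two-phase commit across resource nodes by generating undo logs automatically.
    The advantage is that existing applications built on MySQL, PostgreSQL, or Oracle can adopt distributed transactions quickly without major code changes. The drawbacks are that writes to the same rows are serialized by global locks and that undo log processing adds overhead, so the approach does not solve the performance problem of hot resources.
    Seata adds an undo_log table to each participating service's database and relies on that database's local transaction to make each branch's changes and its compensation data atomic. This idea is worth borrowing when designing distributed transactions.

    Other notes: distributed transaction patterns include 2PC, TCC, and SAGA. SAGA comes in two forms, centralized orchestration and decentralized choreography. Besides Seata, distributed transaction frameworks include Axon and ServiceComb.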

  Original article: https://www.cnblogs.com/milton/p/13755862.html