zoukankan      html  css  js  c++  java
  • 七、seata客户端二阶段分支事务的提交和回滚

    所有文章

    https://www.cnblogs.com/lay2017/p/12485081.html

    正文

    在阅读seata自动配置相关的内容的时候,我们说过。客户端会初始化一个RMClient的RPC客户端,且同时会添加一个监听器RmMessageListener,监听器将监听来自seata的server发送的RPC消息。我们再回顾一下这段代码

    public static void init(String applicationId, String transactionServiceGroup) {
        RmRpcClient rmRpcClient = RmRpcClient.getInstance(applicationId, transactionServiceGroup);
        rmRpcClient.setResourceManager(DefaultResourceManager.get());
        rmRpcClient.setClientMessageListener(new RmMessageListener(DefaultRMHandler.get()));
        rmRpcClient.init();
    }

    RmMessageListener监听到的消息将交付给TransactionMessageHandler处理

    这里的DefaultRMHandler.get()将返回一个DefaultRMHandler的单例对象,DefaultRMHandler简介实现TransactionMessageHandler,并且组合了三种handler

    1)RMHandlerAT

    2) RMHandlerTCC

    3) RMHandlerSaga

    分别对应不同的事务模式,三种处理器和DefaultRMHandler一样都继承了AbstractRMHandler。而AbstractRMHandler包含了两个核心方法,doBranchCommit和doBranchRollback,分别用于二阶段分支事务的提交和分支事务的回滚。

    分支事务提交doBranchCommit

    跟进AbstractRMHandler的doBranchCommit方法

    protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        // 选择对应的ResourceManager,调用commit
        BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId, applicationData);
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
    }

    doBranchCommit方法将会获取到对应的ResourceManager,我们以AT模式为例。AT模式将会获取到DataSourceManager这个ResourceManager。

    我们跟进DataSourceManager的branchCommit方法

    private ResourceManagerInbound asyncWorker;
    @Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData);
    }

    branchCommit方法中交付给了一个异步线程处理,我们跟进AsyncWorker的branchCommit

    @Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
        if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
            LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid
                + "] will be handled by housekeeping later.");
        }
        return BranchStatus.PhaseTwo_Committed;
    }

    异步线程中使用了一个BlockQueue来排队处理,将会有一个Scheduler定时从BlockQueue中获取poll出来,然后进行undoLog的批量删除

    UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(xids, branchIds, conn);

    小结:所谓分支事务的二阶段提交,其实就是异步删除undoLog。因为一阶段的时候已经提交了本地事务,所以二阶段就非常地快速。

    分支事务回滚doBranchRollback

    和doBranchCommit,先跟进AbstractRMHandler的doBranchRollback方法

    protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        // 选择ResourceManager,调用rollback
        BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId, applicationData);
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
    }

    可以看到,一样是调用ResourceManager的方法。我们同样以DataSourceManager为例,跟进DataSourceManager的branchRollback方法

    @Override
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        // 选择Resource
        DataSourceProxy dataSourceProxy = get(resourceId);
    
        try {
            // undo补偿
            UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
        } catch (TransactionException te) {
            if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
            } else {
                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
            }
        }
        return BranchStatus.PhaseTwo_Rollbacked;
    }

    在阅读DataSourceProxy的时候说过,DataSourceProxy将会作为一个Resource注册到ResourceManager当中。

    而在分支事务回滚的时候,将会获取到该Resource,也就是DataSourceProxy。并且执行对应数据源的undo补偿操作。

    我们跟进undo方法,看看补偿操作,方法较长,这里缩减掉一些内容

    @Override
    public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
        // ...
        for (; ; ) {
            try {
                //...
    
                // 查找undoLog
                selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
                selectPST.setLong(1, branchId);
                selectPST.setString(2, xid);
                rs = selectPST.executeQuery();
    
                boolean exists = false;
                while (rs.next()) {
                       // ...
                       // 反编码
                    UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance() : UndoLogParserFactory.getInstance(serializer);
                    BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
    
                    try {
                        List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                        
                        for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                            //...
                            // 执行器处理undo
                            AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(dataSourceProxy.getDbType(), sqlUndoLog);
                            undoExecutor.executeOn(conn);
                        }
                    }
                }
    
                if (exists) {
                    // 删除undoLog
                    deleteUndoLog(xid, branchId, conn);
                    conn.commit();
                } else {
                    //...
                }
    
                return;
            }
            // ...
        }
    }

    我们看到,首先是查询出分支事务的undoLog。然后反序列化出undoLog数据对象,且丢给执行器去执行。执行完毕删除undoLog,且提交事务。

    由此可见,rollback就是把StatementProxy准备地undoLog拿出来,然后进行反向地补偿操作。

    总结

    本文我们简单地阅读了一下seata客户端在二阶段分支事务的commit和rollback操作做了啥。commit主要就是把undoLog删除,rollback则是获取了undoLog然后对数据进行反向生成。

    到这里,seata的客户端代码阅读部分就结束了。我们从自动配置 -> 切面 -> 数据源代理 -> 监听器这么一个流程阅读下来可以发现AT模式的核心要点就是在于数据源代理,由undoLog做反向补偿操作。

    后续,将开始server端的代码阅读...

  • 相关阅读:
    Airtest环境搭建及介绍
    再谈PHP错误与异常处理
    Composer基础
    PHP中this,self,parent的区别
    3种方法轻松处理php开发中emoji表情的问题
    php防注入和XSS攻击通用过滤.
    mysql where in 数组解决小tips
    记录搜索关键字到数据库
    获取用户id的方法
    file_get_contents('php://input') 数据如何转换成数组
  • 原文地址:https://www.cnblogs.com/lay2017/p/12491982.html
Copyright © 2011-2022 走看看