zoukankan      html  css  js  c++  java
  • 分布式事务:Seata框架AT模式及TCC模式执行流程剖析

    Seata角色术语

    TC - 事务协调者

    维护全局和分支事务的状态,驱动全局事务提交或回滚,即Seata服务端。

    TM - 事务管理器

    定义全局事务的范围:开始全局事务、提交或回滚全局事务,在事务发起的客户端。

    RM - 资源管理器

    管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚,在分支事务执行的客户端。

    Seata执行流程

    Seata AT模式

    流程图解

    第一阶段

    通过代理数据源DataSourceProxy对业务SQL进行解析,转换成undolog,并与业务SQL在一个事务内入库,然后注册分支事务、提交、上报状态。

    第二阶段

    分布式事务操作成功,则TC通知RM异步删除undolog。

    分布式事务操作失败,TM向TC发送回滚请求,RM 收到协调器TC发来的回滚请求,通过 XID 和 Branch ID 找到相应的回滚日志记录,通过回滚记录生成反向的更新 SQL 并执行,以完成分支的回滚。

    工作机制

    以一个示例来说明整个 AT 分支的工作过程。

    业务表:product

    Field

    Type

    Key

    id

    bigint(20)

    PRI

    name

    varchar(100)

     

    since

    varchar(100)

     

    AT 分支事务的业务逻辑:

    update product set name = 'GTS' where name = 'TXC';
    一阶段

    过程:

    1. 解析 SQL:得到 SQL 的类型(UPDATE),表(product),条件(where name = 'TXC')等相关的信息。
    2. 查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据。
    select id, name, since from product where name = 'TXC';

    得到前镜像:

    id

    name

    since

    1

    TXC

    2014

    1. 执行业务 SQL:更新这条记录的 name 为 'GTS'。
    2. 查询后镜像:根据前镜像的结果,通过 主键 定位数据。
    select id, name, since from product where id = 1;

    得到后镜像:

    id

    name

    since

    1

    GTS

    2014

    1. 插入回滚日志:把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到 UNDO_LOG 表中。
    {
        "branchId": 641789253,
        "undoItems": [{
            "afterImage": {
                "rows": [{
                    "fields": [{
                        "name": "id",
                        "type": 4,
                        "value": 1
                    }, {
                        "name": "name",
                        "type": 12,
                        "value": "GTS"
                    }, {
                        "name": "since",
                        "type": 12,
                        "value": "2014"
                    }]
                }],
                "tableName": "product"
            },
            "beforeImage": {
                "rows": [{
                    "fields": [{
                        "name": "id",
                        "type": 4,
                        "value": 1
                    }, {
                        "name": "name",
                        "type": 12,
                        "value": "TXC"
                    }, {
                        "name": "since",
                        "type": 12,
                        "value": "2014"
                    }]
                }],
                "tableName": "product"
            },
            "sqlType": "UPDATE"
        }],
        "xid": "xid:xxx"
    }
    1. 提交前,向 TC 注册分支:申请 product 表中,主键值等于 1 的记录的 全局锁 。
    2. 本地事务提交:业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交。
    3. 将本地事务提交的结果上报给 TC。
    二阶段-回滚
    1. 收到 TC 的分支回滚请求,开启一个本地事务,执行如下操作。
    2. 通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录。
    3. 数据校验:拿 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理,详细的说明在另外的文档中介绍。
    4. 根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句:
    update product set name = 'TXC' where id = 1;
    1. 提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。
    二阶段-提交
    1. 收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。
    2. 异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。

    事务使用

    我们只需要使用一个 @GlobalTransactional 注解在业务方法上:

     @GlobalTransactional
        public void purchase(String userId, String commodityCode, int orderCount) {
            ......
        }

    源码分析

    相关配置

    SeataAutoConfiguration

    Spring自动配置类中配置了全局事务扫描器GlobalTransactionScanner。

    GlobalTransactionScanner

    GlobalTransactionScanner内容如下。

    可以看到分别实现了Spring的3个接口InitializingBean,ApplicationContextAware,DisposableBean。

    在afterPropertiesSet()中,调用了initClient方法:

    initClient方法里面对TmClient,RmClient进行了初始化(参数就是配置文件bean里配置的applicationId和txServiceGroup),并注册了一个Spring的ShutdownHook。

    TmClient.init()

    TmClient的初始化方法。

    其最终调用到的是AbstractNettyRemotingClient的init()方法,启动了一个定时器不断进行重连操作。

    NettyClientChannelManager的reconnect方法内容如下:

    方法getAvailServerList内从注册中心获取服务器列表。

    RegistryFactory.getInstance().lookup(transactionServiceGroup)是针对不同注册中心做了适配的,默认看下File形式的实现。

    进到FileRegistryServiceImpl#lookup方法,这里结合File.conf配置来说明。

     

    FileRegistryServiceImpl的服务器查找方法如下:

     

    1、现根据事务分组(key=vgroup_mapping.事务分组名称)找到分组所属的server集群名称,这里是default

    2、然后根据集群名称(key=集群名称.grouplist)找到server对应ip端口地址

    梳理下TmClient的初始化流程:

    1. 启动定时执行器,每10秒尝试进行一次重连seata-server
    2. 重连时,先从file.conf中根据分组名称(service_group)找到集群名称(cluster_name)
    3. 再根据集群名称找到seata-serverr集群ip端口列表
    4. 从ip列表中选择一个用netty进行连接
    RmClient.init()

    RmClient的初始化方法。

    1、设置了资源管理器resourceManager

    2、设置了消息回调监听器,rmHandler用于接收seata-server在二阶段发出的提交或者回滚请求

    ResourceManager管理资源的注册和注销。

    RMHandlerAT在收到TC二阶段回滚消息时执行回滚。

    第一阶段

    拦截器中开启事务

    在需要加全局事务的方法中,会加上GlobalTransactional注解,注解往往对应着拦截器,Seata中拦截全局事务的拦截器是GlobalTransactionalInterceptor,看下其拦截方法。

    判断:

    l  如果方法上有全局事务注解,调用handleGlobalTransaction开启全局事务

    l  如果没有,按普通方法执行,避免性能下降

    看下handleGlobalTransaction()方法:

    可以看到最终调用的是TransactionalTemplate的execute方法,execute方法如下:

    分为几步:

    l  开启全局事务beginTransaction

    l  执行业务方法

    l  提交事务commitTransaction(若没抛异常)

    l  执行completeTransactionAfterThrowing回滚操作(抛异常)

    beginTransaction最终调用到了io.seata.tm.api.DefaultGlobalTransaction#begin(int, java.lang.String)方法,代码如下:

    1. 调用transactionManager.begin()方法通过TmNettyRemotingClient与server通信并生成一个xid
    2. 将xid绑定到Root上下文中

    看到这里,也就明确了一点,全局事务开启时,是由TM来发起的。

    commitTransaction和rollbackTransaction方法类似,由TM发送事务commit或rollback信息给seata-server。

    SQL解析与undolog生成

    由于Seata对数据源做了代理,所以sql解析与undolog入库操作是在数据源代理中执行的。数据源代理类创建器的配置。

    DataSourceProxy,ConnectionProxy,StatementProxy是Seata提供的代理封装类。

    最终对Sql进行解析操作,发生在StatementProxy类中:

    然后交给了ExecuteTemplate执行,跟到ExecuteTemplate中查看:

    1. 先判断是否开启了全局事务,如果没有,不走代理,不解析sql,避免性能下降
    2. 调用SQLVisitorFactory对目标sql进行解析
    3. 针对特定类型sql操作(INSERT,UPDATE,DELETE,SELECT_FOR_UPDATE)等进行特殊解析
    4. 执行sql并返回结果

    关键点在于特定类型执行器中的execute方法,挑选InsertExecutor为例说明,其execute方法调用的是父类BaseTransactionalExecutor中的execute方法,看下源码。

    将ROOT上下文中的xid绑定到了connectionProxy中,并调用了doExecute方法,看下AbstractDMLBaseExecutor中的doExecute方法。

    查看代码,生成undolog在executeAutoCommitFalse方法中:

    executeAutoCommitTrue中先将autoCommit设置为false(因为要对sql进行解析,生成undolog在一个事务中入库,避免提前入库)。

    再执行到executeAutoCommitFalse中,分为4步:

    1. 获取sql执行前镜像beforeImage
    2. 执行sql
    3. 获取sql执行后afterimage
    4. 根据beforeImage,afterImage生成undolog记录并添加到connectionProxy的上下文中

    到此为止,红色框中几步已经完成。

    分支事务注册与事务提交

    业务sql执行以及undolog执行完后会在ConnectionProxy中执行commit操作,

    看下代码。

    1、如果处于全局事务中,则调用processGlobalTransactionCommit()处理全局事务提交

    2、如果加了全局锁注解,加全局锁并提交

    3、如果没有对应注释,按直接进行事务提交

    主要看processGlobalTransactionCommit()方法,也是核心代码:

    流程分为如下几步:

    1. 注册分支事务register(),并将branchId分支id绑定到上下文中。
    2. UndoLogManager.flushUndoLogs(this) 如果包含undolog,则将之前绑定到上下文中的undolog进行入库。
    3. 提交本地事务。
    4. 如果操作失败,report()中通过RM提交第一阶段失败消息,如果成功,report()提交第一阶段成功消息。

    undolog入库和普通业务sql的执行用的一个connection,处于一个本地事务中,保证了业务数据变更时,一定会有对应undolog存在。

    至此,第一阶段中undolog提交与本地事务提交,分支事务注册与汇报也已完成。

    第二阶段

    在前面分析RmClient.init()方法时,提到了Seata会使用SPI拓展机制找到RmClient的回调处理器RMHandlerAT,该类是负责接送二阶段seata-server发给RmClient的提交、回滚消息,并作出提交,回滚操作。

    RMHandlerAT继承自AbstractRMHandler,AbstractRMHandler中两个handle方法对应,事务提交、回滚操作。

    全局事务提交

    全局事务提交对应了doBranchCommit(request, response)方法。

    调用的是getResourceManager(),上面提到SPI拓展提到的DataSourceManager类。

    DataSourceManager中调用了asyncWorker来异步提交,看下AsyncWorker中branchCommit方法。

    这边只是往一个ASYNC_COMMIT_BUFFER缓冲List中新增了一个二阶段提交的context。

    但真正提交在哪呢?答案在AsyncWorker的init()方法中,其init()方法会在DataSourceManager中被调用,内部启动一个定时器不断进行全局事务提交操作。

    真正的分支事务提交就是在doBranchCommits中完成的,主要工作是删除回滚日志。

    主要分为几步:

    1. 先按resourceId(也就是数据连接)对提交操作进行分组,一个数据库的可以一起操作,提升效率
    2. 根据resourceId找到对应DataSourceProxy,并获取一个普通的数据库连接getPlainConnection(),估计这本身不需要做代理操作,故用了普通的数据库连接
    3. 调用UndoLogManager.deleteUndoLog(commitContext.xid, commitContext.branchId, conn)删除undolog

    回过头来看下设计原理图:

    全局事务回滚

    接下来我们看看全局事务回滚的方法,AbstractRMHandler#doBranchRollback 。

    该方法调用了DataSourceManager的branchRollback方法。

    最终回滚方法调用的是UndoLogManager.undo(dataSourceProxy, xid, branchId),大体是根据Undolog进行反解析并执行回滚操作。

    然后进行回滚日志的清理和提交。

    具体的Undolog反解析操作实现在AbstractUndoExecutor的子类中。

    再回头看下回滚设计原理图:

    Seata TCC模式

    流程图解

    TCC执行流程如下图所示:

    大致流程如下:

    1.全局事务拦截器拦截到@GlobalTransational注解,调用TM开启全局事务

    2.执行TCC参与者的prepare方法时,被TCC拦截器拦截,在prepare方法执

    行前注册分支事务到TC,在prepare方法执行后向TC报告分支事务的状态

    3.如果执行发生异常则TM通知TC回滚事务,否则TM通知TC执行提交事务

    4.TC收到TM的提交或回滚通知,遍历各TCC分支事务,逐个进行提交或回滚

    事务使用

    跟AT模式一样,TCC模式也通过@GlobalTransactional注解开启全局事务,然后调用各个两阶段参与者的prepare方法即可。

    两阶段的参与者格式如下:

    l  TwoPhaseBusinessAction注解标记这是个TCC接口,同时指定commitMethod,rollbackMethod的名称

    l  BusinessActionContext是TCC事务中的上下文对象

    l  BusinessActionContextParameter注解标记的参数会在上下文中传播,即能通过BusinessActionContext对象在commit方法及cancle方法中取到该参数值

    源码分析

    TCC资源注册

    GlobalTransactionScanner继承了AbstractAutoProxyCreator抽象类,并重新实现了wrapIfNecessary接口,该接口用来在spring启动时,生成代理类。

    看下重写的wrapIfNecessary方法。

    可以看到这段逻辑中,判断了bean如果是个TCC的接口实现,则将拦截器初始化为TccActionInterceptor,TccActionInterceptor是TCC方法的核心拦截器,后面会具体介绍,先跟到TCCBeanParserUtils.isTccAutoProxy()中看下源码。

    isTccAutoProxy()中又会调用DefaultRemotingParser#parserRemotingServiceInfo来进行TCC资源注册。

     

    可以看到,通过反射拿到了TwoPhaseBusinessAction注解中声明的Commit方法和Rollback方法并封装成TCCResource对象,最终调用ResourceManager的registerResource方法。

    TCC模式下ResourceManger的实现为TCCResourceManager,AbstractRMHandler的实现为RMHandlerTCC。

    跟到TCCResourceManager中查看registerResource方法。

    看到将TCCResource对象存储在本地Map中,方便后续通过ResourceId找到对应Resource来进行提交,回滚操作。super.registerResource代码如下,通过RmNettyRemotingClient发送rpc请求给Seata-server进行资源注册。

    至此,本地内存中会有个TCCResourceCache,注册完成后,seata-server端也会有个TCC的资源列表。

    服务端接收RM注册信息的接口在DefaultServerMessageListenerImpl 的onRegRmMessage中,看下代码。

    最终调用了ChannelManager.registerRMChannel方法。

    服务端也会对RpcContext进行缓存,缓存Map嵌套层次较多,最外层key为resourceId,往内一次是applicationId,clientIIp,port。

    至此,TCC资源管理器RM已完成注册,本地及服务端均有以resourceId为key的缓存Map。

    开启TCC全局事务

    TCC模式业务调用方和AT模式一样,需要使用GlobalTransactional注解来开启全局事务。

    业务方法执行时,最终会被AT模式源码分析中提到过的拦截器GlobalTransactionalInterceptor拦截,开启一个全局事务,获得全局事务id,即xid。

    具体代码是TransactionalTemplate的execute方法,execute方法如下:

    分为一下几步:

    1.开启全局事务beginTransaction(TM与TC通信并获得Xid)

    服务端接收全局事务开启请求的方法在DefaultCore的begin方法中,可以看到创建了一个GlabalSession。

    2.执行业务方法

    3.提交事务commitTransaction(TM与TC通信,发起事务提交请求)

    4.如果发生异常,执行completeTransactionAfterThrowing回滚操作(TM与TC通信,发起事务回滚请求)

    TCC拦截器-注册分支事务

    TCC注册过程分析时,如果bean是个TCC的bean(即bean中方法包含TwoPhaseBusinessAction注解),会初始出TccActionInterceptor拦截器,其实现了MethodInterceptor,这也是TCC接口的方法级别核心拦截器。

    看下源码中的invoke方法:

    方法调用了actionInterceptorHandler.proceed方法:

    接着看doTccActionLogStore方法:

     

    服务端接收分支注册的代码也在DefaultCore(见Seata源码)中,代码如下:

    至此,TCC分支事务注册完毕。

    全局事务提交

    TransactionalTemplate的execute方法中,若业务执行无异常,则会调用commitTransaction方法。

    最终调用的DefaultGlobalTransaction的commit方法。

    其中调用TM的commit方法,来通知TC对全局事务进行提交。

    TC收到commit消息的处理在DefaultCore(见Seata源码)的commit方法中,查看代码:

    分为几步:

    1. 根据xid取出GlabalSession
    2. 关闭Session,防止再有分支注册尽量
    3. 修改状态为提交中
    4. 如果可以异步提交,则异步提交,否则同步提交

    看下同步提交代码doGlobalCommit:

    遍历每个branchSession,对每个分支事务进行提交,失败会无限重试。

    resourceManagerInbound.branchCommit方法会调用DefaultCoordinator中branchCommit方法来与TCC资源管理器通信,发送分支事务提交消息,这里sendSyncRequest方法中就会根据resourceId去找到第一步(TCC资源管理器注册)中RpcContext的缓存,并得到对应Channel来建立Netty通信。

    各TCC资源管理器接收到分支事务提交请求后,会调用TCCResourceManager的branchCommit方法实际对事务进行提交。

    自此客户端收到seata-server提交信息后,完成了对分支事务的提交。

    总结一下全局事务提交的大致流程:

    1. 业务方调用微服务无异常,通过TM发起事务提交请求
    2. TC接收到事务提交请求后,通过Xid找到全局事务,再取出所有分支事务
    3. 遍历分支事务,发出分支事务提交请求
    4. TCC资源管理器RM接收到提交请求后,从本地TCCResource缓存中根据resourceId取出对应方法bean,反射调用commit方法

    全局事务回滚

    全局事务回滚思路与全局事务提交过程基本一致。

    全局事务回滚的大致流程:

    1.    业务方调用微服务发生异常,通过TM发起事务回滚请求

    2.    TC接收到事务回滚请求后,通过Xid找到全局事务,再取出所有分支事务

    3.    遍历分支事务,发出分支事务回滚请求

    4.    TCC资源管理器RM接收到回滚请求后,从本地TCCResource缓存中根据resourceId取出对应方法bean,反射调用rollback方法

    到此,我们完成了对Seata框架AT模式和TCC模式完整执行流程的分析。


    作者:朝雨忆轻尘
    出处:https://www.cnblogs.com/xifengxiaoma/ 
    版权所有,欢迎转载,转载请注明原文作者及出处。

  • 相关阅读:
    Spring框架:第八章:声明式事务
    Spring框架:第七章:AOP切面编程
    Spring框架:第六章:注解功能
    Jmeter之WebService接口测试
    Jmeter中的参数化常用的几种方式
    Jmeter之定时器
    Jmeter之断言——检查点
    Jmeter重要组件介绍(一)
    Jmeter中之各种乱码问题解决方案
    Jmeter之https请求
  • 原文地址:https://www.cnblogs.com/xifengxiaoma/p/13995416.html
Copyright © 2011-2022 走看看