zoukankan      html  css  js  c++  java
  • 分布式事务解决方案--Seata源码解析

    前言

    分布式项目只要有业务交互就会涉及到分布式事务问题,事务通常分为三步:创建事务、执行事务、提交事务或回滚事务,单机模式下只要一个事务可以依赖数据库的事务实现,而分布式事务往往涉及到多个项目多个数据库的同步更新操作,此时就需要有一套分布式事务解决方案,否则就会出现分布式系统数据不一致的问题。所以分布式事务解决就是要将多个事务当作一个事务来执行,要么全部提交成功要么全部回滚。

    一、常用分布式解决方案

    1、基于XA的二阶段提交

    XA是X/Open组织提出的二阶段提交的分布式事务规范,XA规范核心角色是事务管理器(Transaction Manager) 和 资源管理器 (Resource Manager),资源管理器就是本地的事物,事务管理器负责协调各个资源管理器事务的提交和回滚。

    二阶段提交主要将事务拆分成功两个阶段

    1、事务准备阶段:事务管理器通知各个资源管理器准备事务,各个资源管理器接受到通知之后在本地执行事务但是不提交,并且写好本地的redo和undo日志,然后将事务执行结果上报给事务管理器

    2、事务提交阶段:事务管理器根据各个资源管理器上报的结果决定是提交事务还是回滚事务,然后通知各个资源管理器,各个资源管理器本地提交事务或回滚事务,然后再上报提交或回滚的结果给事务管理器

    二阶段提交方案的问题

    1、第一阶段基本上不会有什么问题,如果有问题的话直接回滚即可,一般会出问题的可能是第二阶段

    2、第二阶段事务管理器下发通知之后,各个资源管理器就开始执行提交或回滚,但是此时还是会出现提交或回滚失败的问题。比如以下场景:

    事务管理器通知提交,资源管理器1接收到提交指令提交本地事务;资源管理器2由于宕机没有接收到提交指令而没有提交事务,此时就会出现事务提交不一致的问题了。

    3、异常问题处理方案

    a.本地资源管理器在执行事务时需要记录redo和undo日志

    b.当资源管理器宕机重启时,根据本地redo和undo日志,询问事务管理器当前事务的执行状态,然后继续提交或回滚事务

    c.当事务管理器宕机时,如果指令未下发,可以通过记录事务日志的方式根据本地日志进行下发指令;如果指令已经下发,那么各个资源管理器已经提交或回滚事务了,也没有问题

    d.第二阶段由于只需要执行提交和回滚命令,而事务中的业务实际已经执行完成,所以提交或回滚命令的执行时间是非常短的,所以出现问题的概率本身就不是很大

    4、二阶段提交还有一个问题是整体流程较长,就会导致各个资源管理器占用锁的时间就变长了,必须等待事务提交完成才会释放锁资源,所以高并发情况下不建议采用,否则会导致大量的请求被阻塞在锁竞争上

    2、基于TCC的事务补偿方案

    事务分成事务执行、事务提交和事务回滚,TCC本质就是实现了事务的三步,包括事务执行(Try)、事务提交(Confirm)、事务回滚(Cancel),TCC核心思想是针对每一个操作都注册一个对应的确认和取消操作。

    事务开始时,业务应用会向事务协调器注册启动事务。之后业务应用会调用所有服务的try接口,完成一阶段准备。之后事务协调器会根据try接口返回情况,决定调用confirm接口或者cancel接口。如果接口调用失败,会进行重试。
    TCC方案让应用自己定义数据库操作的粒度,使得降低锁冲突、提高吞吐量成为可能。 当然TCC方案也有不足之处,集中表现在以下两个方面:

    a.对应用的侵入性强。业务逻辑的每个分支都需要实现try、confirm、cancel三个操作,应用侵入性较强,改造成本高。

    b.实现难度较大。需要按照网络状态、系统故障等不同的失败原因实现不同的回滚策略。为了满足一致性的要求,confirm和cancel接口必须实现幂等。

    上述原因导致TCC方案大多被研发实力较强、有迫切需求的大公司所采用。微服务倡导服务的轻量化、易部署,而TCC方案中很多事务的处理逻辑需要应用自己编码实现,复杂且开发量大。
     

    3、基于MQ的最终一致性方案

    基于MQ的一致性相当于是异步实现的分布式事务,将事务分成了两个不关联的本地事务,基于MQ进行同步,根据最终结果实现一致性从而达到分布式事务。但是最终一致性方案的一致性会有所延迟,并不能保证实时的一致性,另外还需要保证MQ是高可用且不会丢失消息的情况,否则可能会丢失事务的一部分。

    二、基于Seata实现的分布式事务解决方案

    Seata是阿里开源的分布式事务解决方案,Seata的前身是阿里开源的分布式事务中间件Fescar,后来更名为Seata,意思是Simple Extensible Autonomous Transaction Architecture,是一套一站式分布式事务解决方案。

    Seata是基于XA的二阶段提交方案演变而来的,核心角色分成了三个:

    Transcation Coordinator(TC):事务协调器,负责维护全局事务的状态,以及协调全局事务的提交和回滚

    Transcation Manager(TM):控制全局事务的边界,负责开启一个全局事务,并发起最终的全局事务的提交或回滚

    Resource Manager(RM):资源管理器,控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚。

    一次完整的分布式事务过程如下:

    1、TM向TC申请一个全局事务,TC创建一个新的全局事务,并有一个全局事务唯一ID为XID

    2、XID在微服务调用链路的上下文中用于传播

    3、RM向TC注册分支事务,并由XID对应的全局事务管理

    4、TM向TC发起全局事务XID的提交或回滚决议

    5、TC调用XID管理的所有分支事务进行提交或回滚

    Seata和XA有点类似都是进行了两个阶段,但是有所不同,

    1、XA的资源管理器本质是数据库,所以比较依赖数据库的事务;而Seata的资源管理器是第三方包的形式嵌入在应用程序中,更加的灵活

    2、XA的事务占有锁资源需要等到第二阶段完成才会释放;而Seata是在第一阶段就将所有的事务进行提交,因为90%以上的请求都会执行成功,所以第二阶段回滚的概率一般不会太高,所以将提交事务的工作放在第一阶段,可以保证虽然全局事务没有结束,但是分支事务可以提前先提交好,如果真的出现异常情况,再通过第二阶段回滚即可,这样就极大的建设了各个分支事务的资源占有时间,提高了整体吞吐量。

    三、Seata的实现细节

    3.1、Seata的事务如何回滚?

    Seata是通过中间件的方式嵌入应用程序的,Seata的 JDBC 数据源代理通过对业务 SQL 的解析,把业务数据在更新前后的数据镜像组织成回滚日志,利用 本地事务 的 ACID 特性,将业务数据的更新和回滚日志的写入在同一个 本地事务中提交。这样就可以保证任何提交的业务数据的更新一定有相应的回滚日志存在。基于这样的逻辑Seata可以在第一阶段就将分支事务直接提交,而不用担心后续的回滚。

    到了第二阶段,如果全局事务是提交,那么分支事务无需再提交事务,只需要异步将回滚日志删除即可,而这个异步操作的效率比较高;如果全局事务是回滚,那么分支事务就根据XID找到对应的回滚日志进行回滚操作即可。

    3.2、分布式事务的传播

    全局事务是由于事务协调器发起,实际就是调用链路的第一个发起方,全局事务发起之后会产生全局事务XID,而为了让其他分支事务拿到XID,就需要XID在服务调用链路上进行传播。所以服务调用机制就需要提高一套可以透传XID的机制,比如HTTP请求的header、dubbo的Filter+RpcContext、本地方法的ThreadLocal等

    四、Seata使用案例

    业务场景:模拟下单业务,需要扣库存,扣用户余额,创建订单三步,假设有订单服务、库存服务、账户服务,那么下单业务就需要调用库存服务和订单服务,而订单服务又需要调用账户服务

    案例采用Seata官方提供的Demo

    1、maven依赖如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <!--
      ~  Copyright 1999-2018 Alibaba Group Holding Ltd.
      ~
      ~  Licensed under the Apache License, Version 2.0 (the "License");
      ~  you may not use this file except in compliance with the License.
      ~  You may obtain a copy of the License at
      ~
      ~       http://www.apache.org/licenses/LICENSE-2.0
      ~
      ~  Unless required by applicable law or agreed to in writing, software
      ~  distributed under the License is distributed on an "AS IS" BASIS,
      ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      ~  See the License for the specific language governing permissions and
      ~  limitations under the License.
      -->
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <groupId>io.seata</groupId>
            <artifactId>seata-samples</artifactId>
            <version>1.1.0</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
        <artifactId>seata-samples-dubbo</artifactId>
        <packaging>jar</packaging>
        <name>seata-samples-dubbo ${project.version}</name>
        <dependencies>
            <dependency>
                <groupId>org.apache.dubbo</groupId>
                <artifactId>dubbo</artifactId>
            </dependency>
            <dependency>
                <groupId>io.seata</groupId>
                <artifactId>seata-all</artifactId>
            </dependency>
            <dependency>
                <groupId>com.101tec</groupId>
                <artifactId>zkclient</artifactId>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>dubbo-registry-nacos</artifactId>
            </dependency>
            <dependency>
                <groupId>com.alibaba.spring</groupId>
                <artifactId>spring-context-support</artifactId>
            </dependency>
            <dependency>
                <groupId>com.alibaba.spring</groupId>
                <artifactId>spring-context-support</artifactId>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jdbc</artifactId>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
            </dependency>
    
            <!--<dependency>-->
                <!--<groupId>org.apache.dubbo</groupId>-->
                <!--<artifactId>dubbo-remoting-etcd3</artifactId>-->
            <!--</dependency>-->
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-deploy-plugin</artifactId>
                    <version>2.8.2</version>
                    <configuration>
                        <skip>true</skip>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    </project>
    View Code

    2、数据库初始化业务数据表脚本如下:

    DROP TABLE IF EXISTS `storage_tbl`;
    CREATE TABLE `storage_tbl` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `commodity_code` varchar(255) DEFAULT NULL,
      `count` int(11) DEFAULT 0,
      PRIMARY KEY (`id`),
      UNIQUE KEY (`commodity_code`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    
    DROP TABLE IF EXISTS `order_tbl`;
    CREATE TABLE `order_tbl` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `user_id` varchar(255) DEFAULT NULL,
      `commodity_code` varchar(255) DEFAULT NULL,
      `count` int(11) DEFAULT 0,
      `money` int(11) DEFAULT 0,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    
    DROP TABLE IF EXISTS `account_tbl`;
    CREATE TABLE `account_tbl` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `user_id` varchar(255) DEFAULT NULL,
      `money` int(11) DEFAULT 0,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    View Code

    3、数据库初始化seata日志表脚步如下:

    CREATE TABLE `undo_log` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `branch_id` bigint(20) NOT NULL,
      `xid` varchar(100) NOT NULL,
      `context` varchar(128) NOT NULL,
      `rollback_info` longblob NOT NULL,
      `log_status` int(11) NOT NULL,
      `log_created` datetime NOT NULL,
      `log_modified` datetime NOT NULL,
      PRIMARY KEY (`id`),
      UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

    4、定义服务和实现

    账户服务接口和实现

    public interface AccountService {
    
        /**
         * 余额扣款
         *
         * @param userId 用户ID
         * @param money  扣款金额
         */
        void debit(String userId, int money);
    }
    
    /** 账户服务 */
    public class AccountServiceImpl implements AccountService {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(AccountService.class);
    
        private JdbcTemplate jdbcTemplate;
    
        /**
         * Sets jdbc template.
         *
         * @param jdbcTemplate the jdbc template
         */
        public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
            this.jdbcTemplate = jdbcTemplate;
        }
    
        @Override
        public void debit(String userId, int money) {
            LOGGER.info("Account Service ... xid: " + RootContext.getXID());
            LOGGER.info("Deducting balance SQL: update account_tbl set money = money - {} where user_id = {}",money,userId);
    
            jdbcTemplate.update("update account_tbl set money = money - ? where user_id = ?", new Object[] {money, userId});
            LOGGER.info("Account Service End ... ");
        }
    }
    View Code

    库存服务接口和实现

    public interface StorageService {
    
        /**
         * 扣减库存
         *
         * @param commodityCode 商品编号
         * @param count         扣减数量
         */
        void deduct(String commodityCode, int count);
    }
    
    /** 库存服务*/
    public class StorageServiceImpl implements StorageService {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(StorageService.class);
    
        private JdbcTemplate jdbcTemplate;
    
        public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
            this.jdbcTemplate = jdbcTemplate;
        }
    
        @Override
        public void deduct(String commodityCode, int count) {
            LOGGER.info("Storage Service Begin ... xid: " + RootContext.getXID());
            LOGGER.info("Deducting inventory SQL: update storage_tbl set count = count - {} where commodity_code = {}",
                count, commodityCode);
    
            jdbcTemplate.update("update storage_tbl set count = count - ? where commodity_code = ?",
                new Object[] {count, commodityCode});
            LOGGER.info("Storage Service End ... ");
        }
    }
    View Code

    订单服务接口和实现

    public interface OrderService {
        /**
         * 创建订单
         *
         * @param userId        用户ID
         * @param commodityCode 商品编号
         * @param orderCount    订购数量
         * @return 生成的订单 order
         */
        Order create(String userId, String commodityCode, int orderCount);
    }
    
    /** 订单服务*/
    public class OrderServiceImpl implements OrderService {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(OrderService.class);
    
        private AccountService accountService;
    
        private JdbcTemplate jdbcTemplate;
    
        @Override
        public Order create(String userId, String commodityCode, int orderCount) {
            LOGGER.info("Order Service Begin ... xid: " + RootContext.getXID());
    
            // 计算订单金额
            int orderMoney = calculate(commodityCode, orderCount);
    
            // 从账户余额扣款
            accountService.debit(userId, orderMoney);
    
            final Order order = new Order();
            order.userId = userId;
            order.commodityCode = commodityCode;
            order.count = orderCount;
            order.money = orderMoney;
    
            KeyHolder keyHolder = new GeneratedKeyHolder();
    
            LOGGER.info("Order Service SQL: insert into order_tbl (user_id, commodity_code, count, money) values ({}, {}, {}, {})" ,userId ,commodityCode ,orderCount ,orderMoney );
    
            jdbcTemplate.update(new PreparedStatementCreator() {
    
                @Override
                public PreparedStatement createPreparedStatement(Connection con) throws SQLException {
                    PreparedStatement pst = con.prepareStatement(
                            "insert into order_tbl (user_id, commodity_code, count, money) values (?, ?, ?, ?)",
                            PreparedStatement.RETURN_GENERATED_KEYS);
                    pst.setObject(1, order.userId);
                    pst.setObject(2, order.commodityCode);
                    pst.setObject(3, order.count);
                    pst.setObject(4, order.money);
                    return pst;
                }
            }, keyHolder);
    
            order.id = keyHolder.getKey().longValue();
    
            LOGGER.info("Order Service End ... Created " + order);
    
            return order;
        }
    
        /**
         * Sets account service.
         *
         * @param accountService the account service
         */
        public void setAccountService(AccountService accountService) {
            this.accountService = accountService;
        }
    
        /**
         * Sets jdbc template.
         *
         * @param jdbcTemplate the jdbc template
         */
        public void setJdbcTemplate(JdbcTemplate jdbcTemplate) {
            this.jdbcTemplate = jdbcTemplate;
        }
    
        private int calculate(String commodityId, int orderCount) {
            return 200 * orderCount;
        }
    
    }
    View Code

    下单业务服务接口和实现

    public interface BusinessService {
    
        /**
         * 用户订购商品
         *
         * @param userId        用户ID
         * @param commodityCode 商品编号
         * @param orderCount    订购数量
         */
        void purchase(String userId, String commodityCode, int orderCount);
    
    }
    
    public class BusinessServiceImpl implements BusinessService {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(BusinessService.class);
    
        private StorageService storageService;
        private OrderService orderService;
    
        @Override
        @GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
        public void purchase(String userId, String commodityCode, int orderCount) {
            LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
            storageService.deduct(commodityCode, orderCount);
            orderService.create(userId, commodityCode, orderCount);
            //throw new RuntimeException("xxx");
        }
    
        public void setStorageService(StorageService storageService) {
            this.storageService = storageService;
        }
    
        public void setOrderService(OrderService orderService) {
            this.orderService = orderService;
        }
    
    }

    下单业务是分布式事务的发起者,所以方法上需要添加全局事务注解@GlobalTransactional 表示该方法是一个全局事务,那么该方法内部的执行不管是调用远程方法还是本地方法都会添加到全局事务中

    5、本案例采用dubbo的RPC调用,各个服务提供方和服务消费方配置如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
    
        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
            <property name="locations" value="classpath:jdbc.properties"/>
        </bean>
    
        <bean name="accountDataSource" class="com.alibaba.druid.pool.DruidDataSource"
              init-method="init" destroy-method="close">
            <property name="url" value="${jdbc.account.url}"/>
            <property name="username" value="${jdbc.account.username}"/>
            <property name="password" value="${jdbc.account.password}"/>
            <property name="driverClassName" value="${jdbc.account.driver}"/>
            <property name="initialSize" value="0" />
            <property name="maxActive" value="180" />
            <property name="minIdle" value="0" />
            <property name="maxWait" value="60000" />
            <property name="validationQuery" value="Select 'x' from DUAL" />
            <property name="testOnBorrow" value="false" />
            <property name="testOnReturn" value="false" />
            <property name="testWhileIdle" value="true" />
            <property name="timeBetweenEvictionRunsMillis" value="60000" />
            <property name="minEvictableIdleTimeMillis" value="25200000" />
            <property name="removeAbandoned" value="true" />
            <property name="removeAbandonedTimeout" value="1800" />
            <property name="logAbandoned" value="true" />
            <property name="filters" value="mergeStat" />
        </bean>
    
        <bean id="accountDataSourceProxy" class="io.seata.rm.datasource.DataSourceProxy">
            <constructor-arg ref="accountDataSource" />
        </bean>
    
        <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
            <property name="dataSource" ref="accountDataSourceProxy" />
        </bean>
    
        <dubbo:application name="dubbo-demo-account-service" >
            <dubbo:parameter key="qos.enable" value="false"/>
        </dubbo:application>
    <!--    <dubbo:registry address="multicast://224.5.6.7:1234?unicast=false" />-->
        <!--support etcd -->
        <!--<dubbo:registry address="etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService" />-->
        <!--support zk-->
        <dubbo:registry address="zookeeper://localhost:2181" />
        <!--support nacos-->
        <!--<dubbo:registry address="nacos://127.0.0.1:8848"/>-->
        <dubbo:protocol name="dubbo" port="20881"/>
        <dubbo:service interface="io.seata.samples.dubbo.service.AccountService" ref="service" timeout="10000"/>
    
        <bean id="service" class="io.seata.samples.dubbo.service.impl.AccountServiceImpl">
            <property name="jdbcTemplate" ref="jdbcTemplate"/>
        </bean>
    
        <bean class="io.seata.spring.annotation.GlobalTransactionScanner">
            <constructor-arg value="dubbo-demo-account-service"/>
            <constructor-arg value="my_test_tx_group"/>
        </bean>
    </beans>
    View Code
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
    
        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
            <property name="locations" value="classpath:jdbc.properties"/>
        </bean>
    
        <bean name="orderDataSource" class="com.alibaba.druid.pool.DruidDataSource"
              init-method="init" destroy-method="close">
            <property name="url" value="${jdbc.order.url}"/>
            <property name="username" value="${jdbc.order.username}"/>
            <property name="password" value="${jdbc.order.password}"/>
            <property name="driverClassName" value="${jdbc.order.driver}"/>
            <property name="initialSize" value="0"/>
            <property name="maxActive" value="180"/>
            <property name="minIdle" value="0"/>
            <property name="maxWait" value="60000"/>
            <property name="validationQuery" value="Select 'x' from DUAL"/>
            <property name="testOnBorrow" value="false"/>
            <property name="testOnReturn" value="false"/>
            <property name="testWhileIdle" value="true"/>
            <property name="timeBetweenEvictionRunsMillis" value="60000"/>
            <property name="minEvictableIdleTimeMillis" value="25200000"/>
            <property name="removeAbandoned" value="true"/>
            <property name="removeAbandonedTimeout" value="1800"/>
            <property name="logAbandoned" value="true"/>
            <property name="filters" value="mergeStat"/>
        </bean>
    
        <bean id="orderDataSourceProxy" class="io.seata.rm.datasource.DataSourceProxy">
            <constructor-arg ref="orderDataSource"/>
        </bean>
    
        <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
            <property name="dataSource" ref="orderDataSourceProxy"/>
        </bean>
    
        <dubbo:application name="dubbo-demo-order-service">
            <dubbo:parameter key="qos.enable" value="false"/>
        </dubbo:application>
    <!--    <dubbo:registry address="multicast://224.5.6.7:1234?unicast=false" />-->
        <!--support etcd -->
        <!--<dubbo:registry address="etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService" />-->
        <!--support zk-->
        <dubbo:registry address="zookeeper://localhost:2181" />
        <!--support nacos-->
        <!--<dubbo:registry address="nacos://127.0.0.1:8848"/> -->
        <dubbo:protocol name="dubbo" port="20883"/>
        <dubbo:service interface="io.seata.samples.dubbo.service.OrderService" ref="service" timeout="10000"/>
    
        <dubbo:reference id="accountService" check="false" interface="io.seata.samples.dubbo.service.AccountService"/>
    
        <bean id="service" class="io.seata.samples.dubbo.service.impl.OrderServiceImpl">
            <property name="jdbcTemplate" ref="jdbcTemplate"/>
            <property name="accountService" ref="accountService"/>
        </bean>
    
        <bean class="io.seata.spring.annotation.GlobalTransactionScanner">
            <constructor-arg value="dubbo-demo-order-service"/>
            <constructor-arg value="my_test_tx_group"/>
        </bean>
    </beans>
    View Code
    <?xml version="1.0" encoding="UTF-8"?>
    <!--
      ~  Copyright 1999-2018 Alibaba Group Holding Ltd.
      ~
      ~  Licensed under the Apache License, Version 2.0 (the "License");
      ~  you may not use this file except in compliance with the License.
      ~  You may obtain a copy of the License at
      ~
      ~       http://www.apache.org/licenses/LICENSE-2.0
      ~
      ~  Unless required by applicable law or agreed to in writing, software
      ~  distributed under the License is distributed on an "AS IS" BASIS,
      ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      ~  See the License for the specific language governing permissions and
      ~  limitations under the License.
      -->
    
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
    
        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
            <property name="locations" value="classpath:jdbc.properties"/>
        </bean>
    
        <bean name="storageDataSource" class="com.alibaba.druid.pool.DruidDataSource"
              init-method="init" destroy-method="close">
            <property name="url" value="${jdbc.storage.url}"/>
            <property name="username" value="${jdbc.storage.username}"/>
            <property name="password" value="${jdbc.storage.password}"/>
            <property name="driverClassName" value="${jdbc.storage.driver}"/>
            <property name="initialSize" value="0" />
            <property name="maxActive" value="180" />
            <property name="minIdle" value="0" />
            <property name="maxWait" value="60000" />
            <property name="validationQuery" value="Select 'x' from DUAL" />
            <property name="testOnBorrow" value="false" />
            <property name="testOnReturn" value="false" />
            <property name="testWhileIdle" value="true" />
            <property name="timeBetweenEvictionRunsMillis" value="60000" />
            <property name="minEvictableIdleTimeMillis" value="25200000" />
            <property name="removeAbandoned" value="true" />
            <property name="removeAbandonedTimeout" value="1800" />
            <property name="logAbandoned" value="true" />
            <property name="filters" value="mergeStat" />
        </bean>
    
        <bean id="storageDataSourceProxy" class="io.seata.rm.datasource.DataSourceProxy">
            <constructor-arg ref="storageDataSource" />
        </bean>
    
        <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
            <property name="dataSource" ref="storageDataSourceProxy" />
        </bean>
    
        <dubbo:application name="dubbo-demo-storage-service"  >
            <dubbo:parameter key="qos.enable" value="false"/>
        </dubbo:application>
    <!--    <dubbo:registry address="multicast://224.5.6.7:1234?unicast=false" />-->
        <!--support etcd -->
        <!--<dubbo:registry address="etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService" />-->
        <!--support zk-->
        <dubbo:registry address="zookeeper://localhost:2181" />
        <!--support nacos-->
        <!--<dubbo:registry address="nacos://127.0.0.1:8848"/> -->
        <dubbo:protocol name="dubbo" port="20882" />
        <dubbo:service interface="io.seata.samples.dubbo.service.StorageService" ref="service" timeout="10000"/>
    
        <bean id="service" class="io.seata.samples.dubbo.service.impl.StorageServiceImpl">
            <property name="jdbcTemplate" ref="jdbcTemplate"/>
        </bean>
    
        <bean class="io.seata.spring.annotation.GlobalTransactionScanner">
            <constructor-arg value="dubbo-demo-storage-service"/>
            <constructor-arg value="my_test_tx_group"/>
        </bean>
    </beans>
    View Code
    <?xml version="1.0" encoding="UTF-8"?>
    <!--
      ~  Copyright 1999-2018 Alibaba Group Holding Ltd.
      ~
      ~  Licensed under the Apache License, Version 2.0 (the "License");
      ~  you may not use this file except in compliance with the License.
      ~  You may obtain a copy of the License at
      ~
      ~       http://www.apache.org/licenses/LICENSE-2.0
      ~
      ~  Unless required by applicable law or agreed to in writing, software
      ~  distributed under the License is distributed on an "AS IS" BASIS,
      ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      ~  See the License for the specific language governing permissions and
      ~  limitations under the License.
      -->
    
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                            http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
    
        <dubbo:application name="dubbo-demo-app">
            <dubbo:parameter key="qos.enable" value="false" />
            <dubbo:parameter key="qos.accept.foreign.ip" value="false" />
            <dubbo:parameter key="qos.port" value="33333" />
        </dubbo:application>
    <!--    <dubbo:registry address="multicast://224.5.6.7:1234?unicast=false" />-->
        <!--support etcd -->
        <!--<dubbo:registry address="etcd3://127.0.0.1:2379/org.apache.dubbo.registry.RegistryService" />-->
        <!--support zk-->
        <dubbo:registry address="zookeeper://localhost:2181" />
        <!--support nacos-->
        <!-- <dubbo:registry address="nacos://127.0.0.1:8848"/> -->
    
        <dubbo:reference id="orderService" check="false" interface="io.seata.samples.dubbo.service.OrderService"/>
        <dubbo:reference id="storageService" check="false" interface="io.seata.samples.dubbo.service.StorageService"/>
    
        <bean id="business" class="io.seata.samples.dubbo.service.impl.BusinessServiceImpl">
            <property name="orderService" ref="orderService"/>
            <property name="storageService" ref="storageService"/>
        </bean>
    
        <bean class="io.seata.spring.annotation.GlobalTransactionScanner">
            <constructor-arg value="dubbo-demo-app"/>
            <constructor-arg value="my_test_tx_group"/>
        </bean>
    
    </beans>
    View Code

    6、启动Seata服务,Seata服务下载地址为:https://github.com/seata/seata/releases, 下载完成之后解压,并进入bin目录,执行如下脚本开启seata服务

    1 sh seata-server.sh -p 8091 -h 127.0.0.1 -m file

    7、分别启动库存服务、订单服务、账户服务

     1 public class DubboStorageServiceStarter {
     2     /**
     3      * 1. Storage service is ready . A seller add 100 storage to a sku: C00321
     4      *
     5      * @param args the input arguments
     6      */
     7     public static void main(String[] args) {
     8         ClassPathXmlApplicationContext storageContext = new ClassPathXmlApplicationContext(
     9             new String[]{"spring/dubbo-storage-service.xml"});
    10         storageContext.getBean("service");
    11         JdbcTemplate storageJdbcTemplate = (JdbcTemplate) storageContext.getBean("jdbcTemplate");
    12         storageJdbcTemplate.update("delete from storage_tbl where commodity_code = 'C00321'");
    13         storageJdbcTemplate.update("insert into storage_tbl(commodity_code, count) values ('C00321', 100)");
    14         new ApplicationKeeper(storageContext).keep();
    15     }
    16 }
    View Code
    public class DubboOrderServiceStarter {
        /**
         * The entry point of application.
         *
         * @param args the input arguments
         */
        public static void main(String[] args) {
            /**
             *  3. Order service is ready . Waiting for buyers to order
             */
            ClassPathXmlApplicationContext orderContext = new ClassPathXmlApplicationContext(new String[]{"spring/dubbo-order-service.xml"});
            orderContext.getBean("service");
            new ApplicationKeeper(orderContext).keep();
        }
    }
    View Code
    public class DubboAccountServiceStarter {
        /**
         * 2. Account service is ready . A buyer register an account: U100001 on my e-commerce platform
         *
         * @param args the input arguments
         */
        public static void main(String[] args) {
            ClassPathXmlApplicationContext accountContext = new ClassPathXmlApplicationContext(new String[]{"spring/dubbo-account-service.xml"});
            accountContext.getBean("service");
            JdbcTemplate accountJdbcTemplate = (JdbcTemplate) accountContext.getBean("jdbcTemplate");
            accountJdbcTemplate.update("delete from account_tbl where user_id = 'U100001'");
            accountJdbcTemplate.update("insert into account_tbl(user_id, money) values ('U100001', 999)");
    
            new ApplicationKeeper(accountContext).keep();
        }
    }
    View Code

    8、启动业务服务,调用下单方法

    public class DubboBusinessTester {
        /**
         * The entry point of application.
         *
         * @param args the input arguments
         */
        public static void main(String[] args) {
            /**
             *  4. The whole e-commerce platform is ready , The buyer(U100001) create an order on the sku(C00321) , the count is 2
             */
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                new String[]{"spring/dubbo-business.xml"});
            final BusinessService business = (BusinessService) context.getBean("business");
            business.purchase("U100001", "C00321", 2);
        }
    }

    9、模拟测试

    1、各个服务均正常情况

    测试结果:业务数据正常

    2、库存服务异常

    测试结果:库存服务回滚,全局事务中其他事务未执行

    3、订单服务异常

    测试结果:数据库数据先更新成功然后全部回滚,说明各个事务都提交成功然后又执行了回滚操作

    4、账户服务异常

    测试结果:库存数据显扣除然后由于账户服务异常导致订单服务回滚,从而全局事务回滚,而库存数据也恢复

    5、业务服务异常

    测试流程:业务服务调用订单服务、库存服务、账户服务全部执行成功,最后处理业务时抛异常,此时数据库数据已经更新成功了

    测试结果:库存服务、订单服务、账户服务一切正常数据库数据全部更新成功,然后由于业务服务异常导致全部数据回滚

    6、业务服务异常,Seata服务宕机

    测试流程:业务服务调用订单服务、库存服务、账户服务全部执行成功,最后处理业务时抛异常,此时数据库数据已经更新成功了,并且Seata服务测试挂了,再重启时能否再回滚呢?

    测试结果:Seata重启之后,各个客户端会再次向Seata注册事务并上报事务状态,Seata根据事务状态判断全局事务是提交还是回滚,所以最终结果为回滚成功

    7、全局事务回滚时,其他事务变更事务场景

    测试流程:由于Seata的分支事务会在第一阶段直接提交,那么此时其他事务理论上就可以任意访问和更新数据了,此时如果全局事务再回滚,那么已经被修改的数据还可以回滚么?

    测试结果:其他分支事务提交成功;全局事务回滚数据恢复

    五、Seata的实现原理

    以上述为例,架构图如下图示: 

    全局事务协调器TC就是Seata Server服务,事务管理器TM也就是事务发起者就是Business服务,资源管理器RM就分别是操作数据库的Storage、Order和Account服务。

    5.1、Seata源码解析

    5.1.1、初始化

    从配置文件入手,使用Seata时需要配置两个bean分别是DataSourceProxy和GlobalTransactionScanner,配置如下:

    <bean id="accountDataSourceProxy" class="io.seata.rm.datasource.DataSourceProxy">
            <constructor-arg ref="accountDataSource" />
     </bean>
    1 <bean class="io.seata.spring.annotation.GlobalTransactionScanner">
    2         <constructor-arg value="dubbo-demo-account-service"/>
    3         <constructor-arg value="my_test_tx_group"/>
    4     </bean>

    从字面以上看DataSourceProxy意思就是数据源的代理,构造函数中需要传入需要被代理的数据源,比如阿里的DruidDataSource;GlobalTransactionScanner意思是全局事务扫描器,那么应该就是用来扫描项目中配置的所有全局事务。所以了解Seata就需要从这两个类开始着手。

    5.1.2、DataSourceProxy初始化

    DataSourceProxy构造函数保护了一个DataSource对象,意思是要代理这个DataSource对象,构造函数源码如下:

    public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
            if (targetDataSource instanceof SeataDataSourceProxy) {
                LOGGER.info("Unwrap the target data source, because the type is: {}", targetDataSource.getClass().getName());
                targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource();
            }
            /** 设置目标数据源DataSource*/
            this.targetDataSource = targetDataSource;
            /** 执行初始化init方法*/
            init(targetDataSource, resourceGroupId);
        }
    
    private void init(DataSource dataSource, String resourceGroupId) {
            /** 设置资源小组ID*/
            this.resourceGroupId = resourceGroupId;
            /** 设置JDBC参数*/
            try (Connection connection = dataSource.getConnection()) {
                jdbcUrl = connection.getMetaData().getURL();
                dbType = JdbcUtils.getDbType(jdbcUrl);
                if (JdbcConstants.ORACLE.equals(dbType)) {
                    userName = connection.getMetaData().getUserName();
                }
            } catch (SQLException e) {
                throw new IllegalStateException("can not init dataSource", e);
            }
            /** 注册当前数据源*/
            DefaultResourceManager.get().registerResource(this);
            /** 判断是否开启定时任务检查表的元数据*/
            if (ENABLE_TABLE_META_CHECKER_ENABLE) {
                tableMetaExcutor.scheduleAtFixedRate(() -> {
                    try (Connection connection = dataSource.getConnection()) {
                        TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
                            .refresh(connection, DataSourceProxy.this.getResourceId());
                    } catch (Exception ignore) {
                    }
                }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
            }
    
            //Set the default branch type to 'AT' in the RootContext.
            /** 设置全局分支类型,默认是AT类型*/
            RootContext.setDefaultBranchType(this.getBranchType());
        }

    从DataSourceProxy初始化的源码来看主要是配置数据源,JDBC相关的熟悉,并且设置全局事务的类型默认为AT类型。另外有一步比较重要就是注册资源到资源管理器,说明Seata资源管理器管理的资源就是DataSourceProxy对象,因为DataSourceProxy类实现了资源Resource接口,而资源管理器是ResourceManager对象,在Seata中就是RM角色,那么就可以在看下资源管理器是怎么来的?如何注册资源的?

    5.1.3、资源管理器初始化

    DefaultResourceManager.get()方法可以返回一个资源管理器,源码如下:

    1 private static class SingletonHolder {
    2         private static DefaultResourceManager INSTANCE = new DefaultResourceManager();
    3     }
    4     
    5     public static DefaultResourceManager get() {
    6         /** 返回资源管理器的单例对象*/
    7         return SingletonHolder.INSTANCE;
    8     }

    资源管理器采用单例模式初始化,返回了DefaultResourceManager类的对象,那么再看下DefaultResourceManager的初始化过程,源码如下:

     1 /** 存储不同模式的资源管理器对象,支持模式为:AT、TCC、SAGA、XA*/
     2     protected static Map<BranchType, ResourceManager> resourceManagers = new ConcurrentHashMap<>();
     3 
     4     /** 初始化资源管理器*/
     5     private DefaultResourceManager() {
     6         initResourceManagers();
     7     }
     8 
     9     /** 初始化资源管理器列表*/
    10     protected void initResourceManagers() {
    11         /** 加载所有的资源管理器对象*/
    12         List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);
    13         if (CollectionUtils.isNotEmpty(allResourceManagers)) {
    14             for (ResourceManager rm : allResourceManagers) {
    15                 resourceManagers.put(rm.getBranchType(), rm);
    16             }
    17         }
    18     }

    其中EnhancedServerLoader.loadAll(ResourceManager.class)这一行实际是采用了SPI机制来加载资源管理器对象,SPI机制就是从/resource/META-INF/services目录下寻找ResourceManager接口的实现类,然后进行加载。所以我们就需要从该目录下寻找ResourceManager的实现类,既然应用项目中没有配置SPI,那么肯定就是Seata提供的jar包中配置了,打开seata-all.jar,果然发现了ResourceManager的SPI配置,如下图:

    并且除了ResourceManager配置,还有很多其他接口的配置,所以可以看出seata中大量的使用了SPI机制。那么此时打开ResourceManager配置文件,内容如下:

    io.seata.rm.datasource.DataSourceManager
    io.seata.rm.datasource.xa.ResourceManagerXA
    io.seata.rm.tcc.TCCResourceManager
    io.seata.saga.rm.SagaResourceManager

    可以看出配置了不同模式下的资源管理器,正好和Seata支持的AT、TCC、SAGA、XA四种模式对应上,而默认的AT模式资源管理器就是DataSourceManager类的对象。

    总结:Seata通过SPI机制加载不同模式下的资源管理器对象,存在Map中,DataSourceProxy注册资源时根据模式从Map中获取对应类型的资源管理器即可,默认AT模式就是DataSourceManager对象。

    5.1.4、资源管理器注册资源

    现在有了资源管理器了,那么就需要再注册下资源,也就是调用ResourceManager的registerResource(Resource resource)方法,以AT模式为例,就分析下DataSourceManager的registerResource方法,源码如下:

     1 /** 资源缓存*/
     2     private final Map<String, Resource> dataSourceCache = new ConcurrentHashMap<>();
     3 
     4     /** 注册资源*/
     5     public void registerResource(Resource resource) {
     6         DataSourceProxy dataSourceProxy = (DataSourceProxy) resource;
     7         /** 将资源加入缓存中*/
     8         dataSourceCache.put(dataSourceProxy.getResourceId(), dataSourceProxy);
     9         /** 调用父类的registerResource方法*/
    10         super.registerResource(dataSourceProxy);
    11     }

    加了一步缓存,然后再调用父类AbstractResourceManager的registerResource方法,源码如下:

    1 public void registerResource(Resource resource) {
    2         RmNettyRemotingClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId());
    3     }

    代码只有一行但是比较长,实际就是通过RmNettyRemotingClinet类的静态方法getInstance()获取一个单例对象,然后调用单例对象的registerResource方法,RmNettyRemotingClient.getInstance()从字面意思就可以很好理解, 获取一个代表资源管理器(RM)的Netty客户端实例,实际就是RmNettyRemotingClient对象,源码如下:

     1 /** 获取表示资源管理器的Netty客户端对象*/
     2     public static RmNettyRemotingClient getInstance() {
     3         if (instance == null) {
     4             synchronized (RmNettyRemotingClient.class) {
     5                 if (instance == null) {
     6                     NettyClientConfig nettyClientConfig = new NettyClientConfig();
     7                     /** 创建一个线程池*/
     8                     final ThreadPoolExecutor messageExecutor = new ThreadPoolExecutor(
     9                             nettyClientConfig.getClientWorkerThreads(), nettyClientConfig.getClientWorkerThreads(),
    10                             KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(MAX_QUEUE_SIZE),
    11                             new NamedThreadFactory(nettyClientConfig.getRmDispatchThreadPrefix(),
    12                                     nettyClientConfig.getClientWorkerThreads()), new ThreadPoolExecutor.CallerRunsPolicy());
    13                     /** 通过构造器创建RmNettyRemotingClient对象*/
    14                     instance = new RmNettyRemotingClient(nettyClientConfig, null, messageExecutor);
    15                 }
    16             }
    17         }
    18         return instance;
    19     }
    20 
    21     private RmNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,
    22                                   ThreadPoolExecutor messageExecutor) {
    23         /**
    24          * nettyClientConfig:Netty客户端配置
    25          * eventExecutorGroup:事件线程池组
    26          * messageExecutor:消息线程池
    27          * TransactionRole.RMROLE:表示当前客户端角色是RM
    28          * */
    29         super(nettyClientConfig, eventExecutorGroup, messageExecutor, TransactionRole.RMROLE);
    30     }
    31 
    32     public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,
    33                                        ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {
    34         super(messageExecutor);
    35         /** 设置角色,分别为 TM、RM、SERVER*/
    36         this.transactionRole = transactionRole;
    37         /** 初始化Netty客户端*/
    38         clientBootstrap = new NettyClientBootstrap(nettyClientConfig, eventExecutorGroup, transactionRole);
    39         /** 设置Netty客户端处理器*/
    40         clientBootstrap.setChannelHandlers(new ClientHandler());
    41         clientChannelManager = new NettyClientChannelManager(
    42                 new NettyPoolableFactory(this, clientBootstrap), getPoolKeyFunction(), nettyClientConfig);
    43     }

    这里实际就是创建了一个Netty的客户端对象,并且当前客户端对象的角色是RM也就是资源管理器,客户端处理业务的处理器是ClientHandler对象

    此时Netty客户端已经创建了,下一步就应该是需要和Netty服务器(TC)进行连接了,那么估计逻辑就在registerResource方法中了,RmNettyRemotingClient的registerResource源码如下:

    public void registerResource(String resourceGroupId, String resourceId) {
    
            // Resource registration cannot be performed until the RM client is initialized
            if (StringUtils.isBlank(transactionServiceGroup)) {
                return;
            }
    
            if (getClientChannelManager().getChannels().isEmpty()) {
                getClientChannelManager().reconnect(transactionServiceGroup);
                return;
            }
            synchronized (getClientChannelManager().getChannels()) {
                for (Map.Entry<String, Channel> entry : getClientChannelManager().getChannels().entrySet()) {
                    String serverAddress = entry.getKey();
                    Channel rmChannel = entry.getValue();
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("will register resourceId:{}", resourceId);
                    }
                    /** 发送注册资源的消息*/
                    sendRegisterMessage(serverAddress, rmChannel, resourceId);
                }
            }
        }
    
        public void sendRegisterMessage(String serverAddress, Channel channel, String resourceId) {
            /** 设置应用ID和事务服务组*/
            RegisterRMRequest message = new RegisterRMRequest(applicationId, transactionServiceGroup);
            /** 设置资源ID*/
            message.setResourceIds(resourceId);
            try {
                /** 向Netty服务器发送消息*/
                super.sendAsyncRequest(channel, message);
            } catch (FrameworkException e) {
                if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && serverAddress != null) {
                    getClientChannelManager().releaseChannel(channel, serverAddress);
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("remove not writable channel:{}", channel);
                    }
                } else {
                    LOGGER.error("register resource failed, channel:{},resourceId:{}", channel, resourceId, e);
                }
            }
        }
    View Code

    大致流程就是构建一个注册资源的消息,然后发送给Netty服务器即可

    总结下资源管理器注册资源的流程:

    1、首先获取代表资源管理器RM的Netty客户端对象

    2、构造代表注册资源的消息体对象

    3、Netty客户端将注册资源的消息发送给Netty服务器请求注册资源

    5.1.5、资源管理器处理器

    资源管理器向资源协调器(TC)注册了资源之后就需要对资源进行管理,由资源协调器向资源管理器发送命令,也就是Netty服务器向Netty客户端发送命令,所以Netty客户端就需要对服务器发送的命令进行处理,通过Netty客户端初始化过程可知处理的逻辑在ClientHandler中,所以就再分析ClientHandler的channelRead方法即可,源码如下:

    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
                if (!(msg instanceof RpcMessage)) {
                    return;
                }
                /** 将Netty服务器发送的消息封装成RpcMessage对象,调用processMessage方法*/
                processMessage(ctx, (RpcMessage) msg);
            }
     1 protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
     2         /** 获取消息体*/
     3         Object body = rpcMessage.getBody();
     4         if (body instanceof MessageTypeAware) {
     5             /** 消息体转换为MessageTypeAware对象*/
     6             MessageTypeAware messageTypeAware = (MessageTypeAware) body;
     7             /** 根据消息体类型编码TypeCode获取对应的执行器*/
     8             final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
     9             if (pair != null) {
    10                 if (pair.getSecond() != null) {
    11                     try {
    12                         pair.getSecond().execute(() -> {
    13                             try {
    14                                 /** 获取第一个执行器处理消息*/
    15                                 pair.getFirst().process(ctx, rpcMessage);
    16                             } catch (Throwable th) {
    17                                 LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
    18                             } finally {
    19                                 MDC.clear();
    20                             }
    21                         });
    22                     } catch (RejectedExecutionException e) {
    23                       //......
    24                     }
    25                 } else {
    26                    //......
    27                 }
    28             } else {
    29                 LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
    30             }
    31         } else {
    32             LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
    33         }
    34     }

    方法比较长但是逻辑比较简单,一共就两步,第一步是获取消息对应的类型,然后找到该类型的处理器;第二步是直接执行对应消息类型的处理器的process方法即可,处理器的顶级接口是RemotingProcessor,表示远程消息处理器,实现类比较多,基本上根据不同的消息类型就会有不同的消息处理器,并且不同消息有不同的MessageTypeAware实现类,也就是意味着不同类型的消息有不同的消息体结构,并且有不同的消息处理器,消息体类型定义在MessageType类中,对应关系如下:

    消息类型 用途 MessageTypeAware RemotingProcessor 接收处理方
    short TYPE_GLOBAL_BEGIN = 1 全局事务开启

    GlobalBeginRequest

    ServerOnRequestProcessor Server
    short TYPE_GLOBAL_BEGIN_RESULT = 2 全局事务开启结果 GlobalBeginResponse ClientOnResponseProcessor Client
    short TYPE_BRANCH_COMMIT = 3 分支事务提交 BranchCommitRequest RmBranchCommitProcessor Client
    short TYPE_BRANCH_COMMIT_RESULT = 4 分支事务提交结果 BranchCommitResponse ServerOnResponseProcessor Server
    short TYPE_BRANCH_ROLLBACK = 5 分支事务回滚 BranchRollbackRequest RmBranchRollbackProcessor Client
    short TYPE_BRANCH_ROLLBACK_RESULT = 6 分支事务回滚结果 BranchRollbackResponse ServerOnResponseProcessor Server
    short TYPE_GLOBAL_COMMIT = 7 全局事务提交 GlobalCommitRequest ServerOnRequestProcessor Server
    short TYPE_GLOBAL_COMMIT_RESULT = 8 全局事务提交结果 GlobalCommitResponse ClientOnResponseProcessor Client
    short TYPE_GLOBAL_ROLLBACK = 9 全局事务回滚 GlobalRollbackRequest ServerOnRequestProcessor Server
    short TYPE_GLOBAL_ROLLBACK_RESULT = 10 全局事务回滚结果 GlobalRollbackResponse ClientOnResponseProcessor Client
    short TYPE_BRANCH_REGISTER = 11 分支事务注册 BranchRegisterRequest ServerOnRequestProcessor Server
    short TYPE_BRANCH_REGISTER_RESULT = 12 分支事务注册结果 BranchRegisterResponse ClientOnResponseProcessor Client
    short TYPE_BRANCH_STATUS_REPORT = 13 分支事务状态报告 BranchReportRequest ServerOnRequestProcessor Server
    short TYPE_BRANCH_STATUS_REPORT_RESULT = 14 分支事务状态报告结果 BranchReportResponse ClientOnResponseProcessor Client
    short TYPE_GLOBAL_STATUS = 15 获取全局事务状态 GlobalStatusRequest ServerOnRequestProcessor Server
    short TYPE_GLOBAL_STATUS_RESULT = 16 获取全局事务状态结果 GlobalStatusResponse ClientOnResponseProcessor Client
    short TYPE_GLOBAL_REPORT = 17 全局事务报告 GlobalReportRequest ServerOnRequestProcessor Server
    short TYPE_GLOBAL_REPORT_RESULT = 18 全局事务报告结果 GlobalReportResponse ClientOnResponseProcessor Client
    short TYPE_GLOBAL_LOCK_QUERY = 21 全局事务查询锁 GlobalLockQueryRequest ServerOnRequestProcessor Server
    short TYPE_GLOBAL_LOCK_QUERY_RESULT = 22 全局事务查询锁结果 GlobalLockQueryResponse ClientOnResponseProcessor Client
    short TYPE_SEATA_MERGE = 59 seata合并 MergedWarpMessage ServerOnRequestProcessor Server
    short TYPE_SEATA_MERGE_RESULT = 60 seata合并结果 MergedResultMessage ClientOnResponseProcessor Client
    short TYPE_REG_CLT = 101 注册TM RegisterTMRequest RegTmProcessor Server
    short TYPE_REG_CLT_RESULT = 102 注册TM结果 RegisterTMResponse ClientOnResponseProcessor Client
    short TYPE_REG_RM = 103 注册RM RegisterRMRequest RegRmProcessor Server
    short TYPE_REG_RM_RESULT = 104 注册RM结果 RegisterRMResponse ClientOnResponseProcessor Client
    short TYPE_RM_DELETE_UNDOLOG = 111 RM删除undolog UndoLogDeleteRequest RmUndoLogProcessor Client
    short TYPE_HEARTBEAT_MSG = 120 心跳 HeartbeatMessage ServerHeartbeatProcessor 和 ClientHeartbeatProcessor Server 和 Client

    Seata将服务器和客户端之间通信的消息类型和处理器全部独立开,很符合设计原则中的单一职责原则和开闭原则。所以不同状态的处理逻辑就可以直接看对应的处理器即可。接下来就针对一个完整的全局事务工作流程来分析seata的工作流程

    5.1.6、注册RM

    注册RM逻辑是由客户端发送给服务器,对于的处理器是RegRmProcessor,逻辑如下:

     1 public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
     2         onRegRmMessage(ctx, rpcMessage);
     3     }
     4 
     5     private void onRegRmMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
     6         RegisterRMRequest message = (RegisterRMRequest) rpcMessage.getBody();
     7         String ipAndPort = NetUtil.toStringAddress(ctx.channel().remoteAddress());
     8         boolean isSuccess = false;
     9         String errorInfo = StringUtils.EMPTY;
    10         try {
    11             if (null == checkAuthHandler || checkAuthHandler.regResourceManagerCheckAuth(message)) {
    12                 /** 调用ChannelManager的registerRMChannel方法注册RM客户端*/
    13                 ChannelManager.registerRMChannel(message, ctx.channel());
    14                 Version.putChannelVersion(ctx.channel(), message.getVersion());
    15                 isSuccess = true;
    16                 if (LOGGER.isDebugEnabled()) {
    17                     LOGGER.debug("checkAuth for client:{},vgroup:{},applicationId:{} is OK", ipAndPort, message.getTransactionServiceGroup(), message.getApplicationId());
    18                 }
    19             }
    20         } catch (Exception exx) {
    21             isSuccess = false;
    22             errorInfo = exx.getMessage();
    23             LOGGER.error("RM register fail, error message:{}", errorInfo);
    24         }
    25         RegisterRMResponse response = new RegisterRMResponse(isSuccess);
    26         if (StringUtils.isNotEmpty(errorInfo)) {
    27             response.setMsg(errorInfo);
    28         }
    29         /** 回复RM注册结果消息 */
    30         remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), response);
    31     }

     注册RM的工作交给了ChannelManager来负责,registerRMChannel逻辑如下:

     1 public static void registerRMChannel(RegisterRMRequest resourceManagerRequest, Channel channel)
     2             throws IncompatibleVersionException {
     3         Version.checkVersion(resourceManagerRequest.getVersion());
     4         /** 获取DBKey,实际就是jdbcUrl*/
     5         Set<String> dbkeySet = dbKeytoSet(resourceManagerRequest.getResourceIds());
     6         RpcContext rpcContext;
     7         /** 如果channel没有认证过*/
     8         if (!IDENTIFIED_CHANNELS.containsKey(channel)) {
     9             /** 创建RpcContext上下文信息用于存储RM和资源信息*/
    10             rpcContext = buildChannelHolder(NettyPoolKey.TransactionRole.RMROLE, resourceManagerRequest.getVersion(),
    11                     resourceManagerRequest.getApplicationId(), resourceManagerRequest.getTransactionServiceGroup(),
    12                     resourceManagerRequest.getResourceIds(), channel);
    13             rpcContext.holdInIdentifiedChannels(IDENTIFIED_CHANNELS);
    14         } else {
    15             rpcContext = IDENTIFIED_CHANNELS.get(channel);
    16             rpcContext.addResources(dbkeySet);
    17         }
    18         if (dbkeySet == null || dbkeySet.isEmpty()) { return; }
    19         /** 缓存RM信息,采用多个集合存储RM信息*/
    20         for (String resourceId : dbkeySet) {
    21             String clientIp;
    22             ConcurrentMap<Integer, RpcContext> portMap = CollectionUtils.computeIfAbsent(RM_CHANNELS, resourceId, key -> new ConcurrentHashMap<>())
    23                     .computeIfAbsent(resourceManagerRequest.getApplicationId(), key -> new ConcurrentHashMap<>())
    24                     .computeIfAbsent(clientIp = ChannelUtil.getClientIpFromChannel(channel), key -> new ConcurrentHashMap<>());
    25 
    26             rpcContext.holdInResourceManagerChannels(resourceId, portMap);
    27             updateChannelsResource(resourceId, clientIp, resourceManagerRequest.getApplicationId());
    28         }
    29     }

    ChannelManager采用了多层集合存储了RM的信息,数据结构如下:

    /**
         * resourceId -> applicationId -> ip -> port -> RpcContext
         */
        private static final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<ConcurrentMap<Integer, RpcContext>>>> RM_CHANNELS = new ConcurrentHashMap<>();

    5.1.7、注册TM

    注册TM的逻辑和注册RM的逻辑几乎一摸一样,只不过采用了不同的数据结构来存储,存储TM的数据结构如下:

    /**
         * ip+appname,port
         */
        private static final ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>= new ConcurrentHashMap<>();

    目前为止已经注册了TM和RM,那么接下来就需要发现全局事务、注册全局事务了,而这些工作很显然就需要GlobalTransactionScanner来实现了

    5.1.8、GlobalTransactionScanner(***)

    GlobalTransactionScanner从字面意思是全局事务扫描器,定义如下:

    1 public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean

    继承了AbstractAutoProxyCreator用于给修饰的bean进行动态代理创建,实现了InitalizingBean用于初始化,实现了ApplicationContextAware用于注入ApplicationContext对象,实现DisposableBean用于销毁,那就先从初始化方法afterPropertiesSet开始分析。

    5.1.8.1.GlobalTransactionScanner初始化

    public void afterPropertiesSet() {
            if (disableGlobalTransaction) {
                ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                        (ConfigurationChangeListener)this);
                return;
            }
            /** 开启初始化状态*/
            if (initialized.compareAndSet(false, true)) {
                /** 初始化客户端 */
                initClient();
            }
        }
    
        private void initClient() {
            /** 参数校验*/
            if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
                throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
            }
            /** 初始化TM客户端 */
            TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
            /** 初始化RM客户端 */
            RMClient.init(applicationId, txServiceGroup);
            /** 注册Spring销毁钩子函数 */
            registerSpringShutdownHook();
        }

    核心逻辑就两个,分别是初始化TM客户端和RM客户端,本质就是创建Netty客户端和Netty服务器也就是TC服务器创建连接,以TM客户端为例,init方法逻辑如下:

    public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
            /** 创建Netty客户端 */
            TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);
            /** Netty客户端初始化 */
            tmNettyRemotingClient.init();
        }
    
        public void init() {
            /** 注册消息处理器*/
            registerProcessor();
            if (initialized.compareAndSet(false, true)) {
                /** 调用父类初始化方法*/
                super.init();
            }
        }
    
        private void registerProcessor() {
            /** 注册不同消息体的处理器*/
            ClientOnResponseProcessor onResponseProcessor =
                    new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
            super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
            super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
            super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
            super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
            super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
            super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
            super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
            // 2.registry heartbeat message processor
            ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
            super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
        }
    
        public void init() {
            timerExecutor.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    /** 客户端重连*/
                    clientChannelManager.reconnect(getTransactionServiceGroup());
                }
            }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
            if (NettyClientConfig.isEnableClientBatchSendRequest()) {
                mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                        MAX_MERGE_SEND_THREAD,
                        KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<>(),
                        new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
                mergeSendExecutorService.submit(new MergedSendRunnable());
            }
            super.init();
            /** 客户端监听程序开启 */
            clientBootstrap.start();
        }

    逻辑如下:

    1.创建Netty客户端表示TM客户端

    2.注册消息处理器,针对不同的消息类型注册不同的业务处理器

    3.连接TC服务器并开始监听服务器消息

    而RM的初始化过程基本上整体逻辑和TM的初始化逻辑一致,不同的是监听的消息和处理器不一样而已。

    5.1.8.2.动态代理

    GlobalTransactionScanner继承了AbstractAutoProxyCreator,而AbstractAutoProxyCreator是用于给bean创建动态代理使用的,通过wrapIfNecessary方法给bean创建动态代理,那么就可以直接看GlobalTransactionScanner的wrapIfNecessary方法即可,逻辑如下:

    @Override
        protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
            try {
                synchronized (PROXYED_SET) {
                    /** 如果代理已存在,直接返回*/
                    if (PROXYED_SET.contains(beanName)) {
                        return bean;
                    }
                    interceptor = null;
                    /** 检查是否是TCC模式的代理*/
                    if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                        //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                        interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                (ConfigurationChangeListener)interceptor);
                    } else {
                        Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                        Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                        /** 判断目标类和目标接口是否存在@GlobalTransactional 或 @GlobalLock注解 */
                        if (!existsAnnotation(new Class[]{serviceInterface})
                                && !existsAnnotation(interfacesIfJdk)) {
                            return bean;
                        }
                        if (globalTransactionalInterceptor == null) {
                            /** 创建方法拦截器 GlobalTransactionalInterceptor*/
                            globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                            ConfigurationCache.addConfigListener(
                                    ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                    (ConfigurationChangeListener)globalTransactionalInterceptor);
                        }
                        interceptor = globalTransactionalInterceptor;
                    }
    
                    LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
                    if (!AopUtils.isAopProxy(bean)) {
                        /** 调用父类的创建代理的方法继续添加代理的增强逻辑 */
                        bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                    } else {
                        AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                        Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                        for (Advisor avr : advisor) {
                            advised.addAdvisor(0, avr);
                        }
                    }
                    PROXYED_SET.add(beanName);
                    return bean;
                }
            } catch (Exception exx) {
                throw new RuntimeException(exx);
            }
        }
    
    private boolean existsAnnotation(Class<?>[] classes) {
            if (CollectionUtils.isNotEmpty(classes)) {
                for (Class<?> clazz : classes) {
                    if (clazz == null) {
                        continue;
                    }
                    /** 判断是否存在@GlobalTransactional注解 */
                    GlobalTransactional trxAnno = clazz.getAnnotation(GlobalTransactional.class);
                    if (trxAnno != null) {
                        return true;
                    }
                    Method[] methods = clazz.getMethods();
                    for (Method method : methods) {
                        trxAnno = method.getAnnotation(GlobalTransactional.class);
                        if (trxAnno != null) {
                            return true;
                        }
                        /** 判断是否存在@GlobalLock注解*/
                        GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);
                        if (lockAnno != null) {
                            return true;
                        }
                    }
                }
            }
            return false;
        }

    整体逻辑就是判断当前bean是否存在@GlobalTransactional注解或@GlobalLock注解,如果存在就创建方法拦截器GlobalTransactionalInterceptor,然后创建代理的工作交给父类AbstractAutoProxyCreator, 所以得出结论是被@GlobalTransactional注解修饰的方法执行的时候会走方法拦截器GlobalTransactionalIntercetor的invoke方法,逻辑如下:

    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
            Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
            Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
            if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
                final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
                /** 获取GlobalTransactional注解对象 */
                final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class);
                /** 获取GlobalLock注解对象 */
                final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
                boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
                if (!localDisable) {
                    if (globalTransactionalAnnotation != null) {
                        /** 如果存在GlobalTransactional注解,就执行handleGlobalTransaction方法*/
                        return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
                    } else if (globalLockAnnotation != null) {
                        /** 如果存在GlobalLock注解,就执行handleGlobalLock方法*/
                        return handleGlobalLock(methodInvocation, globalLockAnnotation);
                    }
                }
            }
            /** 执行原方法逻辑*/
            return methodInvocation.proceed();
        }

    如果存在@GlobalTransactional注解最终会执行handleGlobalTransaction方法处理,逻辑如下:

    Object handleGlobalTransaction(final MethodInvocation methodInvocation,
                                       final GlobalTransactional globalTrxAnno) throws Throwable {
            boolean succeed = true;
            try {
                /** 调用事务模版的execute方法执行 */
                return transactionalTemplate.execute(new TransactionalExecutor() {
                    @Override
                    public Object execute() throws Throwable {
                        return methodInvocation.proceed();
                    }
    
                    public String name() {
                        String name = globalTrxAnno.name();
                        if (!StringUtils.isNullOrEmpty(name)) {
                            return name;
                        }
                        return formatMethod(methodInvocation.getMethod());
                    }
    
                    @Override
                    public TransactionInfo getTransactionInfo() {
                        // reset the value of timeout
                        int timeout = globalTrxAnno.timeoutMills();
                        if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
                            timeout = defaultGlobalTransactionTimeout;
                        }
    
                        TransactionInfo transactionInfo = new TransactionInfo();
                        transactionInfo.setTimeOut(timeout);
                        transactionInfo.setName(name());
                        transactionInfo.setPropagation(globalTrxAnno.propagation());
                        transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());
                        transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
                        Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                        for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
                            rollbackRules.add(new RollbackRule(rbRule));
                        }
                        for (String rbRule : globalTrxAnno.rollbackForClassName()) {
                            rollbackRules.add(new RollbackRule(rbRule));
                        }
                        for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
                            rollbackRules.add(new NoRollbackRule(rbRule));
                        }
                        for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
                            rollbackRules.add(new NoRollbackRule(rbRule));
                        }
                        transactionInfo.setRollbackRules(rollbackRules);
                        return transactionInfo;
                    }
                });
            } catch (TransactionalExecutor.ExecutionException e) {
                TransactionalExecutor.Code code = e.getCode();
                switch (code) {
                    case RollbackDone:
                        throw e.getOriginalException();
                    case BeginFailure:
                        succeed = false;
                        failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                        throw e.getCause();
                    case CommitFailure:
                        succeed = false;
                        failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                        throw e.getCause();
                    case RollbackFailure:
                        failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
                        throw e.getOriginalException();
                    case RollbackRetrying:
                        failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
                        throw e.getOriginalException();
                    default:
                        throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
                }
            } finally {
                if (degradeCheck) {
                    EVENT_BUS.post(new DegradeCheckEvent(succeed));
                }
            }
        }
    
        public Object execute(TransactionalExecutor business) throws Throwable {
            /** 1.获取当前事务*/
            TransactionInfo txInfo = business.getTransactionInfo();
            if (txInfo == null) {
                throw new ShouldNeverHappenException("transactionInfo does not exist");
            }
            /** 2.获取当前全局事务,如果存在就表示当前角色为参与者*/
            GlobalTransaction tx = GlobalTransactionContext.getCurrent();
    
            /** 3.获取事务传播机制 */
            Propagation propagation = txInfo.getPropagation();
            SuspendedResourcesHolder suspendedResourcesHolder = null;
            try {
                switch (propagation) {
                    case NOT_SUPPORTED:
                        /** 如果当前事务存在,就暂停事务*/
                        if (existingTransaction(tx)) {
                            suspendedResourcesHolder = tx.suspend();
                        }
                        return business.execute();
                    case REQUIRES_NEW:
                        /** 如果当前事务存在,就开启新事务*/
                        if (existingTransaction(tx)) {
                            suspendedResourcesHolder = tx.suspend();
                            tx = GlobalTransactionContext.createNew();
                        }
                        break;
                    case SUPPORTS:
                        /** 如果当前事务不存在,就不使用事务*/
                        if (notExistingTransaction(tx)) {
                            return business.execute();
                        }
                        break;
                    case REQUIRED:
                        /** 默认机制,如果存在就加入;如果不存在就创建新事务*/
                        break;
                    case NEVER:
                        /** 不使用事务,如果事务存在就抛异常*/
                        if (existingTransaction(tx)) {
                            throw new TransactionException(
                                    String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
                                            , tx.getXid()));
                        } else {
                            return business.execute();
                        }
                    case MANDATORY:
                        /** 使用事务,如果事务不存在就抛异常*/ 
                        if (notExistingTransaction(tx)) {
                            throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                        }
                        break;
                    default:
                        throw new TransactionException("Not Supported Propagation:" + propagation);
                }
    
                /** 如果事务不存在,就创建新事务,当前角色为LAUNCHER表示事务发起者*/
                if (tx == null) {
                    tx = GlobalTransactionContext.createNew();
                }
                GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
    
                try {
                    /** 开启事务,必须以Launcher角色开启*/
                    beginTransaction(txInfo, tx);
    
                    Object rs;
                    try {
                        /** 执行业务逻辑*/
                        rs = business.execute();
                    } catch (Throwable ex) {
                        /** 回滚事务*/
                        completeTransactionAfterThrowing(txInfo, tx, ex);
                        throw ex;
                    }
                    /** 提交事务*/
                    commitTransaction(tx);
    
                    return rs;
                } finally {
                    /** 清理缓存信息*/
                    resumeGlobalLockConfig(previousConfig);
                    triggerAfterCompletion();
                    cleanUp();
                }
            } finally {
                // If the transaction is suspended, resume it.
                if (suspendedResourcesHolder != null) {
                    tx.resume(suspendedResourcesHolder);
                }
            }
        }

    核心逻辑是根据不同事务传播机制进行不同处理,然后开启全局事务并执行业务,根据业务执行结果进行提交或回滚事务。

    5.1.9、全局事务开启

    通过5.1.8.2得知全局事务的开启是通过beginTransaction方法开启的,逻辑如下:

    private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
            try {
                /** 触发开启事务之前的钩子逻辑*/
                triggerBeforeBegin();
                /** 调用GlobalTransaction的begin方法开启事务 */
                tx.begin(txInfo.getTimeOut(), txInfo.getName());
                /** 触发开启事务之后的钩子逻辑*/
                triggerAfterBegin();
            } catch (TransactionException txe) {
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                        TransactionalExecutor.Code.BeginFailure);
            }
        }

    调用GlobalTransaction的begin方法开启事务,GlobalTransaction对象是5.1.8.2中GlobalTransactionContext.createNew方法创建,实际就是创建了一个DefaultGlobalTransaction对象,所以开启事务的逻辑在DefaultGlobalTransaction的begin方法中,逻辑如下:

     1 /** DefaultGlobalTransaction 开启事务*/
     2     public void begin(int timeout, String name) throws TransactionException {
     3         /** 当前角色必须是Launcher才可以开启事务*/
     4         if (role != GlobalTransactionRole.Launcher) {
     5             assertXIDNotNull();
     6             if (LOGGER.isDebugEnabled()) {
     7                 LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
     8             }
     9             return;
    10         }
    11         assertXIDNull();
    12         /** 判断当前事务是否存在,如果存在则抛异常*/
    13         String currentXid = RootContext.getXID();
    14         if (currentXid != null) {
    15             throw new IllegalStateException("Global transaction already exists," +
    16                     " can't begin a new global transaction, currentXid = " + currentXid);
    17         }
    18         /** 调用TransactionManager的begin方法开启事务,并获取XID*/
    19         xid = transactionManager.begin(null, null, name, timeout);
    20         /** 绑定全局事务ID并标记事务状态*/
    21         status = GlobalStatus.Begin;
    22         RootContext.bind(xid);
    23         if (LOGGER.isInfoEnabled()) {
    24             LOGGER.info("Begin new global transaction [{}]", xid);
    25         }
    26     }

    发现最终开启事务是通过TransactionManager类的begin方法实现的,逻辑就是创建开启全局事务请求对象发送给TC服务器,返回全局事务的ID,源码如下:

     1 /** DefaultTransactionManager 开启事务*/
     2     @Override
     3     public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
     4             throws TransactionException {
     5         /** 创建开启全局事务的请求消息对象*/
     6         GlobalBeginRequest request = new GlobalBeginRequest();
     7         request.setTransactionName(name);
     8         request.setTimeout(timeout);
     9         /** 发送开启全局事务消息并获取开启结果 */
    10         GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
    11         if (response.getResultCode() == ResultCode.Failed) {
    12             throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
    13         }
    14         /** 返回全局事务ID*/
    15         return response.getXid();
    16     }

    根据5.1.5的消息对应关系可知,TC服务器处理开启事务消息的处理器是ServerOnRequestProcessor,最终被AbstractTCInboundHandler的handle方法处理,现在被@GlobalTransactional注解修饰的方法已经开启全局事务了,那么接下来就需要进行分支事务的注册以及分支事务执行了。

    TC服务器处理开启全局事务的逻辑是先创建开启事务结果GlobalBeginResponse对象,然后调用doGlobalBegin方法处理,逻辑如下:

     1  protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
     2             throws TransactionException {
     3         /**
     4          * 调用core.begin开启事务,并持久化
     5          * 给response设置全局事务编号XID*/
     6         response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
     7                 request.getTransactionName(), request.getTimeout()));
     8         if (LOGGER.isInfoEnabled()) {
     9             LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
    10                     rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
    11         }
    12     }
    13 
    14     /** DefaultCode的begin方法 */
    15     public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    16             throws TransactionException {
    17         /** 创建全局会话*/
    18         GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
    19                 timeout);
    20         /** 日志打印*/
    21         MDC.put(RootContext.MDC_KEY_XID, session.getXid());
    22         /** 添加事务生命周期监听器*/
    23         session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    24         /** 开启事务*/
    25         session.begin();
    26 
    27         /** 发送事务开启事件*/
    28         eventBus.post(new GlobalTransactionEvent(session.getTransactionId(), GlobalTransactionEvent.ROLE_TC,
    29                 session.getTransactionName(), applicationId, transactionServiceGroup, session.getBeginTime(), null, session.getStatus()));
    30         /** 返回全局事务ID*/
    31         return session.getXid();
    32     }

    最终创建了一个GlobalSession对象并调用GlobalSession的begin方法开启事务,开启之后会激活Session生命周期监听器的onBegin方法,监听器通过sessionHolder.getRootSessionManager()获取,SessionManager有多种实现方式,可以采用Redis、File和DB几种方式来存储,默认采用数据库方式存储,所以这里就分析数据库方式存储的实现方式,实现为DataBaseSessionManager,通过TransactionStoreManager的writeSession方法进行全局事务持久化,逻辑如下:

     1 public boolean writeSession(LogOperation logOperation, SessionStorable session) {
     2         if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
     3             /** 全局事务添加*/
     4             return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
     5         } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
     6             /** 全局事务更新*/
     7             return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
     8         } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
     9             /** 全局事务移除*/
    10             return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
    11         } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
    12             /** 添加分支事务*/
    13             return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    14         } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
    15             /** 更新分支事务*/
    16             return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    17         } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
    18             /** 移除分支事务*/
    19             return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
    20         } else {
    21             throw new StoreException("Unknown LogOperation:" + logOperation.name());
    22         }
    23     }

    当全局事务和分支事务增删改时都会记录日志。

    5.1.10、分支事务注册

    由5.1.2可以seata针对数据进行了代理,采用了DataSourceProxy代理类来代理DataSource,而获取数据库连接的方法getConnection就被你DataSourceProxy重写了,返回了Connection的代理类ConnectionProxy,代码如下:

    /** DataSourceProxy 获取数据库连接*/
    @Override 
    public ConnectionProxy getConnection() throws SQLException {
            Connection targetConnection = targetDataSource.getConnection();
           /** 创建Connection对象的代理*/
            return new ConnectionProxy(this, targetConnection);
        }

    那么说明数据库事务的提交和回滚都是由ConnectionProxy来实现的。先看提交事务的commit方法,逻辑如下:

     1 @Override
     2     public void commit() throws SQLException {
     3         try {
     4             LOCK_RETRY_POLICY.execute(() -> {
     5                 /** 提交事务*/
     6                 doCommit();
     7                 return null;
     8             });
     9         } catch (SQLException e) {
    10             if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
    11                 /** 异常回滚事务*/
    12                 rollback();
    13             }
    14             throw e;
    15         } catch (Exception e) {
    16             throw new SQLException(e);
    17         }
    18     }
     1 private void doCommit() throws SQLException {
     2         /** 如果在全局事务中*/
     3         if (context.inGlobalTransaction()) {
     4             processGlobalTransactionCommit();
     5         }
     6         /** 如果在全局锁中 */
     7         else if (context.isGlobalLockRequire()) {
     8             processLocalCommitWithGlobalLocks();
     9         } else {
    10             /** 没有事务直接调用目标Connection的commit方法提交事务*/
    11             targetConnection.commit();
    12         }
    13     }
     1 /** 处理全局事务的分支事务提交*/
     2     private void processGlobalTransactionCommit() throws SQLException {
     3         try {
     4             /** 注册分支事务*/
     5             register();
     6         } catch (TransactionException e) {
     7             recognizeLockKeyConflictException(e, context.buildLockKeys());
     8         }
     9         try {
    10             /** 清理undoLog*/
    11             UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
    12             /** 目标连接提交事务*/
    13             targetConnection.commit();
    14         } catch (Throwable ex) {
    15             LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
    16             /** 上报分支事务状态*/
    17             report(false);
    18             throw new SQLException(ex);
    19         }
    20         if (IS_REPORT_SUCCESS_ENABLE) {
    21             /** 上报分支事务状态*/
    22             report(true);
    23         }
    24         /** 上下文重置*/
    25         context.reset();
    26     }

     逻辑清晰先调用register方法注册分支事务,然后调用被代理连接的commit方法提交事务,所以提交事务还是原连接的逻辑,核心是register方法的注册逻辑,逻辑如下:

    1 private void register() throws TransactionException {
    2         if (!context.hasUndoLog() || !context.hasLockKey()) {
    3             return;
    4         }
    5         /** 调用资源管理器的branchRegister方法注册分支事务 */
    6         Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
    7                 null, context.getXid(), null, context.buildLockKeys());
    8         context.setBranchId(branchId);
    9     }

    调用了资源管理器的branchRegister方法注册分支事务,最终调用AbstractResourceManager的branchRegister方法,源码如下:

     1 public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
     2         try {
     3             /** 创建注册分支请求消息对象*/
     4             BranchRegisterRequest request = new BranchRegisterRequest();
     5             request.setXid(xid);
     6             request.setLockKey(lockKeys);
     7             request.setResourceId(resourceId);
     8             request.setBranchType(branchType);
     9             request.setApplicationData(applicationData);
    10             /** 发送注册分支消息给TC,并获取注册分支的结果,得到分支ID*/
    11             BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
    12             if (response.getResultCode() == ResultCode.Failed) {
    13                 throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));
    14             }
    15             return response.getBranchId();
    16         } catch (TimeoutException toe) {
    17             throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
    18         } catch (RuntimeException rex) {
    19             throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);
    20         }
    21     }

    逻辑比较简单,就是创建注册分支请求的消息发送给TC服务器,然后获取注册分支事务的结果得到分支事务ID直接返回即可。TC服务器处理分支事务注册核心逻辑最终调用AbstractCore的branchRegister方法,逻辑如下:

     1 public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid,
     2                                String applicationData, String lockKeys) throws TransactionException {
     3         GlobalSession globalSession = assertGlobalSessionNotNull(xid, false);
     4         return SessionHolder.lockAndExecute(globalSession, () -> {
     5             globalSessionStatusCheck(globalSession);
     6             globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
     7             /** 创建分支会话 */
     8             BranchSession branchSession = SessionHelper.newBranchByGlobal(globalSession, branchType, resourceId,
     9                     applicationData, lockKeys, clientId);
    10             /** 日志打印*/
    11             MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(branchSession.getBranchId()));
    12             branchSessionLock(globalSession, branchSession);
    13             try {
    14                 /** 全局会话添加分支会话*/
    15                 globalSession.addBranch(branchSession);
    16             } catch (RuntimeException ex) {
    17                 branchSessionUnlock(branchSession);
    18                 throw new BranchTransactionException(FailedToAddBranch, String
    19                         .format("Failed to store branch xid = %s branchId = %s", globalSession.getXid(),
    20                                 branchSession.getBranchId()), ex);
    21             }
    22             if (LOGGER.isInfoEnabled()) {
    23                 LOGGER.info("Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",
    24                         globalSession.getXid(), branchSession.getBranchId(), resourceId, lockKeys);
    25             }
    26             /** 返回分支会话的分支ID*/
    27             return branchSession.getBranchId();
    28         });
    29     }

    全局会话GlobalSession内部采用ArrayList<BranchSession> branchSessions存储分支会话记录,并且通过持久化的方式记录添加分支的记录

    5.1.11、分支事务提交

    分支事务提交的消息编码为MessageType.TYPE_BRANCH_COMMIT = 4,有兴趣同学可以直接搜索对应的处理器即可,大致交互流程差不多。

    5.1.12、全局事务提交

    全局事务的提交由TM来负责,由5.1.8.2可以最终都TransactionTemplate的commitTransaction方法实现,逻辑如下:

     1 private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
     2         try {
     3             /** 触发提交事务前置处理*/
     4             triggerBeforeCommit();
     5             /** 全局事务提交*/
     6             tx.commit();
     7             /** 触发提交事务后置处理*/
     8             triggerAfterCommit();
     9         } catch (TransactionException txe) {
    10             // 4.1 Failed to commit
    11             throw new TransactionalExecutor.ExecutionException(tx, txe,
    12                     TransactionalExecutor.Code.CommitFailure);
    13         }
    14     }

    由DefaultlGlobalTransaction的commit实现,逻辑如下:

     1 public void commit() throws TransactionException {
     2         /** 提交事务必须是Launcher角色,不可用是Participant角色*/
     3         if (role == GlobalTransactionRole.Participant) {
     4             return;
     5         }
     6         assertXIDNotNull();
     7         int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
     8         try {
     9             while (retry > 0) {
    10                 try {
    11                     /** 调用TransactionManager的commit方法提交事务*/
    12                     status = transactionManager.commit(xid);
    13                     break;
    14                 } catch (Throwable ex) {
    15                     LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
    16                     retry--;
    17                     if (retry == 0) {
    18                         throw new TransactionException("Failed to report global commit", ex);
    19                     }
    20                 }
    21             }
    22         } finally {
    23             if (xid.equals(RootContext.getXID())) {
    24                 suspend();
    25             }
    26         }
    27     }

    最终到DefaultTransactionManager的commit方法实现,逻辑如下:

    1 public GlobalStatus commit(String xid) throws TransactionException {
    2         GlobalCommitRequest globalCommit = new GlobalCommitRequest();
    3         globalCommit.setXid(xid);
    4        /** 发送全局事务提交请求,接收全局事务提交结果,返回提交状态*/
    5         GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
    6         return response.getGlobalStatus();
    7     }

    实际就是由TM向TC发送一个提交全局事务的请求,获取TC返回的结果;同理全局事务的回滚逻辑也是TM向TC发送一个全局事务回滚的请求,获取TC返回的结果即可。

    而TC接收到提交事务的通知之后,从当前持久化日志中获取全局事务信息,并获取到所有分支事务信息,遍历通知分支事务提交事务。

    全局事务回滚的逻辑和全局事务提交的逻辑如出一辙。

    5.1.13、心跳

    客户端和服务器保持着心跳保证互相确认对方没有掉线,实现原理就是通过Netty的读写空闲检测机制实现,客户端发现写空闲时间超过配置的超时时间就会主动发送心跳消息给服务器来保活。

    5.2、Seata的整体工作流程

    总结整体实现流程如下:

    5.2.1、初始化过程

    1、启动Seata Server,Seata Server是一个Netty服务器,默认监听2181端口

    2、应用项目启动,通过配置的GlobalTransacationScanner对象,在初始化该bean的时候执行init方法,分别初始化TM客户端和RM客户端,分别是TmNettyRemotingClient和RmNettyRemotingClient对象

    3、TM客户端和RM客户端和TC服务器保持通信,并通过心跳机制保持有效长连接

    4、RM初始化的时候会将自己管理的资源信息注册给TC服务器,也就是负责的数据源信息,TC采用Map<Channel,RpcContext>存储TM和RM的客户端信息

    5、客户端和服务器初始化时都会注册监听的事件编号并绑定对应的事件处理器

    6、通过代理模式分别给数据源DataSource创建代理对象DataSourceProxy,获取的Connection对象为代理对象ConnectionProxy

    5.2.2、开启事务流程

    1、通过GlobalTransactionScanner给被@GlobalTransactional注解修饰的方法创建动态代理,创建了一个方法拦截器GlobalTransactionInterceptor对象

    2、被@GlobalTransactional注解修饰的方法执行时走方法拦截器GlobalTransactionalInterceptor对象的invoke方法处理增强逻辑

    3、根据事务传播机制选择是否走事务,如果走事务就开启全局事务,由当前的TM向TC发送开启全局事务的请求

    4、TC接收开启事务请求,根据配置决定采用哪种持久化方式,可以采用Redis、DB、File等方式持久化全局事务,并返回全局事务ID

    5、TM获取到全局事务ID,存入上下文RootContext中,本质是通过ThreadLocal存储

    6、TM对应业务方调用分布式服务将XID传递过去,Dubbo采用RpcContext、HTTP采用Header的方式

    7、RM对应业务方被调用,获取XID,先注册RM到TC,然后执行本地事务并提交,并且记录undoLog到数据库,

    8、RM将本地事务提交结果通过brancheReport上报给TC

    9、TC接收分支上报结果,先更新全局事务和分支事务状态,然后进行持久化

    5.2.3、提交事务流程

    1、如果业务执行成功,TM创建全局事务提交请求并发送给TC

    2、TC接收全局事务提交请求,更新持久化的全局事务状态

    3、TC有个定时任务,发送删除UndoLog的请求给所有RM用于清理指定时间段内的UndoLog

    4、RM接收到删除UndoLog请求,删除指定时间段内的UndoLog,默认是删除超过7天的

    5.2.4、回滚事务流程

    1、如果业务执行失败,TM创建全局事务回滚请求并发送给TC

    2、TC接收全局事务回滚请求,更新持久化的全局事务状态

    3、TC根据全局事务获取全部分支事务,遍历通知分支事务回滚事务,仅通知分支事务提交成功的,对于分支事务提交失败的直接删除;

    4、分支事务从数据库查询undoLog日志,并执行undoLog日志中的命令进行回滚操作

    5、分支事务回滚成功返回分支回滚状态给TC服务器

    6、回滚成功则TC删除分支事务,

    7、如果回滚失败则会将回滚任务加入队列重试回滚

    5.2.5、整体交互流程图

    5.2.6、实现细节梳理

    1、服务器注册TM和RM的逻辑处理?

    业务系统启动时通过GlobalTransactionScanner初始化方法afterPropertiesSet,初始化TMClient和RMClient
    实际是分别创建两个Netty客户端TmNettyRemotingClient和RmNettyRemotingClient对象和TC服务器创建连接,并分别发送TM和RM注册的指令给TC
    TC处理TM和RM注册的逻辑主要如下:
    1.创建RpcContext对象封装TM客户端;
    2.创建客户端Channel和RpcContext存储已经认证的<Channel,RpcContext> IDENTIFIED_CHANNELS集合中缓存;
    3.客户端认证ID为 applicationId:clientIp
    4.TC采用ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> TM_CHANNELS 存储所有TM信息,key是客户端认证ID,value是端口号和RpcContext的关联信息
    5.TC采用ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>>> RM_CHANNELS 存储所有RM信息
    存储RM信息的Map有多层,每一层的key依次为resourceId -> applicationId -> ip -> port -> RpcContext
    6.所以TM和RM注册的过程实际就是在缓存集合中存储对应的RpcContext的过程

    2、服务器注册全局事务的逻辑处理?

    被@GlobalTransactional注解修饰的方法执行时会被方法拦截器拦截,会发起全局事务开启请求,获取服务器返回的XID,绑定到全局上下文RootContext中,
    全局上下文RootContext采用key为TX_XID存储XID,实际就是采用ThreadLocal<Map<key,value>>形式存储
    服务器接收TM发起全局事务请求之后处理逻辑如下:
    1.创建GlobalSession对象表示全局事务
    2.根据持久化配置选择redis、file、DB来存储全局事务信息
    3.持久化全局事务信息包括XID、status、transactionId、transactionName、transactionServiceGroup、beginTime、applicationId等
    4.XID=ipAddress:port:transactionId
    5.返回全局事务开启结果,返回XID

    3、服务器注册分支事务的逻辑处理?

    1.服务器接收分支注册请求,先锁住全局事务会话GlobalSession
    2.根据GlobalSession创建分支事务会话BranchSession
    3.GlobalSession采用ArrayList存储branchSession
    4.将分支事务信息存储到分支事务表中
    5.返回分支事务注册结果并返回分支事务ID

    4、分支事务上报状态的逻辑处理?

    Seata采用DataSourceProxy代理DataSource,并通过ConnectionProxy代理Connection,并PreparedStatement也采用了代理PreparedStatementProxy,
    分支执行数据库操作时从上下文RootContext获取XID,如果存在全局事务XID,则执行完SQL语句之后,执行ConnectionProxy的commit方法,commit方法逻辑如下:
    1.ConnectionProxy会先注册分支事务,获取分支事务ID;
    2.记录undoLog存储在数据库
    3.然后提交本地事务
    4.上报本地事务提交的状态
    5.TC服务器更新分支事务上报的状态

    5、undoLog的使用?

    本地事务执行完成成会注册分支事务,并且本地记录undoLog,当全局事务提交成功,TC会采用定时任务通知RM删除undoLog,如果全局事务回滚,TC会通知RM进行本地事务回滚,本地事务从数据库中查询undoLog进行数据回滚
    undoLog日志清理策略有三种:
    1、TC服务器会创建定时任务每天通知一次所以RM删除7天以前的undoLog;
    2、RM有定时任务1秒执行一次,删除所有二阶段提交成功的分支事务相关的undoLog
    3、分支事务回滚后执行完undo逻辑之后会删除undoLog

    6、TC如何存储全局事务信息?

    TC可以采用redis、file、DB等方式存储全局事务xixi

    7、全局事务XID如何传递?

    业务发起方发起全局事务获取全局事务XID,调用RM服务时,会通过请求将XID传递给服务提供者,服务提供者通过Filter或Interceptor获取XID并通过ThreadLocal存在本地的RootContext上下文中
    如Dubbo采用RpcContext传递,HTTP请求采用Header传递

    8、异常回滚的逻辑处理?

    8.1、分支事务正常,业务系统异常导致回滚?

    业务系统TM会发起全局事务回滚请求给TC,TC再遍历所有提交成功的分支事务,发生分支事务回滚通知

    8.2、分支事务异常导致全局事务回滚?

    分支事务异常会上报状态给TC,当TM发起全局事务回滚时,TC遍历所有提交成功的分支事务,发生分支事务回滚通知;执行失败的分支事务不需要再进行回滚;

    8.3、全局事务回滚时,分支事务回滚失败如果解决?

    分支事务有undoLog记录,当分支事务回滚失败时,TC会每秒重试通知一次回滚

    8.4、全局事务回滚时,TC异常断开如何恢复?

    TC重启之后会再次通知需要回滚的分支事务进行回滚

    8.5、全局事务回滚时,分支事务相关数据已经被修改如何回滚?

    当分支事务需要回滚时,如果此时数据以及被其他事务修改,那么此时就会回滚失败,提示当前数据和期望数据不一致。

  • 相关阅读:
    nodejs 的序列化与反序列化
    Visual Studio 监视与快速监视即时窗口没有智能提示
    mysql 备份数据语句
    mysql 导入sql 2006
    MySql.Data.MySqlClient.MySqlException (0x80004005): Unable to connect to any of the specified MySQL hosts.
    怎么查看mysql的安装目录
    【支付宝】退款接口 报 “缺少签名参数”
    【支付宝】"验签出错,sign值与sign_type参数指定的签名类型不一致:sign_type参数值为RSA,您实际用的签名类型可能是RSA2"
    【支付宝】支付 系统繁忙,请稍后再试(ALIN10146)
    php插入日志到数据库,对象转json
  • 原文地址:https://www.cnblogs.com/jackion5/p/14788377.html
Copyright © 2011-2022 走看看