zoukankan      html  css  js  c++  java
  • 微服务痛点-基于Dubbo + Seata的分布式事务(AT)模式

    前言

    Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。对于Seata不太了解的朋友,可以看下我之前写的文章: 微服务开发的最大痛点-分布式事务SEATA入门简介

    AT模式

    AT模式怎么理解

    AT模式下,每个数据库被当做是一个Resource,Seata 里称为 DataSource Resource。业务通过 JDBC 标准接口访问数据库资源时,Seata 框架会对所有请求进行拦截,做一些操作。

    每个本地事务提交时,Seata RM(Resource Manager,资源管理器) 都会向 TC(Transaction Coordinator,事务协调器) 注册一个分支事务。当请求链路调用完成后,发起方通知 TC 提交或回滚分布式事务,进入二阶段调用流程。此时,TC 会根据之前注册的分支事务回调到对应参与者去执行对应资源的第二阶段。

    TC 是怎么找到分支事务与资源的对应关系呢?每个资源都有一个全局唯一的资源 ID,并且在初始化时用该 ID 向 TC 注册资源。在运行时,每个分支事务的注册都会带上其资源 ID。这样 TC 就能在二阶段调用时正确找到对应的资源。这就是我们的 AT 模式。简单总结一下,就是把每个数据库当做一个 Resource,在本地事务提交时会去注册一个分支事务。

    AT模式是一种无侵入的分布式事务解决方案。在AT模式下,用户只需关注自己的"业务SQL",用户的"业务SQL"作为第一阶段,Seata框架会自动生成事务的二阶段提交和回滚操作。

    AT模式如何做到对业务的无侵入

    • 一阶段:

    在一阶段,Seata 会拦截“业务 SQL”,首先解析 SQL 语义,找到“业务 SQL”要更新的业务数据,在业务数据被更新前,将其保存成“before image”,然后执行“业务 SQL”更新业务数据,在业务数据更新之后,再将其保存成“after image”,最后生成行锁。以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。

    • 二阶段提交:

    二阶段如果是提交的话,因为“业务 SQL”在一阶段已经提交至数据库, 所以 Seata 框架只需将一阶段保存的快照数据和行锁删掉,完成数据清理即可。

    • 二阶段回滚:

    二阶段如果是回滚的话,Seata 就需要回滚一阶段已经执行的“业务 SQL”,还原业务数据。回滚方式便是用“before image”还原业务数据;但在还原前要首先要校验脏写,对比“数据库当前业务数据”和 “after image”,如果两份数据完全一致就说明没有脏写,可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转人工处理。

    AT 模式的一阶段、二阶段提交和回滚均由 Seata 框架自动生成,用户只需编写“业务 SQL”,便能轻松接入分布式事务,AT 模式是一种对业务无任何侵入的分布式事务解决方案。

    当然官网对AT模式也进行了细致的讲解, 大家可以看下Seata官网的Seata AT模式

    Dubbo + Seata 实战案例

    环境准备

    Dubbo

    docker-compose.yaml:

    version: '3'
    
    services:
      zookeeper:
        image: zookeeper
        ports:
          - 2181:2181
      admin:
        image: apache/dubbo-admin
        depends_on:
          - zookeeper
        ports:
          - 8080:8080
        environment:
          - admin.registry.address=zookeeper://zookeeper:2181
          - admin.config-center=zookeeper://zookeeper:2181
          - admin.metadata-report.address=zookeeper://zookeeper:2181
    

    Seata

    docker-compose.yaml:

    version: "3"
    services:
      seata-server:
        image: seataio/seata-server
        hostname: seata-server
        ports:
          - "8091:8091"
        environment:
          - SEATA_PORT=8091
          - STORE_MODE=file
    

    MySQL

    docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:latest
    

    目录结构

    • Storage : 商品库存逻辑模块;
    • Account: 用户账户逻辑模块;
    • Order: 商品订单逻辑模块;
    • Business: 业务层逻辑模块;

    下面我通过Storage模块来描述Dubbo + Seata的接入,其他模块,例如account, order模块的接入都是相同的。

    Storage商品库存模块

    项目目录

    .
    ├── java
    │   └── cn
    │       └── mushuwei
    │           └── storage
    │               ├── SeataStorageApplication.java #应用SpringBoot启动类
    │               ├── api
    │               │   ├── StorageApi.java #库存调用Dubbo接口
    │               │   └── dto
    │               │       └── CommodityDTO.java #库存数据传输类
    │               ├── config
    │               │   └── SeataAutoConfig.java #Seata配置类
    │               ├── dao
    │               │   └── StorageDao.java #库存持久化类
    │               ├── entity
    │               │   └── StorageDO.java #库存持久化实体
    │               ├── provider
    │               │   └── StorageApiImpl.java #库存调用Dubbo接口实现类
    │               └── service
    │                   ├── StorageService.java #库存业务操作逻辑类
    │                   └── impl
    │                       └── StorageServiceImpl.java #库存业务操作逻辑实现类
    └── resources
        ├── application.yml #应用配置文件
        ├── mybatis
        │   └── storage.xml #mybatis xml文件
        └── sql
            └── storage.sql #数据库表结构和初始化数据
    
    15 directories, 12 files
    
    

    Pom.xml

            <!-- 日志相关 -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>log4j-over-slf4j</artifactId>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>jul-to-slf4j</artifactId>
            </dependency>
    				
    	<!-- web服务相关 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    	<!-- mysql数据库连接 -->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>${mysql.version}</version>
            </dependency>
    
    	<!-- dubbo微服务框架 -->
            <dependency>
                <groupId>org.apache.dubbo</groupId>
                <artifactId>dubbo-spring-boot-starter</artifactId>
            </dependency>
    
            <!-- 使用 Zookeeper 作为注册中心 -->
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
            </dependency>
            <dependency>
                <groupId>org.mybatis.spring.boot</groupId>
                <artifactId>mybatis-spring-boot-starter</artifactId>
            </dependency>
            <!-- seata 相关依赖-->
            <dependency>
                <groupId>io.seata</groupId>
                <artifactId>seata-spring-boot-starter</artifactId>
            </dependency>
    

    应用配置文件

    # dubbo配置项,对应DubboConfigurationProperties 配置类
    dubbo:
      application:
        name: ${spring.application.name} #应用名
      registry:
        address: zookeeper://127.0.0.1:2181 #注册中心地址
        timeout: 1000 # 指定注册到zk上超时时间,ms
      protocol:
        port: 20881 # 协议端口。使用 -1表示随机端口
        name: dubbo # 使用 `dubbo://` 协议。更多协议,可见 http://dubbo.apache.org/zh-cn/docs/user/references/protocol/introduction.html 文档
      scan:
        base-packages: cn.mushuwei.storage # 指定实现服务的包
    server:
      port: 8081
    
    
    #数据源配置
    spring:
      datasource:
        driver-class-name: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://localhost:3306/storage?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&useSSL=false
        username: root
        password: 123456
        type: com.alibaba.druid.pool.DruidDataSource
    
      application:
        name: seata-action-storage #应用名
    
    # seata相关配置
    seata:
      service:
        grouplist:
          default: 127.0.0.1:8091
        vgroup-mapping:
          service_tx_group: default
          enable-degrade: false
          disable-global-transaction: false
      enabled: true
      application-id: ${spring.application.name}
      tx-service-group: service_tx_group
      client:
        tm:
          commit-retry-count: 3
          rollback-retry-count: 3
          enable-auto-data-source-proxy: false
        rm:
          report-success-enable: true
          table-meta-check-enable: true
          report-retry-count: 5
          async-commit-buffer-limit: 1000
      transport: # Netty相关配置start
        type: TCP
        server: NIO
        heartbeat: true
        serialization: seata
        compressor: none
        enable-client-batch-send-request: true #客户端事务消息请求是否批量合并发送(默认true)
        shutdown:
          wait: 3
        thread-factory:
          boss-thread-prefix: NettyBoss
          worker-thread-prefix: NettyServerNIOWorker
          server-executor-thread-prefix: NettyServerBizHandler
          share-boss-worker: false
          client-selector-thread-prefix: NettyClientSelector
          client-selector-thread-size: 1
          client-worker-thread-prefix: NettyClientWorkerThread
    
    #数据库sql操作打印日志
    logging:
      level:
        cn.mushuwei.storage.dao: debug
    

    创建表结构和初始化数据

    # 创建商品库存表
    create table if not exists storage.storage
    (
    	id bigint auto_increment
    		primary key,
    	commodity_code varchar(50) null comment '商品编码',
    	name varchar(255) null comment '商品名称',
    	count int null comment '商品库存数'
    );
    INSERT INTO storage.storage (id, commodity_code, name, count) VALUES (1, 'cola', '可口可乐', 2000);
    
    # 新建undo_log表
    create table if not exists storage.undo_log
    (
        id bigint auto_increment
            primary key,
        branch_id bigint not null,
        xid varchar(100) not null,
        context varchar(128) not null,
        rollback_info longblob not null,
        log_status int not null,
        log_created datetime not null,
        log_modified datetime not null,
        ext varchar(100) null,
        constraint ux_undo_log
            unique (xid, branch_id)
    )
        charset=utf8;
    

    将上面的sql文件导入到新建的storage数据库中。这个文件地址在resources/sql 下。

    Seata配置类

    package cn.mushuwei.storage.config;
    
    import com.alibaba.druid.pool.DruidDataSource;
    import io.seata.rm.datasource.DataSourceProxy;
    import org.apache.ibatis.session.SqlSessionFactory;
    import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
    import org.mybatis.spring.SqlSessionFactoryBean;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;
    import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
    
    /**
     * @author jamesmsw
     * @date 2020/12/1 11:06 上午
     */
    @Configuration
    public class SeataAutoConfig {
    
    
        /**
         * autowired datasource config
         */
        @Autowired
        private DataSourceProperties dataSourceProperties;
    
        /**
         * init durid datasource
         *
         * @Return: druidDataSource  datasource instance
         */
        @Bean
        @Primary
        public DruidDataSource druidDataSource(){
            DruidDataSource druidDataSource = new DruidDataSource();
            druidDataSource.setUrl(dataSourceProperties.getUrl());
            druidDataSource.setUsername(dataSourceProperties.getUsername());
            druidDataSource.setPassword(dataSourceProperties.getPassword());
            druidDataSource.setDriverClassName(dataSourceProperties.getDriverClassName());
            druidDataSource.setInitialSize(0);
            druidDataSource.setMaxActive(180);
            druidDataSource.setMaxWait(60000);
            druidDataSource.setMinIdle(0);
            druidDataSource.setValidationQuery("Select 1 from DUAL");
            druidDataSource.setTestOnBorrow(false);
            druidDataSource.setTestOnReturn(false);
            druidDataSource.setTestWhileIdle(true);
            druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
            druidDataSource.setMinEvictableIdleTimeMillis(25200000);
            druidDataSource.setRemoveAbandoned(true);
            druidDataSource.setRemoveAbandonedTimeout(1800);
            druidDataSource.setLogAbandoned(true);
            return druidDataSource;
        }
    
        /**
         * init datasource proxy
         * @Param: druidDataSource  datasource bean instance
         * @Return: DataSourceProxy  datasource proxy
         */
        @Bean
        public DataSourceProxy dataSourceProxy(DruidDataSource druidDataSource){
            return new DataSourceProxy(druidDataSource);
        }
    
        /**
         * init mybatis sqlSessionFactory
         * @Param: dataSourceProxy  datasource proxy
         * @Return: DataSourceProxy  datasource proxy
         */
        @Bean
        public SqlSessionFactory sqlSessionFactory(DataSourceProxy dataSourceProxy) throws Exception {
            SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
            factoryBean.setDataSource(dataSourceProxy);
            factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver()
                    .getResources("classpath*:/mybatis/*.xml"));
            factoryBean.setTransactionFactory(new JdbcTransactionFactory());
            return factoryBean.getObject();
        }
    }
    
    

    持久化操作

    1. StorageDao
    package cn.mushuwei.storage.dao;
    
    import org.apache.ibatis.annotations.Param;
    import org.springframework.stereotype.Repository;
    
    /**
     * @author jamesmsw
     * @date 2020/11/30 7:46 下午
     */
    @Repository("storageDao")
    public interface StorageDao {
    
        /**
         * 扣减商品库存
         *
         * @param commodityCode 商品code
         * @param count 扣减数量
         * @return
         */
        int decreaseStorage(@Param("commodityCode") String commodityCode, @Param("count") Integer count);
    }
    
    
    1. Storage.xml
    <?xml version="1.0" encoding="UTF-8" ?>
    <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
    <mapper namespace="cn.mushuwei.storage.dao.StorageDao">
    
        <update id="decreaseStorage">
            update storage set count = count - #{count} where commodity_code = #{commodityCode}
        </update>
    </mapper>
    

    到此为止,商品库存操作逻辑,就大致介绍完毕了,其他Account模块是扣减用户余额的操作,Order模块是新建订单数据的,具体配置和上述描述的差不懂。

    Business业务逻辑操作

    package cn.mushuwei.business.controller;
    
    import cn.mushuwei.business.dto.BusinessDTO;
    import cn.mushuwei.business.service.BusinessService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.annotation.Resource;
    
    /**
     * @author jamesmsw
     * @date 2020/12/1 9:48 下午
     */
    @RestController
    @RequestMapping("/business")
    @Slf4j
    public class BusinessController {
    
        @Resource(name = "businessService")
        private BusinessService businessService;
    
        @PostMapping("/buy")
        public String handleBusiness(@RequestBody BusinessDTO businessDTO){
            log.info("请求参数:{}",businessDTO.toString());
            Boolean result = businessService.handleBusiness(businessDTO);
            if (result) {
                return "ok";
            }
            return "fail";
        }
    }
    
    

    business模块中,我们对外暴露接口/business/buy,用于给用户进行下单操作。

    业务逻辑处理

    package cn.mushuwei.business.service.impl;
    
    import cn.mushuwei.business.dto.BusinessDTO;
    import cn.mushuwei.business.service.BusinessService;
    import cn.mushuwei.order.api.OrderApi;
    import cn.mushuwei.order.api.dto.OrderDTO;
    import cn.mushuwei.storage.api.StorageApi;
    import cn.mushuwei.storage.api.dto.CommodityDTO;
    import io.seata.core.context.RootContext;
    import io.seata.spring.annotation.GlobalTransactional;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.dubbo.config.annotation.DubboReference;
    import org.springframework.stereotype.Service;
    
    /**
     * @author jamesmsw
     * @date 2020/12/1 9:37 下午
     */
    @Slf4j
    @Service("businessService")
    public class BusinessServiceImpl implements BusinessService {
    
        @DubboReference
        private StorageApi storageApi;
    
        @DubboReference
        private OrderApi orderApi;
    
        private boolean flag;
    
        @Override
        @GlobalTransactional(timeoutMills = 300000, name = "seata-demo-business")
        public Boolean handleBusiness(BusinessDTO businessDTO) {
            flag = true;
            log.info("开始全局事务,XID = " + RootContext.getXID());
            CommodityDTO commodityDTO = new CommodityDTO();
            commodityDTO.setCommodityCode(businessDTO.getCommodityCode());
            commodityDTO.setCount(businessDTO.getCount());
            boolean storageResult =  storageApi.decreaseStorage(commodityDTO);
    
            OrderDTO orderDTO = new OrderDTO();
            orderDTO.setUserId(businessDTO.getUserId());
            orderDTO.setCommodityCode(businessDTO.getCommodityCode());
            orderDTO.setOrderCount(businessDTO.getCount());
            orderDTO.setOrderAmount(businessDTO.getAmount());
            boolean orderResult = orderApi.createOrder(orderDTO);
    
            //打开注释测试事务发生异常后,全局回滚功能
    //        if (!flag) {
    //            throw new RuntimeException("测试抛异常后,分布式事务回滚!");
    //        }
    
            if (!storageResult || !orderResult) {
                throw new RuntimeException("失败");
            }
            return true;
        }
    }
    
    
    • 我们使用@DubboReference分布调用storageApiorderApi, 用于处理库存扣减和订单数据逻辑的操作。
    • @GlobalTransactional()在发起业务类中是必须要加的,用于全局锁等逻辑操作。

    下单正常流程

    第一阶段:在正常的下单流程中,storage、order、account和business应用分别注册到Seata这个事务协调器上,当用户进行下单时,数据更新前后的日志将会别记录到每个数据库下的undo_log表中,并形成一个全局的锁。以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。

    第二阶段: 二阶段如果是提交的话,因为“业务 SQL”在一阶段已经提交至数据库, 所以 Seata 框架只需将一阶段保存的快照数据和行锁删掉,完成数据清理即可。

    下单异常流程

    第一阶段:在一阶段下单流程中,storage、order、account和business应用分别注册到Seata这个事务协调器上,当用户进行下单时,数据更新前后的日志将会别记录到每个数据库下的undo_log表中,并形成一个全局的锁。以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。

    第二阶段: 当下单出现异常时,Seata将会对数据进行回滚,回滚的逻辑是按照一阶段的日志。

    演示

    1. 启动Dubbo、Seata、MySQ并初始化数据, 使各服务应用注册到Seata上。
      • Dubbo、Seata和MySQL服务
    mushuwei@mushuweideMacBook-Pro-2 seata % docker ps
    CONTAINER ID        IMAGE                  COMMAND                  CREATED             STATUS              PORTS                                                  NAMES
    0c9c325a039c        mysql:latest           "docker-entrypoint.s…"   2 weeks ago         Up 7 minutes        0.0.0.0:3306->3306/tcp, 33060/tcp                      mysql5.7
    b8031fa865cd        seataio/seata-server   "java -Djava.securit…"   2 weeks ago         Up 20 seconds       0.0.0.0:8091->8091/tcp                                 seata_seata-server_1
    2af927368a15        apache/dubbo-admin     "java -XX:+UnlockExp…"   2 weeks ago         Up 2 hours          0.0.0.0:8080->8080/tcp                                 dubbo_admin_1
    7afec07234c9        zookeeper              "/docker-entrypoint.…"   2 weeks ago         Up 2 hours          2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp   dubbo_zookeeper_1
    
    • 初始化数据
    mysql> use storage;
    Reading table information for completion of table and column names
    You can turn off this feature to get a quicker startup with -A
    
    Database changed
    mysql> select * from storage;
    +----+----------------+------+-------+
    | id | commodity_code | name | count |
    +----+----------------+------+-------+
    |  1 | cola           | ???? |  2000 |
    +----+----------------+------+-------+
    1 row in set (0.00 sec)
    
    mysql> use account;
    Reading table information for completion of table and column names
    You can turn off this feature to get a quicker startup with -A
    
    Database changed
    mysql> select * from account;
    +----+---------+---------+
    | id | user_id | amount  |
    +----+---------+---------+
    |  1 | user123 | 1250.00 |
    +----+---------+---------+
    1 row in set (0.00 sec)
    
    mysql> use order;
    Reading table information for completion of table and column names
    You can turn off this feature to get a quicker startup with -A
    
    Database changed
    mysql> select * from order;
    ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'order' at line 1
    mysql> select * from `order`;
    Empty set (0.00 sec)
    
    • 启动Storage、Account、Order和Business

    • Seata上各应用的注册情况

      Starting seata_seata-server_1 ... done
      Attaching to seata_seata-server_1
      seata-server_1  | [0.001s][warning][gc] -Xloggc is deprecated. Will use -Xlog:gc:/var/log/seata_gc.log instead.
      seata-server_1  | [0.015s][info   ][gc] Using G1
      seata-server_1  | [0.841s][info   ][gc] GC(0) Pause Young (Normal) (G1 Evacuation Pause) 14M->4M(32M) 11.654ms
      seata-server_1  | SLF4J: A number (18) of logging calls during the initialization phase have been intercepted and are
      seata-server_1  | SLF4J: now being replayed. These are subject to the filtering rules of the underlying logging system.
      seata-server_1  | SLF4J: See also http://www.slf4j.org/codes.html#replay
      seata-server_1  | 08:16:30.938  INFO --- [                     main] io.seata.server.Server                   : The server is running in container.
      seata-server_1  | 08:16:30.972  INFO --- [                     main] io.seata.config.FileConfiguration        : The file name of the operation is registry
      seata-server_1  | 08:16:30.980  INFO --- [                     main] io.seata.config.FileConfiguration        : The configuration file used is /seata-server/resources/registry.conf
      seata-server_1  | [1.385s][info   ][gc] GC(1) Pause Young (Normal) (G1 Evacuation Pause) 15M->6M(32M) 14.280ms
      seata-server_1  | 08:16:31.221  INFO --- [                     main] io.seata.config.FileConfiguration        : The file name of the operation is file.conf
      seata-server_1  | 08:16:31.222  INFO --- [                     main] io.seata.config.FileConfiguration        : The configuration file used is file.conf
      seata-server_1  | WARNING: An illegal reflective access operation has occurred
      seata-server_1  | WARNING: Illegal reflective access by net.sf.cglib.core.ReflectUtils$2 (file:/seata-server/libs/cglib-3.1.jar) to method java.lang.ClassLoader.defineClass(java.lang.String,byte[],int,int,java.security.ProtectionDomain)
      seata-server_1  | WARNING: Please consider reporting this to the maintainers of net.sf.cglib.core.ReflectUtils$2
      seata-server_1  | WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
      seata-server_1  | WARNING: All illegal access operations will be denied in a future release
      seata-server_1  | [1.734s][info   ][gc] GC(2) Pause Young (Normal) (G1 Evacuation Pause) 16M->7M(32M) 6.400ms
      seata-server_1  | [2.101s][info   ][gc] GC(3) Pause Young (Normal) (G1 Evacuation Pause) 18M->7M(32M) 4.828ms
      seata-server_1  | 08:16:31.924  INFO --- [                     main] i.s.core.rpc.netty.NettyServerBootstrap  : Server started, listen port: 8091
      seata-server_1  | 08:26:12.007  INFO --- [rverHandlerThread_1_1_500] i.s.c.r.processor.server.RegRmProcessor  : RM register success,message:RegisterRMRequest{resourceIds='jdbc:mysql://localhost:3306/storage', applicationId='seata-action-storage', transactionServiceGroup='service_tx_group'},channel:[id: 0xae1ea1b1, L:/172.20.0.2:8091 - R:/172.20.0.1:52380],client version:1.3.0
      seata-server_1  | 08:26:12.080  INFO --- [rverHandlerThread_1_2_500] i.s.c.r.processor.server.RegRmProcessor  : RM register success,message:RegisterRMRequest{resourceIds='jdbc:mysql://localhost:3306/storage', applicationId='seata-action-storage', transactionServiceGroup='service_tx_group'},channel:[id: 0xae1ea1b1, L:/172.20.0.2:8091 - R:/172.20.0.1:52380],client version:1.3.0
      seata-server_1  | 08:26:33.704  INFO --- [rverHandlerThread_1_3_500] i.s.c.r.processor.server.RegRmProcessor  : RM register success,message:RegisterRMRequest{resourceIds='jdbc:mysql://localhost:3306/account', applicationId='seata-action-account', transactionServiceGroup='service_tx_group'},channel:[id: 0xd949a994, L:/172.20.0.2:8091 - R:/172.20.0.1:52396],client version:1.3.0
      seata-server_1  | 08:26:33.758  INFO --- [rverHandlerThread_1_4_500] i.s.c.r.processor.server.RegRmProcessor  : RM register success,message:RegisterRMRequest{resourceIds='jdbc:mysql://localhost:3306/account', applicationId='seata-action-account', transactionServiceGroup='service_tx_group'},channel:[id: 0xd949a994, L:/172.20.0.2:8091 - R:/172.20.0.1:52396],client version:1.3.0
      seata-server_1  | 08:26:57.466  INFO --- [rverHandlerThread_1_5_500] i.s.c.r.processor.server.RegRmProcessor  : RM register success,message:RegisterRMRequest{resourceIds='jdbc:mysql://localhost:3306/order', applicationId='seata-action-order', transactionServiceGroup='service_tx_group'},channel:[id: 0xfd51f88b, L:/172.20.0.2:8091 - R:/172.20.0.1:52412],client version:1.3.0
      seata-server_1  | 08:26:57.518  INFO --- [rverHandlerThread_1_6_500] i.s.c.r.processor.server.RegRmProcessor  : RM register success,message:RegisterRMRequest{resourceIds='jdbc:mysql://localhost:3306/order', applicationId='seata-action-order', transactionServiceGroup='service_tx_group'},channel:[id: 0xfd51f88b, L:/172.20.0.2:8091 - R:/172.20.0.1:52412],client version:1.3.0
      seata-server_1  | 08:27:10.600  INFO --- [ettyServerNIOWorker_1_4_8] i.s.c.r.processor.server.RegTmProcessor  : TM register success,message:RegisterTMRequest{applicationId='seata-action-storage', transactionServiceGroup='service_tx_group'},channel:[id: 0x0e0b6c24, L:/172.20.0.2:8091 - R:/172.20.0.1:52424],client version:1.3.0
      seata-server_1  | 08:27:32.694  INFO --- [ettyServerNIOWorker_1_5_8] i.s.c.r.processor.server.RegTmProcessor  : TM register success,message:RegisterTMRequest{applicationId='seata-action-account', transactionServiceGroup='service_tx_group'},channel:[id: 0x2fd20474, L:/172.20.0.2:8091 - R:/172.20.0.1:52432],client version:1.3.0
      seata-server_1  | 08:27:56.453  INFO --- [ettyServerNIOWorker_1_6_8] i.s.c.r.processor.server.RegTmProcessor  : TM register success,message:RegisterTMRequest{applicationId='seata-action-order', transactionServiceGroup='service_tx_group'},channel:[id: 0xc8f6ba94, L:/172.20.0.2:8091 - R:/172.20.0.1:52436],client version:1.3.0
      seata-server_1  | 08:28:15.847  INFO --- [rverHandlerThread_1_7_500] i.s.c.r.processor.server.RegRmProcessor  : RM register success,message:RegisterRMRequest{resourceIds='null', applicationId='seata-action-business', transactionServiceGroup='service_tx_group'},channel:[id: 0x9ef75d68, L:/172.20.0.2:8091 - R:/172.20.0.1:52444],client version:1.3.0
      seata-server_1  | 08:28:15.863  INFO --- [ettyServerNIOWorker_1_7_8] i.s.c.r.processor.server.RegTmProcessor  : TM register success,message:RegisterTMRequest{applicationId='seata-action-business', transactionServiceGroup='service_tx_group'},channel:[id: 0x2b6c19d5, L:/172.20.0.2:8091 - R:/172.20.0.1:52440],client version:1.3.0
      
    1. 检查各服务Service在Dubbo上的情况。

    1. 正常流程-模拟用户下单,看下各应用的二阶段提交日志。
    • 执行business模块test/java目录下的business.http文件,对接口发起请求。
    Content-Type: application/json
    
    {
        "userId" : "user123",
        "commodityCode" : "cola",
        "count" : 2,
        "amount" : 5.0
    }
    
    • 各数据库数据变化

      mysql> use storage;
      Reading table information for completion of table and column names
      You can turn off this feature to get a quicker startup with -A
      
      Database changed
      mysql> select * from storage;
      +----+----------------+------+-------+
      | id | commodity_code | name | count |
      +----+----------------+------+-------+
      |  1 | cola           | ???? |  1998 |
      +----+----------------+------+-------+
      1 row in set (0.00 sec)
      
      mysql> use account;
      Reading table information for completion of table and column names
      You can turn off this feature to get a quicker startup with -A
      
      Database changed
      mysql> select * from account;
      +----+---------+---------+
      | id | user_id | amount  |
      +----+---------+---------+
      |  1 | user123 | 1245.00 |
      +----+---------+---------+
      1 row in set (0.00 sec)
      
      mysql> use order;
      Reading table information for completion of table and column names
      You can turn off this feature to get a quicker startup with -A
      
      Database changed
      mysql> select * from order;
      ERROR 1064 (42000): You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'order' at line 1
      mysql> select * from `order`;
      +----+----------------------------------+---------+------+-------+--------+
      | id | order_no                         | user_id | code | count | amount |
      +----+----------------------------------+---------+------+-------+--------+
      |  5 | dbde6ebfd72b4ad5aeba67d67ade6894 | user123 | cola |     2 |   5.00 |
      +----+----------------------------------+---------+------+-------+--------+
      1 row in set (0.00 sec)
      
      
    • 各应用下二阶段提交情况,下面日志以Storage应用为例。

      2020-12-21 16:35:41.357  INFO 5123 --- [:20881-thread-2] c.m.storage.provider.StorageApiImpl      : storage-全局事务,XID = 172.20.0.2:8091:84324557325869056
      2020-12-21 16:35:41.431  INFO 5123 --- [:20881-thread-2] i.s.c.rpc.netty.RmNettyRemotingClient    : will register resourceId:jdbc:mysql://localhost:3306/storage
      2020-12-21 16:35:41.440  INFO 5123 --- [ctor_RMROLE_1_1] io.seata.rm.AbstractRMHandler            : the rm client received response msg [version=1.5.0-SNAPSHOT,extraData=null,identified=true,resultCode=null,msg=null] from tc server.
      2020-12-21 16:35:41.444 DEBUG 5123 --- [:20881-thread-2] c.m.s.dao.StorageDao.decreaseStorage     : ==>  Preparing: update storage set count = count - ? where commodity_code = ?
      2020-12-21 16:35:41.535 DEBUG 5123 --- [:20881-thread-2] c.m.s.dao.StorageDao.decreaseStorage     : ==> Parameters: 2(Integer), cola(String)
      2020-12-21 16:35:41.665 DEBUG 5123 --- [:20881-thread-2] c.m.s.dao.StorageDao.decreaseStorage     : <==    Updates: 1
      2020-12-21 16:35:43.345  INFO 5123 --- [h_RMROLE_1_1_16] i.s.c.r.p.c.RmBranchCommitProcessor      : rm client handle branch commit process:xid=172.20.0.2:8091:84324557325869056,branchId=84324559649513472,branchType=AT,resourceId=jdbc:mysql://localhost:3306/storage,applicationData=null
      2020-12-21 16:35:43.348  INFO 5123 --- [h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler            : Branch committing: 172.20.0.2:8091:84324557325869056 84324559649513472 jdbc:mysql://localhost:3306/storage null
      2020-12-21 16:35:43.349  INFO 5123 --- [h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler            : Branch commit result: PhaseTwo_Committed
      2020-12-21 16:35:43.369  INFO 5123 --- [h_RMROLE_1_2_16] i.s.c.r.p.c.RmBranchCommitProcessor      : rm client handle branch commit process:xid=172.20.0.2:8091:84324557325869056,branchId=84324560404488192,branchType=AT,resourceId=jdbc:mysql://localhost:3306/storage,applicationData=null
      2020-12-21 16:35:43.369  INFO 5123 --- [h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler            : Branch committing: 172.20.0.2:8091:84324557325869056 84324560404488192 jdbc:mysql://localhost:3306/storage null
      2020-12-21 16:35:43.369  INFO 5123 --- [h_RMROLE_1_2_16] io.seata.rm.AbstractRMHandler            : Branch commit result: PhaseTwo_Committed
      2020-12-21 16:35:43.378  INFO 5123 --- [h_RMROLE_1_3_16] i.s.c.r.p.c.RmBranchCommitProcessor      : rm client handle branch commit process:xid=172.20.0.2:8091:84324557325869056,branchId=84324560530317312,branchType=AT,resourceId=jdbc:mysql://localhost:3306/storage,applicationData=null
      2020-12-21 16:35:43.378  INFO 5123 --- [h_RMROLE_1_3_16] io.seata.rm.AbstractRMHandler            : Branch committing: 172.20.0.2:8091:84324557325869056 84324560530317312 jdbc:mysql://localhost:3306/storage null
      2020-12-21 16:35:43.378  INFO 5123 --- [h_RMROLE_1_3_16] io.seata.rm.AbstractRMHandler            : Branch commit result: PhaseTwo_Committed
      
    1. 异常流程-模拟用户下单,看下各应用的二阶段提交日志。

      • 修改BusinessServiceImpl类,并重启。

            private boolean flag;
        
            @Override
            @GlobalTransactional(timeoutMills = 300000, name = "seata-demo-business")
            public Boolean handleBusiness(BusinessDTO businessDTO) {
                flag = false;
                log.info("开始全局事务,XID = " + RootContext.getXID());
                CommodityDTO commodityDTO = new CommodityDTO();
                commodityDTO.setCommodityCode(businessDTO.getCommodityCode());
                commodityDTO.setCount(businessDTO.getCount());
                boolean storageResult =  storageApi.decreaseStorage(commodityDTO);
        
                OrderDTO orderDTO = new OrderDTO();
                orderDTO.setUserId(businessDTO.getUserId());
                orderDTO.setCommodityCode(businessDTO.getCommodityCode());
                orderDTO.setOrderCount(businessDTO.getCount());
                orderDTO.setOrderAmount(businessDTO.getAmount());
                boolean orderResult = orderApi.createOrder(orderDTO);
        
                //打开注释测试事务发生异常后,全局回滚功能
                if (!flag) {
                    throw new RuntimeException("测试抛异常后,分布式事务回滚!");
                }
        
                if (!storageResult || !orderResult) {
                    throw new RuntimeException("失败");
                }
                return true;
            }
        
      • 执行business模块test/java目录下的business.http文件,对接口发起请求。

      POST http://localhost:8084/business/buy
      
      HTTP/1.1 500 
      Content-Type: application/json
      Transfer-Encoding: chunked
      Date: Mon, 21 Dec 2020 08:46:24 GMT
      Connection: close
      
      {
        "timestamp": "2020-12-21T08:46:24.678+00:00",
        "status": 500,
        "error": "Internal Server Error",
        "message": "",
        "path": "/business/buy"
      }
      
    • 各应用下二阶段回滚情况,下面日志以Storage应用为例。

      2020-12-21 16:46:23.665  INFO 5123 --- [:20881-thread-6] c.m.storage.provider.StorageApiImpl      : storage-全局事务,XID = 172.20.0.2:8091:84327252002611200
      2020-12-21 16:46:23.670 DEBUG 5123 --- [:20881-thread-6] c.m.s.dao.StorageDao.decreaseStorage     : ==>  Preparing: update storage set count = count - ? where commodity_code = ?
      2020-12-21 16:46:23.671 DEBUG 5123 --- [:20881-thread-6] c.m.s.dao.StorageDao.decreaseStorage     : ==> Parameters: 2(Integer), cola(String)
      2020-12-21 16:46:23.689 DEBUG 5123 --- [:20881-thread-6] c.m.s.dao.StorageDao.decreaseStorage     : <==    Updates: 1
      2020-12-21 16:46:24.461  INFO 5123 --- [h_RMROLE_1_7_16] i.s.c.r.p.c.RmBranchRollbackProcessor    : rm handle branch rollback process:xid=172.20.0.2:8091:84327252002611200,branchId=84327252610785280,branchType=AT,resourceId=jdbc:mysql://localhost:3306/storage,applicationData=null
      2020-12-21 16:46:24.462  INFO 5123 --- [h_RMROLE_1_7_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacking: 172.20.0.2:8091:84327252002611200 84327252610785280 jdbc:mysql://localhost:3306/storage
      2020-12-21 16:46:24.580  INFO 5123 --- [h_RMROLE_1_7_16] i.s.r.d.undo.AbstractUndoLogManager      : xid 172.20.0.2:8091:84327252002611200 branch 84327252610785280, undo_log deleted with GlobalFinished
      2020-12-21 16:46:24.588  INFO 5123 --- [h_RMROLE_1_7_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacked result: PhaseTwo_Rollbacked
      2020-12-21 16:46:24.596  INFO 5123 --- [h_RMROLE_1_8_16] i.s.c.r.p.c.RmBranchRollbackProcessor    : rm handle branch rollback process:xid=172.20.0.2:8091:84327252002611200,branchId=84327252556259328,branchType=AT,resourceId=jdbc:mysql://localhost:3306/storage,applicationData=null
      2020-12-21 16:46:24.596  INFO 5123 --- [h_RMROLE_1_8_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacking: 172.20.0.2:8091:84327252002611200 84327252556259328 jdbc:mysql://localhost:3306/storage
      2020-12-21 16:46:24.610  INFO 5123 --- [h_RMROLE_1_8_16] i.s.r.d.undo.AbstractUndoLogManager      : xid 172.20.0.2:8091:84327252002611200 branch 84327252556259328, undo_log added with GlobalFinished
      2020-12-21 16:46:24.615  INFO 5123 --- [h_RMROLE_1_8_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacked result: PhaseTwo_Rollbacked
      2020-12-21 16:46:24.621  INFO 5123 --- [h_RMROLE_1_9_16] i.s.c.r.p.c.RmBranchRollbackProcessor    : rm handle branch rollback process:xid=172.20.0.2:8091:84327252002611200,branchId=84327252489150464,branchType=AT,resourceId=jdbc:mysql://localhost:3306/storage,applicationData=null
      2020-12-21 16:46:24.621  INFO 5123 --- [h_RMROLE_1_9_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacking: 172.20.0.2:8091:84327252002611200 84327252489150464 jdbc:mysql://localhost:3306/storage
      2020-12-21 16:46:24.634  INFO 5123 --- [h_RMROLE_1_9_16] i.s.r.d.undo.AbstractUndoLogManager      : xid 172.20.0.2:8091:84327252002611200 branch 84327252489150464, undo_log added with GlobalFinished
      2020-12-21 16:46:24.641  INFO 5123 --- [h_RMROLE_1_9_16] io.seata.rm.AbstractRMHandler            : Branch Rollbacked result: PhaseTwo_Rollbacked
      
      
      • 大家可以观察到各数据库下的数据并没有发生变化。

    以上代码,我已经上传到GitHub中了,大家详见: https://github.com/sanshengshui/seata-dubbo-action,AT模式在master分支上。

    下一章将给大家介绍基于Dubbo + Seata的分布式事务 --- TCC模式的实战案例,敬请期待!

    参考文章

  • 相关阅读:
    2021“MINIEYE杯”中国大学生算法设计超级联赛(2)(1002 I love tree)(树状数组+树链剖分)
    周末随笔_有关变化
    20210808心情随笔
    离开那个傻叉的地方了
    如何建设符合ISO9000要求的企业文控中心
    企业云盘部署极其简单的分布式文件系统的方法
    企业云盘安全机制-文件加密存储与原文存储的优劣
    查看tomcat打开的文件数
    Centos7 Memcached 安装
    centos7 快速安装rabbitmq
  • 原文地址:https://www.cnblogs.com/sanshengshui/p/14169121.html
Copyright © 2011-2022 走看看