  • Distributed Transactions (3): Strongly Consistent Distributed Transactions with Atomikos in Practice

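    Distributed Transactions (1): Theoretical Foundations

    Distributed Transactions (2): Strongly Consistent Distributed Transaction Solutions

    Distributed Transactions (4): Eventual Consistency with TCC

    The previous post introduced strongly consistent distributed transaction solutions. This post builds a hands-on demo with the Atomikos framework that simulates placing an order and deducting inventory.

    The project uses Atomikos and mybatis-plus on Spring Boot 2.3.2.RELEASE.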

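    1. Project setup

    Dependencies (versions are omitted here; mybatis-plus-boot-starter is not managed by the Spring Boot BOM, so it needs an explicit version in your own POM, and the entity classes below also rely on Lombok):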

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>com.baomidou</groupId>
                <artifactId>mybatis-plus-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-jta-atomikos</artifactId>
            </dependency>

    Storage (inventory) entity:

    import com.baomidou.mybatisplus.annotation.IdType;
    import com.baomidou.mybatisplus.annotation.TableId;
    import lombok.Data;
    
    @Data
    public class Storage {
        @TableId(type= IdType.AUTO)
        private Integer id;
        
        private Integer commodityId;
        
        private Integer quantity;
        
    }

    Order entity:

    import com.baomidou.mybatisplus.annotation.IdType;
    import com.baomidou.mybatisplus.annotation.TableId;
    import com.baomidou.mybatisplus.annotation.TableName;
    import lombok.Data;
    
    import java.math.BigDecimal;
    
    
    @Data
    @TableName("t_order")
    public class Order {
    
        @TableId(type= IdType.AUTO)
        private Integer id;
        
        private String userId;
    
        private Integer commodityId;
        
        private Integer quantity;
        
        private BigDecimal price;
        
        private Integer status;
    }

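    Initialization SQL: two databases are required. I created njytest1 and njytest2 so that the order table and the storage table live in different databases. Per the configuration below, t_order belongs in njytest1 (data source 1) and storage in njytest2 (data source 2). Create the tables and seed the initial data: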

    CREATE TABLE `storage` (
                               `id` int(11) NOT NULL AUTO_INCREMENT,
                               `commodity_id` int(11) NOT NULL,
                               `quantity` int(11) NOT NULL,
                               PRIMARY KEY (`id`),
                               UNIQUE KEY `idx_commodity_id` (`commodity_id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
    INSERT INTO `storage` (`id`, `commodity_id`, `quantity`) VALUES (1, 1, 10);
    
    CREATE TABLE `t_order` (
                               `id` int(11) NOT NULL AUTO_INCREMENT,
                               `user_id` varchar(255) DEFAULT NULL,
                               `commodity_id` int(11) NOT NULL,
                               `quantity` int(11) DEFAULT 0,
                               `price` decimal (10,2) DEFAULT NULL ,
                               `status` int(11) DEFAULT NULL,
                               PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;

    2. Configuration

    Configuration class for database 1, which receives the settings of data source 1:

    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.stereotype.Component;
    
    /**
     * Description:
     * Created by nijunyang on 2021/12/2 23:57
     */
    @Data
    @ConfigurationProperties(prefix = "mysql.datasource1")
    @Component
    public class DBConfig1 {
        private String url;
        private String username;
        private String password;
        private int minPoolSize;
        private int maxPoolSize;
        private int maxLifetime;
        private int borrowConnectionTimeout;
        private int loginTimeout;
        private int maintenanceInterval;
        private int maxIdleTime;
        private String testQuery;
    }

    Configuration class for database 2, which receives the settings of data source 2:

    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.stereotype.Component;
    
    /**
     * Description:
     * Created by nijunyang on 2021/12/3 0:00
     */
    @Data
    @ConfigurationProperties(prefix = "mysql.datasource2")
    @Component
    public class DBConfig2 {
        private String url;
        private String username;
        private String password;
        private int minPoolSize;
        private int maxPoolSize;
        private int maxLifetime;
        private int borrowConnectionTimeout;
        private int loginTimeout;
        private int maintenanceInterval;
        private int maxIdleTime;
        private String testQuery;
    }

    The corresponding two data source configurations in application.yml:

    mysql:
      datasource1:
        url: jdbc:mysql://localhost:3306/njytest1?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
        username: root
        password: root
        minPoolSize: 3
        maxPoolSize: 25
        maxLifetime: 30000
        borrowConnectionTimeout: 30
        loginTimeout: 30
        maintenanceInterval: 60
        maxIdleTime: 60
        testQuery: SELECT 1
    
      datasource2:
        url: jdbc:mysql://localhost:3306/njytest2?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
        username: root
        password: root
        minPoolSize: 3
        maxPoolSize: 25
        maxLifetime: 30000
        borrowConnectionTimeout: 30
        loginTimeout: 30
        maintenanceInterval: 60
        maxIdleTime: 60
        testQuery: SELECT 1
    logging:
      level:
        com.nijunyang.tx.xa.mapper1: debug
        com.nijunyang.tx.xa.mapper2: debug

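    The two mappers must live in different packages so that each one can be bound to a different data source. OrderMapper, in package com.nijunyang.tx.xa.mapper1: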

    import com.baomidou.mybatisplus.core.mapper.BaseMapper;
    import com.nijunyang.tx.common.entity.Order;
    import org.springframework.stereotype.Repository;
    
    /**
     * Description:
     * Created by nijunyang on 2021/12/3 0:09
     */
    @Repository
    public interface OrderMapper extends BaseMapper<Order> {
    
    }

    StorageMapper, in package com.nijunyang.tx.xa.mapper2:

    import com.baomidou.mybatisplus.core.mapper.BaseMapper;
    import com.nijunyang.tx.common.entity.Storage;
    import org.apache.ibatis.annotations.Param;
    import org.apache.ibatis.annotations.Update;
    import org.springframework.stereotype.Repository;
    
    /**
     * Description:
     * Created by nijunyang on 2021/12/3 0:10
     */
    @Repository
    public interface StorageMapper extends BaseMapper<Storage> {
    
        // Conditional decrement: the row is only updated when enough stock remains
        @Update("UPDATE storage SET quantity = quantity - #{quantity} WHERE commodity_id = #{commodityId} AND quantity >= #{quantity}")
        void reduce(@Param("commodityId") Integer commodityId, @Param("quantity") Integer quantity);
    }
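
    The WHERE clause of the reduce statement is what guards against overselling: when stock is insufficient, no row matches and nothing is updated. The following is a minimal in-memory sketch of that behavior, not the article's mapper. The class ConditionalReduceSketch and its methods are hypothetical names used only for illustration, and a HashMap stands in for the storage table.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory stand-in for the storage table (hypothetical, for illustration only). */
class ConditionalReduceSketch {

    // commodityId -> quantity, mimicking the rows of the storage table
    private final Map<Integer, Integer> stock = new HashMap<>();

    ConditionalReduceSketch(int commodityId, int quantity) {
        stock.put(commodityId, quantity);
    }

    /**
     * Mimics: UPDATE storage SET quantity = quantity - ?
     *         WHERE commodity_id = ? AND quantity >= ?
     * Returns the number of affected rows (0 or 1).
     */
    synchronized int reduce(int commodityId, int quantity) {
        Integer current = stock.get(commodityId);
        if (current == null || current < quantity) {
            return 0; // the WHERE clause matched no row
        }
        stock.put(commodityId, current - quantity);
        return 1;
    }

    synchronized int quantityOf(int commodityId) {
        return stock.getOrDefault(commodityId, 0);
    }

    public static void main(String[] args) {
        ConditionalReduceSketch storage = new ConditionalReduceSketch(1, 10);
        System.out.println(storage.reduce(1, 2));   // enough stock: one row affected
        System.out.println(storage.reduce(1, 20));  // not enough stock: no row affected
        System.out.println(storage.quantityOf(1));  // remaining quantity
    }
}
```

    MyBatis can return the affected-row count of an update statement when the mapper method is declared to return int. A caller could then treat a result of 0 as "insufficient stock" and throw an exception so that the surrounding transaction rolls back instead of keeping an order without a matching deduction.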

    Each data source gets its own MyBatis configuration:

    MyBatisConfig1 scans the com.nijunyang.tx.xa.mapper1 package and binds its mappers to the SqlSessionTemplate bean named orderSqlSessionTemplate;
    MyBatisConfig2 scans the com.nijunyang.tx.xa.mapper2 package and binds its mappers to the SqlSessionTemplate bean named storageSqlSessionTemplate.
    That is, these two annotations:
    @MapperScan(basePackages = "com.nijunyang.tx.xa.mapper1", sqlSessionTemplateRef = "orderSqlSessionTemplate")
    @MapperScan(basePackages = "com.nijunyang.tx.xa.mapper2", sqlSessionTemplateRef = "storageSqlSessionTemplate")

    import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
    import com.mysql.cj.jdbc.MysqlXADataSource;
    import org.apache.ibatis.session.SqlSessionFactory;
    import org.mybatis.spring.SqlSessionTemplate;
    import org.mybatis.spring.annotation.MapperScan;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;
    
    import javax.sql.DataSource;
    import java.sql.SQLException;
    
    /**
     * Description:
     * Created by nijunyang on 2021/12/3 0:11
     */
    @Configuration
    // Specifies which SqlSessionTemplate the mappers in this package use
    @MapperScan(basePackages = "com.nijunyang.tx.xa.mapper1", sqlSessionTemplateRef = "orderSqlSessionTemplate")
    public class MyBatisConfig1 {
    
        // Configure the XA data source
        @Primary
        @Bean(name = "orderDataSource")
        public DataSource orderDataSource(DBConfig1 dbConfig1) throws SQLException {
            MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
            mysqlXaDataSource.setUrl(dbConfig1.getUrl());
            mysqlXaDataSource.setUser(dbConfig1.getUsername());
            mysqlXaDataSource.setPassword(dbConfig1.getPassword());
            // Route all operations of one global transaction to the same physical connection
            mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
    
            AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
            xaDataSource.setXaDataSource(mysqlXaDataSource);
            xaDataSource.setUniqueResourceName("orderDataSource");
    
            xaDataSource.setMinPoolSize(dbConfig1.getMinPoolSize());
            xaDataSource.setMaxPoolSize(dbConfig1.getMaxPoolSize());
            xaDataSource.setMaxLifetime(dbConfig1.getMaxLifetime());
            xaDataSource.setBorrowConnectionTimeout(dbConfig1.getBorrowConnectionTimeout());
            xaDataSource.setLoginTimeout(dbConfig1.getLoginTimeout());
            xaDataSource.setMaintenanceInterval(dbConfig1.getMaintenanceInterval());
            xaDataSource.setMaxIdleTime(dbConfig1.getMaxIdleTime());
            xaDataSource.setTestQuery(dbConfig1.getTestQuery());
            return xaDataSource;
        }
    
        @Primary
        @Bean(name = "orderSqlSessionFactory")
        public SqlSessionFactory orderSqlSessionFactory(@Qualifier("orderDataSource") DataSource dataSource) throws Exception {
            // Use MybatisSqlSessionFactoryBean instead of the plain SqlSessionFactoryBean;
            // otherwise MyBatis-Plus does not take effect
            MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
            bean.setDataSource(dataSource);
            return bean.getObject();
        }
    
        @Primary
        @Bean(name = "orderSqlSessionTemplate")
        public SqlSessionTemplate orderSqlSessionTemplate(
                @Qualifier("orderSqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
            SqlSessionTemplate sqlSessionTemplate = new SqlSessionTemplate(sqlSessionFactory);
            return sqlSessionTemplate;
        }
    }

    MyBatisConfig2, the counterpart for the storage data source:

    import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
    import com.mysql.cj.jdbc.MysqlXADataSource;
    import org.apache.ibatis.session.SqlSessionFactory;
    import org.mybatis.spring.SqlSessionTemplate;
    import org.mybatis.spring.annotation.MapperScan;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import javax.sql.DataSource;
    import java.sql.SQLException;
    
    /**
     * Description:
     * Created by nijunyang on 2021/12/3 0:16
     */
    @Configuration
    // Specifies which SqlSessionTemplate the mappers in this package use
    @MapperScan(basePackages = "com.nijunyang.tx.xa.mapper2", sqlSessionTemplateRef = "storageSqlSessionTemplate")
    public class MyBatisConfig2 {
    
        // Configure the XA data source
        @Bean(name = "storageDataSource")
        public DataSource storageDataSource(DBConfig2 dbConfig2) throws SQLException {
            MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
            mysqlXaDataSource.setUrl(dbConfig2.getUrl());
            mysqlXaDataSource.setUser(dbConfig2.getUsername());
            mysqlXaDataSource.setPassword(dbConfig2.getPassword());
            // Route all operations of one global transaction to the same physical connection
            mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
    
            AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
            xaDataSource.setXaDataSource(mysqlXaDataSource);
            xaDataSource.setUniqueResourceName("storageDataSource");
    
            xaDataSource.setMinPoolSize(dbConfig2.getMinPoolSize());
            xaDataSource.setMaxPoolSize(dbConfig2.getMaxPoolSize());
            xaDataSource.setMaxLifetime(dbConfig2.getMaxLifetime());
            xaDataSource.setBorrowConnectionTimeout(dbConfig2.getBorrowConnectionTimeout());
            xaDataSource.setLoginTimeout(dbConfig2.getLoginTimeout());
            xaDataSource.setMaintenanceInterval(dbConfig2.getMaintenanceInterval());
            xaDataSource.setMaxIdleTime(dbConfig2.getMaxIdleTime());
            xaDataSource.setTestQuery(dbConfig2.getTestQuery());
            return xaDataSource;
        }
    
        @Bean(name = "storageSqlSessionFactory")
        public SqlSessionFactory storageSqlSessionFactory(@Qualifier("storageDataSource") DataSource dataSource) throws Exception {
            // Use MybatisSqlSessionFactoryBean instead of the plain SqlSessionFactoryBean;
            // otherwise MyBatis-Plus does not take effect
            MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
            bean.setDataSource(dataSource);
            return bean.getObject();
        }
    
        @Bean(name = "storageSqlSessionTemplate")
        public SqlSessionTemplate storageSqlSessionTemplate(
                @Qualifier("storageSqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
            SqlSessionTemplate sqlSessionTemplate = new SqlSessionTemplate(sqlSessionFactory);
            return sqlSessionTemplate;
        }
    }

    Because we define our own data sources, the startup class needs to exclude the data source auto-configuration.
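
    A minimal sketch of such a startup class follows. The class name XaApplication is hypothetical, and the sketch assumes the exclusion meant here is Spring Boot's DataSourceAutoConfiguration. It also assumes the class sits in a package above the configuration, mapper, service, and controller classes so that component scanning finds them.

```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;

// Hypothetical startup class; the name and location are assumptions.
// Excluding DataSourceAutoConfiguration leaves only the two
// AtomikosDataSourceBean instances defined in MyBatisConfig1 and MyBatisConfig2.
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class XaApplication {

    public static void main(String[] args) {
        SpringApplication.run(XaApplication.class, args);
    }
}
```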

    3. Business code

    import com.nijunyang.tx.common.entity.Order;
    import com.nijunyang.tx.xa.service.OrderService;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import javax.annotation.Resource;
    
    /**
     * Description:
     * Created by nijunyang on 2021/12/3 0:20
     */
    @RestController
    @RequestMapping("order")
    public class OrderController {
    
        @Resource
        private OrderService orderService;
    
        //127.0.0.1:8080/order?userId=1&commodityId=1&quantity=2&price=10
        @GetMapping
        public Object create(Order order) {
            orderService.create(order);
            return 1;
        }
    
    }

    OrderService:

    import com.nijunyang.tx.common.entity.Order;
    import com.nijunyang.tx.xa.mapper1.OrderMapper;
    import com.nijunyang.tx.xa.mapper2.StorageMapper;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    
    import javax.annotation.Resource;
    
    /**
     * Description:
     * Created by nijunyang on 2021/12/3 0:21
     */
    @Service
    public class OrderService {
    
        @Resource
        private OrderMapper orderMapper;
        @Resource
        private StorageMapper storageMapper;
    
        @Transactional(rollbackFor = Exception.class)
        public void create(Order order) {
            orderMapper.insert(order);
            storageMapper.reduce(order.getCommodityId(), order.getQuantity());
            // Uncomment the next line to simulate a failure and observe both databases roll back
    //        int a = 1/0;
        }
    }

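    The project is now complete. Request 127.0.0.1:8080/order?userId=1&commodityId=1&quantity=2&price=10 and you will see that the t_order table and the storage table, each in its own database, are written as expected.

    If I deliberately trigger an exception in the service layer with int a = 1/0, neither database is written.

    Under the hood, the transaction manager obtained through the @Transactional annotation is org.springframework.transaction.jta.JtaTransactionManager (see JtaTransactionManager#doBegin). The transaction is ultimately started by com.atomikos.icatch.jta.UserTransactionManager#begin, which is the transaction manager provided by Atomikos.

    Rollback on an exception likewise goes through com.atomikos.icatch.jta.UserTransactionManager#rollback, and in the end com.atomikos.icatch.imp.TransactionStateHandler#rollback rolls back all participating transactions.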
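
    Atomikos is an XA transaction manager, and XA coordinates the participating databases with a two-phase commit. The sketch below illustrates only that idea; it is not Atomikos code and does not use its API. Participant, RecordingParticipant, and execute are hypothetical names, and a real transaction manager additionally handles logging, timeouts, and recovery.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TwoPhaseCommitSketch {

    /** Hypothetical participant, loosely modeled on an XA resource. */
    interface Participant {
        boolean prepare();  // phase 1: vote yes (true) or no (false)
        void commit();      // phase 2 when every participant voted yes
        void rollback();    // phase 2 otherwise
    }

    /** Records each call so the coordinator's behavior can be inspected. */
    static class RecordingParticipant implements Participant {
        final String name;
        final boolean voteYes;
        final List<String> log = new ArrayList<>();

        RecordingParticipant(String name, boolean voteYes) {
            this.name = name;
            this.voteYes = voteYes;
        }

        @Override public boolean prepare() { log.add("prepare"); return voteYes; }
        @Override public void commit() { log.add("commit"); }
        @Override public void rollback() { log.add("rollback"); }
    }

    /** Coordinator: commit everywhere only if all participants prepared successfully. */
    static boolean execute(List<? extends Participant> participants) {
        boolean allPrepared = true;
        for (Participant p : participants) {
            if (!p.prepare()) {
                allPrepared = false;
                break; // one "no" vote is enough to abort
            }
        }
        for (Participant p : participants) {
            if (allPrepared) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allPrepared;
    }

    public static void main(String[] args) {
        RecordingParticipant order = new RecordingParticipant("orderDataSource", true);
        RecordingParticipant storage = new RecordingParticipant("storageDataSource", false);
        boolean committed = execute(Arrays.asList(order, storage));
        System.out.println("committed=" + committed);
        System.out.println(order.name + " " + order.log);
        System.out.println(storage.name + " " + storage.log);
    }
}
```

    In the demo above the rollback is triggered earlier, by the exception thrown inside the service method, so the transaction never reaches the prepare phase. The sketch covers the commit path, where a single failed vote leads to the same outcome: nothing is committed in either database.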

  • Original article: https://www.cnblogs.com/nijunyang/p/15652241.html