  • Learning Distributed Transactions

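    Theoretical approaches to distributed transactions

    • Global transactions based on the XA specification
    • Eventual consistency: 2PC, TCC
    • Reliable messaging (for example RocketMQ transactional messages, or RabbitMQ ack-based message confirmation)
    • Best-effort notification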

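    Distributed transactions come up in the following scenarios:

    • A single application writes to several databases and must keep the data consistent
    • Different microservice modules write to the same database and must keep the data consistent
    • Different microservice modules write to different databases and must keep the data consistent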

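    Putting the theory into practice: Atomikos

    Atomikos is built on JTA, the Java implementation of the XA specification, and is a fairly mature technology. With MySQL, Atomikos only works with the InnoDB storage engine; with MyISAM it cannot control transactions, so keep this in mind during development

    XA provides strongly consistent transactions but performs poorly, so it is probably not used much in internet-scale projects. Atomikos is therefore better suited to running distributed transactions across multiple data sources inside a single monolithic application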
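    XA gets its strong consistency from a two-phase commit between a coordinator and the participating resources, which is also where the performance cost comes from. The following is a minimal in-memory sketch of that flow. The names Participant, InMemoryParticipant and run are hypothetical and exist only for illustration; they are not Atomikos or JTA classes, and a real coordinator also handles logging, timeouts and recovery.

```java
import java.util.Arrays;
import java.util.List;

// Toy two-phase commit. Participant, InMemoryParticipant and run are
// hypothetical names for illustration, not Atomikos or JTA classes.
class TwoPhaseCommitSketch {

    /** A resource that votes in phase one and finishes in phase two. */
    interface Participant {
        boolean prepare();  // phase 1: true = vote to commit, false = vote to abort
        void commit();      // phase 2 when every participant voted to commit
        void rollback();    // phase 2 when any participant voted to abort
    }

    /** In-memory participant that only records its state. */
    static class InMemoryParticipant implements Participant {
        private final boolean voteYes;
        String state = "ACTIVE";

        InMemoryParticipant(boolean voteYes) {
            this.voteYes = voteYes;
        }

        @Override
        public boolean prepare() {
            state = voteYes ? "PREPARED" : "PREPARE_FAILED";
            return voteYes;
        }

        @Override
        public void commit() {
            state = "COMMITTED";
        }

        @Override
        public void rollback() {
            state = "ROLLED_BACK";
        }
    }

    /** Coordinator logic: returns true if the global transaction committed. */
    static boolean run(List<? extends Participant> participants) {
        boolean allPrepared = true;
        for (Participant p : participants) {
            if (!p.prepare()) {
                allPrepared = false;
                break; // one "no" vote is enough to abort
            }
        }
        for (Participant p : participants) {
            if (allPrepared) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allPrepared;
    }

    public static void main(String[] args) {
        InMemoryParticipant db1 = new InMemoryParticipant(true);
        InMemoryParticipant db2 = new InMemoryParticipant(false);
        boolean committed = run(Arrays.asList(db1, db2));
        System.out.println(committed + " " + db1.state + " " + db2.state);
    }
}
```

    Every participant has to answer in phase one before anyone can finish in phase two, so the slowest resource sets the pace for the whole transaction.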

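    When developing with Atomikos, the mappers of different data sources must be kept in separate packages, that is, the DAO layer of each data source lives in its own package. MyBatis's mapperScan can then scan each package and bind it to the correct connection. It is advisable to separate the mapper XML files and entity classes by package as well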

    Integrating Atomikos with Spring

    1. Add the dependencies

    <dependency>
        <groupId>javax.transaction</groupId>
        <artifactId>jta</artifactId>
        <version>1.1</version>
    </dependency>
    <dependency>
        <groupId>com.atomikos</groupId>
        <artifactId>atomikos-util</artifactId>
        <version>4.0.6</version>
    </dependency>
    <dependency>
        <groupId>com.atomikos</groupId>
        <artifactId>transactions</artifactId>
        <version>4.0.6</version>
    </dependency>
    <dependency>
        <groupId>com.atomikos</groupId>
        <artifactId>transactions-api</artifactId>
        <version>4.0.6</version>
    </dependency>
    <dependency>
        <groupId>com.atomikos</groupId>
        <artifactId>transactions-jdbc</artifactId>
        <version>4.0.6</version>
    </dependency>
    <dependency>
        <groupId>com.atomikos</groupId>
        <artifactId>transactions-jta</artifactId>
        <version>4.0.6</version>
    </dependency>
    

    2. Configure the connection settings for each data source

    # Data source 1
    jdbc.driverClass=com.mysql.jdbc.Driver
    jdbc.jdbcUrl=jdbc:mysql:///ssm_crud?useSSL=false&characterEncoding=utf-8
    jdbc.user=root
    jdbc.password=root
    
    # Data source 2
    jdbc.shop.driverClass=com.mysql.jdbc.Driver
    jdbc.shop.jdbcUrl=jdbc:mysql:///shop?useSSL=false&characterEncoding=utf-8
    jdbc.shop.user=root
    jdbc.shop.password=root
    

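    3. Add the following to the Spring XML configuration file

    • Configure the dataSource; AtomikosDataSourceBean must be used here
    <!-- Load the jdbc.properties file -->
    <context:property-placeholder location="classpath:jdbc.properties"/>
    
    <!-- An abstract bean that factors out the settings shared by the data sources; it is optional and only avoids duplication -->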
    <bean id="abstractXADataSource" class="com.atomikos.jdbc.AtomikosDataSourceBean" init-method="init" destroy-method="close" abstract="true">
        <property name="xaDataSourceClassName" value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource"/>
        <property name="maxPoolSize" value="30"/>
        <property name="minPoolSize" value="10"/>
        <property name="borrowConnectionTimeout" value="60"/>
        <property name="reapTimeout" value="20"/>
        <property name="maxIdleTime" value="60"/>
        <property name="maintenanceInterval" value="60"/>
    </bean>
        <!-- Basic configuration for database 1 -->
        <bean id="dataSourceOne" parent="abstractXADataSource">
            <property name="uniqueResourceName" value="dataSourceOne"/>
            <!-- XA connection properties -->
            <property name="xaProperties">
                <props>
                    <prop key="url">${jdbc.jdbcUrl}</prop>
                    <prop key="user">${jdbc.user}</prop>
                    <prop key="password">${jdbc.password}</prop>
                </props>
            </property>
        </bean>
        <!-- Basic configuration for database 2 -->
        <bean id="dataSourceTwo" parent="abstractXADataSource">
            <property name="uniqueResourceName" value="dataSourceTwo"/>
            <!-- XA connection properties -->
            <property name="xaProperties">
                <props>
                    <prop key="url">${jdbc.shop.jdbcUrl}</prop>
                    <prop key="user">${jdbc.shop.user}</prop>
                    <prop key="password">${jdbc.shop.password}</prop>
                </props>
            </property>
    </bean>
    
    • Configure the sqlSessionFactory beans (mapperLocations and typeAliasesPackage can also point to different paths per data source)
    <!-- One sqlSessionFactory is needed per data source -->
    <bean id="sqlSessionFactoryOne" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="configLocation" value="classpath:mybatis/sqlMapConfig.xml"></property>
        <property name="dataSource" ref="dataSourceOne"></property>
        <!-- Location of the MyBatis mapper files -->
        <property name="mapperLocations" value="classpath:mapper/*.xml"></property>
        <!-- Package for entity type aliases -->
        <property name="typeAliasesPackage" value="cn.lynu.model"></property>
    </bean>
    <bean id="sqlSessionFactoryTwo" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="configLocation" value="classpath:mybatis/sqlMapConfig.xml"></property>
        <property name="dataSource" ref="dataSourceTwo"></property>
        <!-- Location of the MyBatis mapper files -->
        <property name="mapperLocations" value="classpath:mapper/*.xml"></property>
        <!-- Package for entity type aliases -->
        <property name="typeAliasesPackage" value="cn.lynu.model"></property>
    </bean>
    
    • Configure the mapper scanners (pay attention to the package each basePackage scans)
    <!-- One mapper scanner is needed per data source -->
    <bean id="mapperScannerConfigurerOne" class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <property name="basePackage" value="cn.lynu.mapper"></property>
        <property name="sqlSessionFactoryBeanName" value="sqlSessionFactoryOne"></property>
    </bean>
    <bean id="mapperScannerConfigurerTwo" class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <property name="basePackage" value="cn.lynu.shopmapper"></property>
        <property name="sqlSessionFactoryBeanName" value="sqlSessionFactoryTwo"></property>
    </bean>
    
    • Configure transaction management
    <!-- Atomikos transaction manager -->
    <bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" init-method="init" destroy-method="close">
        <property name="forceShutdown" value="false"/>
    </bean>
    <!-- Atomikos user transaction -->
    <bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
        <property name="transactionTimeout" value="300"/>
    </bean>
    <!-- Integrate Spring with JTA transactions -->
    <bean id="springTransactionManager" class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManager" ref="atomikosTransactionManager"/>
        <property name="userTransaction" ref="atomikosUserTransaction"/>
        <property name="allowCustomIsolationLevels" value="true"/>
    </bean>
    
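    • Configure the transaction aspect. Transactions are configured through an aspect here, but annotation-based configuration works as well
    <!-- Configure the advice -->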
    <tx:advice id="txAdvice" transaction-manager="springTransactionManager">
        <tx:attributes>
            <!-- Match methods of the service classes by name -->
            <tx:method name="save*" propagation="REQUIRED"/>
            <tx:method name="insert*" propagation="REQUIRED"/>
            <tx:method name="update*" propagation="REQUIRED"/>
            <tx:method name="delete*" propagation="REQUIRED"/>
            <tx:method name="find*" propagation="SUPPORTS" read-only="true"/>
            <tx:method name="select*" propagation="SUPPORTS" read-only="true"/>
            <tx:method name="get*" propagation="SUPPORTS" read-only="true"/>
        </tx:attributes>
    </tx:advice>
            
    <!-- Configure AOP -->
    <aop:config>
        <!-- Configure the pointcut -->
        <aop:pointcut expression="execution(* cn.lynu.service.*.*(..))" id="cut"/>
        <!-- Configure the advisor -->
        <aop:advisor advice-ref="txAdvice" pointcut-ref="cut"/>
    </aop:config>
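    With name-based rules like the tx:method entries above, whether a service method runs in a transaction depends on its name. The sketch below is a simplified model of that lookup, not Spring's implementation: it only handles a trailing "*" and assumes the longest matching pattern wins. TxMethodPatternSketch and resolve are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified model of name-based transaction attributes. Only a trailing "*"
// is handled; TxMethodPatternSketch and resolve are hypothetical names.
class TxMethodPatternSketch {

    // Pattern -> propagation, copied from the tx:method entries in the advice
    private static final Map<String, String> RULES = new LinkedHashMap<>();
    static {
        RULES.put("save*", "REQUIRED");
        RULES.put("insert*", "REQUIRED");
        RULES.put("update*", "REQUIRED");
        RULES.put("delete*", "REQUIRED");
        RULES.put("find*", "SUPPORTS");
        RULES.put("select*", "SUPPORTS");
        RULES.put("get*", "SUPPORTS");
    }

    /** Returns the propagation for a method name, or null if no rule matches. */
    static String resolve(String methodName) {
        String bestPattern = null;
        for (String pattern : RULES.keySet()) {
            String prefix = pattern.substring(0, pattern.length() - 1);
            boolean matches = methodName.startsWith(prefix);
            // Prefer the longest matching pattern (assumed tie-break rule)
            if (matches && (bestPattern == null || pattern.length() > bestPattern.length())) {
                bestPattern = pattern;
            }
        }
        return bestPattern == null ? null : RULES.get(bestPattern);
    }

    public static void main(String[] args) {
        System.out.println(resolve("saveMethod"));
        System.out.println(resolve("findById"));
        System.out.println(resolve("addEmployee"));
    }
}
```

    A method such as the hypothetical addEmployee matches none of the rules, so under this configuration it would get no transaction attributes from the advice. Naming service methods to fit the patterns, or adding a catch-all rule, avoids that surprise.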
    

    For annotation-based configuration, use the following instead:

    <tx:annotation-driven transaction-manager="springTransactionManager"/>
    

    The key point is the transaction-manager attribute here

    Now we can test it:

    package cn.lynu.service;
    
    import cn.lynu.mapper.EmployeeMapper;
    import cn.lynu.model.Employee;
    import cn.lynu.model.ShopProduct;
    import cn.lynu.shopmapper.ShopProductMapper;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import java.math.BigDecimal;
    
    @Service
    public class AtomikosService {
    
        @Autowired
        private EmployeeMapper employeeMapper;
        @Autowired
        private ShopProductMapper shopProductMapper;
    
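        // With the txAdvice configuration above, the name matches the save* rule
        public void saveMethod() {
            // Insert into data source 2 (shop)
            ShopProduct product = new ShopProduct();
            product.setPname("test1");
            product.setPprice(new BigDecimal("100"));
            product.setStock(100);
            shopProductMapper.insertProduct(product);
    
            // Insert into data source 1 (ssm_crud)
            Employee employee = new Employee();
            employee.setEmpName("test1");
            employee.setEmpSex("1");
            employee.setEmpEmail("123@qq.com");
            employee.setdId(1);
            employeeMapper.insertSelective(employee);
            
            // Deliberately throw ArithmeticException so that both inserts should be rolled back
            int i = 1 / 0;
        }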
    }
    

    Check the runtime log; rollback entries there indicate that the distributed transaction was rolled back correctly

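    Integrating Atomikos with Spring Boot

    I use MyBatis-Plus here; plain MyBatis is configured in a similar way, see: springboot jta atomikos实现分布式事物管理 (distributed transaction management with Spring Boot, JTA and Atomikos)

    1. Add the dependency

    <!-- Use Atomikos for distributed transactions -->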
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    </dependency>
    

    2. Edit the Spring Boot configuration file

    spring:
      # Data source configuration
      datasource:
        databases1:
    #      type: com.alibaba.druid.pool.DruidDataSource
          driver-class-name: com.mysql.jdbc.Driver
          url: jdbc:mysql:///ssm_crud?useSSL=false&characterEncoding=utf-8
          username: root
          password: root
        databases2:
    #      type: com.alibaba.druid.pool.DruidDataSource
          driver-class-name: com.mysql.jdbc.Driver
          url: jdbc:mysql:///shop?useSSL=false&characterEncoding=utf-8
          username: root
          password: root
    
    mybatis-plus:
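      # Mapper scanning is configured in the MyBatisPlusConfigDS.java configuration classes
      # Entity scanning; separate multiple packages with commas or semicolons
      typeAliasesPackage: com.ds.entity
      global-config:
        db-config:
          # Primary key type. AUTO: database auto-increment ID; INPUT: user-supplied ID; ID_WORKER: globally unique ID (default, numeric); UUID: globally unique UUID
          id-type: AUTO
          # Field strategy. IGNORED: skip the check; NOT_NULL: non-NULL check; NOT_EMPTY: non-empty check
          field-strategy: NOT_NULL
          # Whether to enable upper-case naming; default is false
          #capital-mode: true
      configuration:
        # Map underscore column names to camelCase entity properties automatically, so no alias is needed (without it, the SQL needs an alias such as: select user_id as userId)
        map-underscore-to-camel-case: true
        cache-enabled: false
        # Configure JdbcTypeForNull; required for Oracle
        jdbc-type-for-null: 'null'
        # Print SQL logs
    #    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl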
    

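    3. MyBatis-Plus configuration classes

    Because two data sources take part in the distributed transaction, two configuration classes are needed, each with basePackages pointing to the mappers of a different package. The DAO layer of each data source must again live in its own package, and it is advisable to separate the mapper XML files and entity classes by package as well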

    import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;
    import com.baomidou.mybatisplus.extension.plugins.PerformanceInterceptor;
    import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
    import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
    import org.apache.ibatis.plugin.Interceptor;
    import org.apache.ibatis.session.SqlSessionFactory;
    import org.mybatis.spring.SqlSessionTemplate;
    import org.mybatis.spring.annotation.MapperScan;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;
    import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
    
    import javax.sql.DataSource;
    
    @Configuration
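    // DAO package to scan. The Mapper interfaces and XML files of each data source must be configured separately (in different packages); putting them in the same folder causes an error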
    @MapperScan(basePackages = "com.ds.mapper", sqlSessionTemplateRef = "ds1SqlSessionTemplate")
    public class MyBatisPlusConfigDS1 {
    
        /**
         * Connection settings for data source 1 (url, password, username, ...)
         */
        @Bean(name = "ds1DataSourceProperties")
        @ConfigurationProperties(prefix = "spring.datasource.databases1")
        public DataSourceProperties ds1DataSourceProperties() {
            return new DataSourceProperties();
        }
    
        /**
         * Configure data source 1
         * @param config connection settings of the data source
         * @return the Atomikos-managed data source
         */
        @Primary
        @Bean(name = "ds1DataSource")
        public DataSource ds1DataSource(@Qualifier("ds1DataSourceProperties") DataSourceProperties config) {
            // Create MySQL's XA-capable data source
            MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
            // Set the connection details
            mysqlXaDataSource.setUrl(config.getUrl());
            mysqlXaDataSource.setPassword(config.getPassword());
            mysqlXaDataSource.setUser(config.getUsername());
            mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
    
            // Wrap it in an Atomikos data source so that Atomikos manages the transactions
            AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
            xaDataSource.setXaDataSource(mysqlXaDataSource);
            // To use Druid's XA support instead, comment out the code above and uncomment the block below
    		/*DruidXADataSource druidXADataSource = new DruidXADataSource();
            druidXADataSource.setUrl(config.getUrl());
            druidXADataSource.setPassword(config.getPassword());
            druidXADataSource.setUsername(config.getUsername());
            AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
            xaDataSource.setXaDataSource(druidXADataSource);*/
    		
            xaDataSource.setUniqueResourceName("ds1DataSource");
    
    //        xaDataSource.setMinPoolSize(config.getMinPoolSize());
    //        xaDataSource.setMaxPoolSize(config.getMaxPoolSize());
    //        xaDataSource.setMaxLifetime(config.getMaxLifetime());
    //        xaDataSource.setBorrowConnectionTimeout(config.getBorrowConnectionTimeout());
    //        xaDataSource.setLoginTimeout(config.getLoginTimeout());
    //        xaDataSource.setMaintenanceInterval(config.getMaintenanceInterval());
    //        xaDataSource.setMaxIdleTime(config.getMaxIdleTime());
    //        xaDataSource.setTestQuery(config.getTestQuery());
            return xaDataSource;
        }
    
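        /**
         * Configure the SqlSessionFactory for data source 1.
         * MyBatis-Plus's MybatisSqlSessionFactoryBean replaces MyBatis's SqlSessionFactoryBean.
         * @param dataSource the data source
         * @return the SqlSessionFactory
         * @throws Exception if the factory cannot be built
         */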
        @Bean(name = "ds1SqlSessionFactory")
        public SqlSessionFactory ds1SqlSessionFactory(@Qualifier("ds1DataSource") DataSource dataSource)
                throws Exception {
            MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
            sqlSessionFactoryBean.setDataSource(dataSource);
            sqlSessionFactoryBean.setPlugins(new Interceptor[]{
                    new PaginationInterceptor(), // pagination plugin
                    new PerformanceInterceptor().setFormat(true) // SQL formatting plugin
            });
            // Load all mapper XML files of data source 1
            sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:/com/ds/mapper/*.xml"));
            return sqlSessionFactoryBean.getObject();
        }
    
        /**
         * Configure the SqlSessionTemplate for data source 1
         * @param sqlSessionFactory the SqlSessionFactory of data source 1
         */
        @Bean(name = "ds1SqlSessionTemplate")
        public SqlSessionTemplate ds1SqlSessionTemplate(@Qualifier("ds1SqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
            return new SqlSessionTemplate(sqlSessionFactory);
        }
        
    }
    
    import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;
    import com.baomidou.mybatisplus.extension.plugins.PerformanceInterceptor;
    import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
    import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
    import org.apache.ibatis.plugin.Interceptor;
    import org.apache.ibatis.session.SqlSessionFactory;
    import org.mybatis.spring.SqlSessionTemplate;
    import org.mybatis.spring.annotation.MapperScan;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
    
    import javax.sql.DataSource;
    import java.sql.SQLException;
    
    @Configuration
    @MapperScan(basePackages = "com.ds.mapper2", sqlSessionTemplateRef = "ds2SqlSessionTemplate")
    public class MyBatisPlusConfigDS2 {
    
        @Bean(name = "ds2DataSourceProperties")
        @ConfigurationProperties(prefix = "spring.datasource.databases2")
        public DataSourceProperties ds2DataSourceProperties() {
            return new DataSourceProperties();
        }
    
        // Configure data source 2
        @Bean(name = "ds2DataSource")
        public DataSource ds2DataSource(@Qualifier("ds2DataSourceProperties") DataSourceProperties config) throws SQLException {
            MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
            mysqlXaDataSource.setUrl(config.getUrl());
            mysqlXaDataSource.setPassword(config.getPassword());
            mysqlXaDataSource.setUser(config.getUsername());
            mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
    
            AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
            xaDataSource.setXaDataSource(mysqlXaDataSource);
    		
            // To use Druid's XA support instead, comment out the code above and uncomment the block below
    		/*DruidXADataSource druidXADataSource = new DruidXADataSource();
            druidXADataSource.setUrl(config.getUrl());
            druidXADataSource.setPassword(config.getPassword());
            druidXADataSource.setUsername(config.getUsername());
            AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
            xaDataSource.setXaDataSource(druidXADataSource);*/
    		
            xaDataSource.setUniqueResourceName("ds2DataSource");
    
    //        xaDataSource.setMinPoolSize(config.getMinPoolSize());
    //        xaDataSource.setMaxPoolSize(config.getMaxPoolSize());
    //        xaDataSource.setMaxLifetime(config.getMaxLifetime());
    //        xaDataSource.setBorrowConnectionTimeout(config.getBorrowConnectionTimeout());
    //        xaDataSource.setLoginTimeout(config.getLoginTimeout());
    //        xaDataSource.setMaintenanceInterval(config.getMaintenanceInterval());
    //        xaDataSource.setMaxIdleTime(config.getMaxIdleTime());
    //        xaDataSource.setTestQuery(config.getTestQuery());
            return xaDataSource;
        }
    
        @Bean(name = "ds2SqlSessionFactory")
        public SqlSessionFactory ds2SqlSessionFactory(@Qualifier("ds2DataSource") DataSource dataSource)
                throws Exception {
            MybatisSqlSessionFactoryBean sqlSessionFactoryBean = new MybatisSqlSessionFactoryBean();
            sqlSessionFactoryBean.setDataSource(dataSource);
            sqlSessionFactoryBean.setPlugins(new Interceptor[]{
                    new PaginationInterceptor(), // pagination plugin
                    new PerformanceInterceptor().setFormat(true) // SQL formatting plugin
            });
            sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:/com/ds/mapper2/*.xml"));
            return sqlSessionFactoryBean.getObject();
        }
    
        @Bean(name = "ds2SqlSessionTemplate")
        public SqlSessionTemplate ds2SqlSessionTemplate(
                @Qualifier("ds2SqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
            return new SqlSessionTemplate(sqlSessionFactory);
        }
    
    }
    

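    Atomikos support in different database connection pools

    MysqlXADataSource supports the XA specification out of the box, whereas some commonly used connection pools such as HikariCP, Druid, DBCP and c3p0 apparently do not support it yet. The specifics are shown in the following figure: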

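    Putting the theory into practice: Seata

    Seata official website

    Seata is Alibaba's open-source distributed transaction solution. It is well suited to distributed microservice projects such as those built on Spring Cloud or Dubbo, and the project also provides some official integration examples

    Seata uses the concepts TC, TM and RM, explained below:

    TC (Transaction Coordinator): maintains the state of global and branch transactions and drives the commit or rollback of global transactions. Seata ships a ready-made seata-server that acts as the TC.

    TM (Transaction Manager): defines the scope of a global transaction: it begins the global transaction and commits or rolls it back.

    RM (Resource Manager): manages the resources that branch transactions work on, talks to the TC to register branch transactions and report their status, and drives the commit or rollback of branch transactions. An RM mostly corresponds to an individual data source.

    The TC is already provided by Seata; we only need to download it, configure it and run it. In our projects we still need to configure the TM and RM, both of which are brought into the project as jar dependencies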
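    To make the division of labour between the three roles more concrete, here is a toy in-memory model. It is only a sketch of the interaction described above and uses none of Seata's real classes: SeataRolesSketch and everything inside it are hypothetical names, and real Seata adds undo logs, global locks, networking and persistence.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model of the TC / TM / RM roles. All class names are hypothetical
// and unrelated to Seata's real API.
class SeataRolesSketch {

    /** RM: owns one resource and finishes its branch when the TC says so. */
    static class ResourceManager {
        final String name;
        final List<String> log = new ArrayList<>();

        ResourceManager(String name) {
            this.name = name;
        }

        void branchCommit(String xid) {
            log.add("commit " + xid);
        }

        void branchRollback(String xid) {
            log.add("rollback " + xid);
        }
    }

    /** TC: tracks global transactions and their branches, drives phase two. */
    static class TransactionCoordinator {
        private final Map<String, List<ResourceManager>> branches = new HashMap<>();
        private int nextId = 1;

        String begin() {
            String xid = "xid-" + nextId++;
            branches.put(xid, new ArrayList<>());
            return xid;
        }

        void registerBranch(String xid, ResourceManager rm) {
            branches.get(xid).add(rm);
        }

        void commit(String xid) {
            for (ResourceManager rm : branches.remove(xid)) {
                rm.branchCommit(xid);
            }
        }

        void rollback(String xid) {
            for (ResourceManager rm : branches.remove(xid)) {
                rm.branchRollback(xid);
            }
        }
    }

    /** TM: defines where the global transaction begins and ends. */
    static class TransactionManager {
        private final TransactionCoordinator tc;

        TransactionManager(TransactionCoordinator tc) {
            this.tc = tc;
        }

        /** Runs the business logic in a global transaction; true means committed. */
        boolean execute(Consumer<String> business) {
            String xid = tc.begin();
            try {
                business.accept(xid);
                tc.commit(xid);
                return true;
            } catch (RuntimeException e) {
                tc.rollback(xid);
                return false;
            }
        }
    }

    public static void main(String[] args) {
        TransactionCoordinator tc = new TransactionCoordinator();
        TransactionManager tm = new TransactionManager(tc);
        ResourceManager orderDb = new ResourceManager("orderDb");
        boolean committed = tm.execute(xid -> tc.registerBranch(xid, orderDb));
        System.out.println(committed + " " + orderDb.log);
    }
}
```

    In this model the TM never talks to an RM directly: it only asks the TC to begin, commit or roll back, and the TC fans the decision out to every branch registered under that xid.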

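    Integrating Seata with Spring Cloud

    1. Edit the seata-server configuration and start the server

    The main changes are to registry.conf and file.conf in the conf directory. registry.conf mainly configures the registry and the configuration center. Seata's registry can be of type file, nacos, eureka, redis, zk, consul, etcd3 or sofa, and the configuration center can be file, nacos, apollo, zk, consul or etcd3. If the configuration center is of type file, file.conf has to be edited as well. This walkthrough uses Nacos as Seata's registry and file as the configuration center. To use Nacos as the configuration center too, edit conf/nacos-config.txt and then run the conf/nacos-config.sh script; for details see the Nacos page of the Seata documentation (seata.io)

    Here we use seata-server-0.9.0, that is, Seata version 0.9.0

    registry.conf: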

    registry {
      # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
      # Change the registry type to nacos here
      type = "nacos"
    
      # Set the correct Nacos address
      nacos {
        serverAddr = "localhost:8848"
        namespace = ""
        cluster = "default"
      }
      eureka {
        serviceUrl = "http://localhost:8761/eureka"
        application = "default"
        weight = "1"
      }
      redis {
        serverAddr = "localhost:6379"
        db = "0"
      }
      zk {
        cluster = "default"
        serverAddr = "127.0.0.1:2181"
        session.timeout = 6000
        connect.timeout = 2000
      }
      consul {
        cluster = "default"
        serverAddr = "127.0.0.1:8500"
      }
      etcd3 {
        cluster = "default"
        serverAddr = "http://localhost:2379"
      }
      sofa {
        serverAddr = "127.0.0.1:9603"
        application = "default"
        region = "DEFAULT_ZONE"
        datacenter = "DefaultDataCenter"
        cluster = "default"
        group = "SEATA_GROUP"
        addressWaitTime = "3000"
      }
      file {
        name = "file.conf"
      }
    }
    
    config {
      # file、nacos 、apollo、zk、consul、etcd3
      # Use a file-type configuration center
      type = "file"
    
      nacos {
        serverAddr = "localhost"
        namespace = ""
      }
      consul {
        serverAddr = "127.0.0.1:8500"
      }
      apollo {
        app.id = "seata-server"
        apollo.meta = "http://192.168.1.204:8801"
      }
      zk {
        serverAddr = "127.0.0.1:2181"
        session.timeout = 6000
        connect.timeout = 2000
      }
      etcd3 {
        serverAddr = "http://localhost:2379"
      }
      # Because the configuration center is of type file, file.conf has to be edited next
      file {
        name = "file.conf"
      }
    }
    

    file.conf:

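    Here we persist Seata's transaction log in a database. The vgroup_mapping.xxx entry matters: the content of its value is arbitrary, but we will use it later; the my_tx_group configured here has no special meaning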

    transport {
      # tcp udt unix-domain-socket
      type = "TCP"
      #NIO NATIVE
      server = "NIO"
      #enable heartbeat
      heartbeat = true
      #thread factory for netty
      thread-factory {
        boss-thread-prefix = "NettyBoss"
        worker-thread-prefix = "NettyServerNIOWorker"
        server-executor-thread-prefix = "NettyServerBizHandler"
        share-boss-worker = false
        client-selector-thread-prefix = "NettyClientSelector"
        client-selector-thread-size = 1
        client-worker-thread-prefix = "NettyClientWorkerThread"
        # netty boss thread size,will not be used for UDT
        boss-thread-size = 1
        #auto default pin or 8
        worker-thread-size = 8
      }
      shutdown {
        # when destroy server, wait seconds
        wait = 3
      }
      serialization = "seata"
      compressor = "none"
    }
    service {
      #vgroup->rgroup This value matters: its content is arbitrary, but we will use it later; my_tx_group has no special meaning
      vgroup_mapping.my_test_tx_group = "my_tx_group"
      #only support single node
      default.grouplist = "localhost:8091"
      #degrade current not support
      enableDegrade = false
      #disable
      disable = false
      #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
      max.commit.retry.timeout = "-1"
      max.rollback.retry.timeout = "-1"
    }
    
    client {
      async.commit.buffer.limit = 10000
      lock {
        retry.internal = 10
        retry.times = 30
      }
      report.retry.count = 5
      tm.commit.retry.count = 1
      tm.rollback.retry.count = 1
    }
    
    ## transaction log store
    store {
      ## store mode: file、db
      # Persist the transaction log in a database
      mode = "db"
    
      ## file store
      file {
        dir = "sessionStore"
    
        # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
        max-branch-session-size = 16384
        # globe session size , if exceeded throws exceptions
        max-global-session-size = 512
        # file buffer size , if exceeded allocate new buffer
        file-write-buffer-cache-size = 16384
        # when recover batch read size
        session.reload.read_size = 100
        # async, sync
        flush-disk-mode = async
      }
    
      ## database store
      # Set the correct database connection details
      db {
        ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
        datasource = "dbcp"
        ## mysql/oracle/h2/oceanbase etc.
        db-type = "mysql"
        driver-class-name = "com.mysql.jdbc.Driver"
        url = "jdbc:mysql://localhost:3306/seata"
        user = "root"
        password = "root"
        min-conn = 1
        max-conn = 3
        global.table = "global_table"
        branch.table = "branch_table"
        lock-table = "lock_table"
        query-limit = 100
      }
    }
    lock {
      ## the lock store mode: local、remote
      mode = "remote"
    
      local {
        ## store locks in user's database
      }
    
      remote {
        ## store locks in the seata's server
      }
    }
    recovery {
      #schedule committing retry period in milliseconds
      committing-retry-period = 1000
      #schedule asyn committing retry period in milliseconds
      asyn-committing-retry-period = 1000
      #schedule rollbacking retry period in milliseconds
      rollbacking-retry-period = 1000
      #schedule timeout retry period in milliseconds
      timeout-retry-period = 1000
    }
    
    transaction {
      undo.data.validation = true
      undo.log.serialization = "jackson"
      undo.log.save.days = 7
      #schedule delete expired undo_log in milliseconds
      undo.log.delete.period = 86400000
      undo.log.table = "undo_log"
    }
    
    ## metrics settings
    metrics {
      enabled = false
      registry-type = "compact"
      # multi exporters use comma divided
      exporter-list = "prometheus"
      exporter-prometheus-port = 9898
    }
    
    support {
      ## spring
      spring {
        # auto proxy the DataSource bean
        datasource.autoproxy = false
      }
    }
    

    Because the data is persisted in a database, the tables have to be created in advance. The DDL is in conf/db_store.sql and reads as follows:

    -- the table to store GlobalSession data
    drop table if exists `global_table`;
    create table `global_table` (
      `xid` varchar(128)  not null,
      `transaction_id` bigint,
      `status` tinyint not null,
      `application_id` varchar(32),
      `transaction_service_group` varchar(32),
      `transaction_name` varchar(128),
      `timeout` int,
      `begin_time` bigint,
      `application_data` varchar(2000),
      `gmt_create` datetime,
      `gmt_modified` datetime,
      primary key (`xid`),
      key `idx_gmt_modified_status` (`gmt_modified`, `status`),
      key `idx_transaction_id` (`transaction_id`)
    );
    
    -- the table to store BranchSession data
    drop table if exists `branch_table`;
    create table `branch_table` (
      `branch_id` bigint not null,
      `xid` varchar(128) not null,
      `transaction_id` bigint ,
      `resource_group_id` varchar(32),
      `resource_id` varchar(256) ,
      `lock_key` varchar(128) ,
      `branch_type` varchar(8) ,
      `status` tinyint,
      `client_id` varchar(64),
      `application_data` varchar(2000),
      `gmt_create` datetime,
      `gmt_modified` datetime,
      primary key (`branch_id`),
      key `idx_xid` (`xid`)
    );
    
    -- the table to store lock data
    drop table if exists `lock_table`;
    create table `lock_table` (
      `row_key` varchar(128) not null,
      `xid` varchar(96),
      `transaction_id` long ,
      `branch_id` long,
      `resource_id` varchar(256) ,
      `table_name` varchar(32) ,
      `pk` varchar(36) ,
      `gmt_create` datetime ,
      `gmt_modified` datetime,
      primary key(`row_key`)
    );
    

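    Once both configuration files have been edited, run bin/seata-server.sh (or seata-server.bat on Windows) to start Seata Server

    2. Add the dependencies to every project that takes part in distributed transactions

    spring-cloud-starter-alibaba-seata pulls in a Seata dependency automatically, but that version may not be the one we need, so it can be excluded with an exclusion element. Here we use Seata 0.9.0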

    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
        <exclusions>
            <exclusion>
                <groupId>io.seata</groupId>
                <artifactId>seata-all</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>io.seata</groupId>
        <artifactId>seata-all</artifactId>
        <version>0.9.0</version>
    </dependency>
    

    3. Edit the Spring Boot configuration file of each project

    Add the following to the Spring Boot configuration file:

    spring:
      cloud:
        alibaba:
          seata:
            # The custom transaction group name here must match the vgroup_mapping.xxx value configured in the Seata Server file.conf
            tx-service-group: my_tx_group
    

    4. Put registry.conf and file.conf in the resources directory of each project

    Unneeded entries in registry.conf and file.conf can be deleted; the trimmed versions are shown here

    registry.conf:

    registry {
      # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
      type = "nacos"
    
      nacos {
        serverAddr = "localhost:8848"
        namespace = ""
        cluster = "default"
      }
    }
    
    config {
      # file、nacos 、apollo、zk、consul、etcd3
      type = "file"
    
      file {
        name = "file.conf"
      }
    }
    

    file.conf: in this file the vgroup_mapping.xxx entry differs from the one in the Seata Server configuration. Here xxx must be the value configured in the Seata Server file.conf, that is, my_tx_group

    transport {
      # tcp udt unix-domain-socket
      type = "TCP"
      #NIO NATIVE
      server = "NIO"
      #enable heartbeat
      heartbeat = true
      #thread factory for netty
      thread-factory {
        boss-thread-prefix = "NettyBoss"
        worker-thread-prefix = "NettyServerNIOWorker"
        server-executor-thread-prefix = "NettyServerBizHandler"
        share-boss-worker = false
        client-selector-thread-prefix = "NettyClientSelector"
        client-selector-thread-size = 1
        client-worker-thread-prefix = "NettyClientWorkerThread"
        # netty boss thread size,will not be used for UDT
        boss-thread-size = 1
        #auto default pin or 8
        worker-thread-size = 8
      }
      shutdown {
        # when destroy server, wait seconds
        wait = 3
      }
      serialization = "seata"
      compressor = "none"
    }
    service {
      #vgroup->rgroup
      vgroup_mapping.my_tx_group = "default"
      #only support single node
      default.grouplist = "127.0.0.1:8091"
      #degrade current not support
      enableDegrade = false
      #disable
      disable = false
      #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
      max.commit.retry.timeout = "-1"
      max.rollback.retry.timeout = "-1"
    }
    
    client {
      async.commit.buffer.limit = 10000
      lock {
        retry.internal = 10
        retry.times = 30
      }
      report.retry.count = 5
      tm.commit.retry.count = 1
      tm.rollback.retry.count = 1
    }
    
    ## transaction log store
    store {
      ## store mode: file、db
      mode = "db"
    
      ## database store
      db {
        ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
        datasource = "dbcp"
        ## mysql/oracle/h2/oceanbase etc.
        db-type = "mysql"
        driver-class-name = "com.mysql.jdbc.Driver"
        url = "jdbc:mysql://localhost:3306/seata"
        user = "root"
        password = "root"
        min-conn = 1
        max-conn = 3
        global.table = "global_table"
        branch.table = "branch_table"
        lock-table = "lock_table"
        query-limit = 100
      }
    }
    lock {
      ## the lock store mode: local、remote
      mode = "remote"
    
      remote {
        ## store locks in the seata's server
      }
    }
    recovery {
      #schedule committing retry period in milliseconds
      committing-retry-period = 1000
      #schedule asyn committing retry period in milliseconds
      asyn-committing-retry-period = 1000
      #schedule rollbacking retry period in milliseconds
      rollbacking-retry-period = 1000
      #schedule timeout retry period in milliseconds
      timeout-retry-period = 1000
    }
    
    transaction {
      undo.data.validation = true
      undo.log.serialization = "jackson"
      undo.log.save.days = 7
      #schedule delete expired undo_log in milliseconds
      undo.log.delete.period = 86400000
      undo.log.table = "undo_log"
    }
    
    ## metrics settings
    metrics {
      enabled = false
      registry-type = "compact"
      # multi exporters use comma divided
      exporter-list = "prometheus"
      exporter-prometheus-port = 9898
    }
    
    support {
      ## spring
      spring {
        # auto proxy the DataSource bean
        datasource.autoproxy = false
      }
    }
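
    The lookup that vgroup_mapping drives can be sketched in plain Java. This is only an illustration under assumptions: the class VgroupResolver and its methods are hypothetical and are not part of Seata's API. The keys and values simply mirror the service block above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of the two-step lookup; not Seata's real API. */
final class VgroupResolver {
    private final Map<String, String> config;

    VgroupResolver(Map<String, String> config) {
        this.config = config;
    }

    /** Maps a transaction service group to a server address via its cluster name. */
    String resolveServerAddress(String txServiceGroup) {
        // Step 1: vgroup -> cluster name, e.g. my_tx_group -> default
        String cluster = config.get("service.vgroup_mapping." + txServiceGroup);
        if (cluster == null) {
            throw new IllegalStateException(
                "no vgroup_mapping entry for group: " + txServiceGroup);
        }
        // Step 2: cluster name -> address list, e.g. default -> 127.0.0.1:8091
        String address = config.get("service." + cluster + ".grouplist");
        if (address == null) {
            throw new IllegalStateException("no grouplist for cluster: " + cluster);
        }
        return address;
    }

    /** Builds the same two entries that the service block above defines. */
    static VgroupResolver fromSampleConfig() {
        Map<String, String> config = new HashMap<>();
        config.put("service.vgroup_mapping.my_tx_group", "default");
        config.put("service.default.grouplist", "127.0.0.1:8091");
        return new VgroupResolver(config);
    }
}
```

    In this sketch a group name without a matching vgroup_mapping entry fails at step 1, which is why the group name used by the client has to match the configured key.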
    

    5. Configure the proxy data source provided by Seata in each participating project

    package com.lynu.config;
    
    import com.alibaba.druid.pool.DruidDataSource;
    import io.seata.rm.datasource.DataSourceProxy;
    import org.apache.ibatis.session.SqlSessionFactory;
    import org.mybatis.spring.SqlSessionFactoryBean;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;
    
    import javax.sql.DataSource;
    
    /**
     * Configures Seata's DataSourceProxy and the MyBatis SqlSessionFactory.
     * The startup class must exclude Spring Boot's default DataSourceAutoConfiguration.
     */
    @Configuration
    public class DataSourceProxyConfig {
    
    
        @Bean
        @ConfigurationProperties(prefix = "spring.datasource")
        public DataSource druidDataSource() {
            return new DruidDataSource();
        }
    
    
        @Primary
        @Bean("dataSource")
        public DataSourceProxy dataSourceProxy(DataSource druidDataSource) {
            return new DataSourceProxy(druidDataSource);
        }
    
        @Bean(name = "sqlSessionFactory")
        public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) {
            SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
            bean.setDataSource(dataSourceProxy);
    
            SqlSessionFactory factory;
            try {
                factory = bean.getObject();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            return factory;
        }
    
    }
    

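    Because MyBatis is used for database access here, the bean being configured is a SqlSessionFactory; other persistence frameworks are set up in a similar way. Next, exclude Spring Boot's own data source auto-configuration so that our configuration is the one that takes effect. On the Spring Boot startup class: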

    @SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
    

    6. Add an undo_log table to every database involved in a distributed transaction

    The undo_log table definition is in conf/db_undo_log.sql:

    DROP TABLE IF EXISTS `undo_log`;
    CREATE TABLE `undo_log` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `branch_id` bigint(20) NOT NULL,
      `xid` varchar(100) NOT NULL,
      `context` varchar(128) NOT NULL,
      `rollback_info` longblob NOT NULL,
      `log_status` int(11) NOT NULL,
      `log_created` datetime NOT NULL,
      `log_modified` datetime NOT NULL,
      `ext` varchar(100) DEFAULT NULL,
      PRIMARY KEY (`id`),
      UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
    

    After that, simply add the @GlobalTransactional annotation to the Service method that should start the distributed transaction.
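
    To make the role of undo_log more concrete, here is a minimal in-memory sketch of the idea behind it: before a change is applied, the old value is saved under the global transaction id (xid), and a global rollback replays the saved values. Everything here is hypothetical and greatly simplified (UndoLogSketch, the maps standing in for tables, integer row values); it is not Seata's implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy model of before-image rollback; all names are illustrative only. */
final class UndoLogSketch {

    /** One saved before-image: the row that changed and its previous value. */
    private static final class UndoRecord {
        final String rowKey;
        final Integer beforeValue; // null means the row did not exist yet

        UndoRecord(String rowKey, Integer beforeValue) {
            this.rowKey = rowKey;
            this.beforeValue = beforeValue;
        }
    }

    // Stands in for a business table: row key -> value.
    private final Map<String, Integer> table = new HashMap<>();
    // Stands in for undo_log: xid -> before-images, newest first.
    private final Map<String, Deque<UndoRecord>> undoLog = new HashMap<>();

    /** Saves the before-image, then applies the change. */
    void update(String xid, String rowKey, int newValue) {
        undoLog.computeIfAbsent(xid, k -> new ArrayDeque<>())
               .push(new UndoRecord(rowKey, table.get(rowKey)));
        table.put(rowKey, newValue);
    }

    /** On global commit the saved images are no longer needed. */
    void commit(String xid) {
        undoLog.remove(xid);
    }

    /** On global rollback, restores before-images from newest to oldest. */
    void rollback(String xid) {
        Deque<UndoRecord> records = undoLog.remove(xid);
        if (records == null) {
            return;
        }
        for (UndoRecord record : records) {
            if (record.beforeValue == null) {
                table.remove(record.rowKey);
            } else {
                table.put(record.rowKey, record.beforeValue);
            }
        }
    }

    Integer read(String rowKey) {
        return table.get(rowKey);
    }
}
```

    The real table additionally keys each record by branch_id, as the unique key on (xid, branch_id) in the DDL above shows, and it stores the images serialized in rollback_info rather than as plain values.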

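    Putting the Theory into Practice - TX-LCN

    TX-LCN official documentation

    TX-LCN is a good fit for distributed microservice projects, such as those built on SpringCloud or Dubbo.

    TX-LCN has two main modules: Tx-Client (TC) and Tx-Manager (TM). The TC is added to each microservice as a dependency, while the TM runs as a standalone service. These roles are roughly the reverse of Seata's naming, where the TC is the standalone server and the TM lives inside the application.

    Version note: the framework's author is working on version 6.0, but these notes are based on version 5.0.2 (TM download link).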

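    Integrating TX-LCN with SpringCloud

    1. Start the TM service

    The TX-LCN TM depends on MySQL and Redis. The statements for creating the database and table are in txlcn-tm/src/main/resources/tx-manager.sql in the downloaded source: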

    CREATE DATABASE IF NOT EXISTS  `tx-manager` DEFAULT CHARSET utf8 COLLATE utf8_general_ci;
    USE `tx-manager`;
    
    SET NAMES utf8mb4;
    SET FOREIGN_KEY_CHECKS = 0;
    
    DROP TABLE IF EXISTS `t_tx_exception`;
    CREATE TABLE `t_tx_exception`  (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `group_id` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
      `unit_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
      `mod_id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
      `transaction_state` tinyint(4) NULL DEFAULT NULL,
      `registrar` tinyint(4) NULL DEFAULT NULL,
      `ex_state` tinyint(4) NULL DEFAULT NULL COMMENT '0 pending, 1 handled',
      `remark` varchar(10240) NULL DEFAULT NULL COMMENT 'remark',
      `create_time` datetime(0) NULL DEFAULT NULL,
      PRIMARY KEY (`id`) USING BTREE
    ) ENGINE = InnoDB AUTO_INCREMENT = 967 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;
    
    SET FOREIGN_KEY_CHECKS = 1;
    

    Edit the configuration (txlcn-tm/src/main/resources/application.properties):

    spring.application.name=TransactionManager
    server.port=7970
    spring.datasource.driver-class-name=com.mysql.jdbc.Driver
    spring.datasource.url=jdbc:mysql://127.0.0.1:3306/tx-manager?characterEncoding=UTF-8&useSSL=false
    spring.datasource.username=root
    spring.datasource.password=root
    spring.jpa.database-platform=org.hibernate.dialect.MySQL5InnoDBDialect
    spring.jpa.hibernate.ddl-auto=update
    
    spring.redis.host=127.0.0.1
    spring.redis.port=6379
    

    Edit the plugins section of pom.xml: comment out the plugins configured there and add the Spring Boot Maven plugin:

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <fork>true</fork>
                    <addResources>true</addResources>
                </configuration>
            </plugin>
        </plugins>
    </build>
    

    After these changes, build the TM with Maven and start it:

    mvn clean package -Dmaven.test.skip=true
    java -jar target/txlcn-tm-5.0.2.RELEASE.jar
    

    2. Add the dependencies to every project involved in a distributed transaction

    <dependency>
        <groupId>com.codingapi.txlcn</groupId>
        <artifactId>txlcn-tc</artifactId>
        <version>5.0.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>com.codingapi.txlcn</groupId>
        <artifactId>txlcn-txmsg-netty</artifactId>
        <version>5.0.2.RELEASE</version>
    </dependency>
    

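    3. Enable the distributed transaction annotation in each project

    Add @EnableDistributedTransaction to the main class of every microservice involved in a distributed transaction: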

    @SpringBootApplication
    @EnableDistributedTransaction
    public class DemoAApplication {
        public static void main(String[] args) {
            SpringApplication.run(DemoAApplication.class, args);
        }
    }
    

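    Next, annotate every business method that takes part in the distributed transaction with @LcnTransaction. Note that, unlike Seata, the annotation is required on the business method in every participating microservice, not only on the method that starts the transaction. Once the annotations are in place, the distributed transaction takes effect.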
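
    Why every participating method needs the annotation becomes clearer with a toy model of the coordination: each annotated unit joins the transaction group and holds back its local commit until the TM announces the outcome for the whole group, so a method that never joins would simply commit on its own. The sketch below is plain Java under that assumption; ToyTxManager, Participant and RecordingParticipant are hypothetical names, not TX-LCN classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical participant: a local transaction waiting for the group's decision. */
interface Participant {
    void commit();

    void rollback();
}

/** Toy stand-in for the TM: collects the participants of one group and notifies them. */
final class ToyTxManager {
    private final List<Participant> group = new ArrayList<>();

    /** Called when an annotated business method has run and joins the group. */
    void join(Participant participant) {
        group.add(participant);
    }

    /** The initiator reports the overall result; every member gets the same decision. */
    void notifyGroup(boolean success) {
        for (Participant participant : group) {
            if (success) {
                participant.commit();
            } else {
                participant.rollback();
            }
        }
        group.clear();
    }
}

/** Simple participant that records which decision it received. */
final class RecordingParticipant implements Participant {
    String state = "pending";

    @Override
    public void commit() {
        state = "committed";
    }

    @Override
    public void rollback() {
        state = "rolled back";
    }
}
```

    In this model, two participants that joined the same group both end up rolled back when the initiator reports a failure, even if each of their local operations succeeded.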

    By default, the TC connects to the TM at 127.0.0.1:8070 on the local machine. To change this, set the address in the Spring Boot configuration file:

    tx-lcn:
      client:
        manager-address: 127.0.0.1:8070
    
  • Original article: https://www.cnblogs.com/lz2017/p/15116913.html