zoukankan      html  css  js  c++  java
  • SpringBoot+Mybatis+Druid+Jta+Atomikos 解决动态数据源分布式事务问题

    1.基本介绍

    jta(java Transaction API)+Atomikos(事务管理器) 底层原理是分布式事务的两阶段提交

    2.两阶段提交(two phase commit)

    2.1 说明

    当一个事务跨多个节点时,为了保持事务的原子性与一致性,需要引入一个协调者(Coordinator)来统一掌控所有参与者(Participant)的操作结果,并指示它们是否要把操作结果进行真正的提交(commit)或者回滚(rollback)。这里数据库充当的是参与者的角色

    2.2 原理

    image-20201220181614465

    提交请求(投票)阶段

    • 协调者向所有参与者发送prepare请求与事务内容,询问是否可以准备事务提交,并等待参与者的响应。
    • 参与者执行事务中包含的操作,并记录undo日志(用于回滚)和redo日志(用于重放),但不真正提交。
    • 参与者向协调者返回事务操作的执行结果,执行成功返回yes,否则返回no。

    提交(执行)阶段

    分为成功与失败两种情况。

    若所有参与者都返回yes,说明事务可以提交:

    • 协调者向所有参与者发送commit请求。
    • 参与者收到commit请求后,将事务真正地提交上去,并释放占用的事务资源,并向协调者返回ack。
    • 协调者收到所有参与者的ack消息,事务成功完成。

    若有参与者返回no或者超时未返回,说明事务中断,需要回滚:

    • 协调者向所有参与者发送rollback请求。
    • 参与者收到rollback请求后,根据undo日志回滚到事务执行前的状态,释放占用的事务资源,并向协调者返回ack。
    • 协调者收到所有参与者的ack消息,事务回滚完成

    弊端

    1.同步阻塞问题。执行过程中,所有参与节点都是事务阻塞型的。所以这样很影响效率。

    2.单点故障。由于协调者的重要性,一旦协调者发生故障。参与者会一直阻塞下去

    3.仍然存在不一致风险。如果由于网络异常等意外导致只有部分参与者收到了commit请求,就会造成部分参与者提交了事务而其他参与者未提交的情况。

    3.编写代码

    许多解释在代码中均有体现.

    3.1 引入相关jar包

    <dependencies>
         <!--开启AOP切面-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
    
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
    	
         <!--druid连接池-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.14</version>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
    
        <!--分布式事务-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>
    

    3.2 application.yml

    spring:
      datasource:
        type: com.alibaba.druid.pool.DruidDataSource
        driverClassName: com.mysql.jdbc.Driver
        druid:
          # 主库数据源
          master:
            url: jdbc:mysql://localhost:3306/ry?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
            username: root
            password: 123456
          # 从库数据源
          slave:
            # 是否开启从数据源,默认关闭
            enabled: true
            url: jdbc:mysql://localhost:3306/data_catalog?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
            username: root
            password: 123456
          # 初始连接数
          initialSize: 5
          # 最小连接池数量
          minIdle: 10
          # 最大连接池数量
          maxActive: 20
          # 配置获取连接等待超时的时间
          maxWait: 60000
          # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
          timeBetweenEvictionRunsMillis: 60000
          # 配置一个连接在池中最小生存的时间,单位是毫秒
          minEvictableIdleTimeMillis: 300000
          # 配置一个连接在池中最大生存的时间,单位是毫秒
          maxEvictableIdleTimeMillis: 900000
          # 配置检测连接是否有效
          validationQuery: SELECT 1 FROM DUAL
          testWhileIdle: true
          testOnBorrow: false
          testOnReturn: false
          webStatFilter:
            enabled: true
          statViewServlet:
            enabled: true
            # 设置白名单,不填则允许所有访问
            allow:
            url-pattern: /druid/*
            # 控制台管理用户名和密码
            login-username:
            login-password:
          filter:
            stat:
              enabled: true
              # 慢SQL记录
              log-slow-sql: true
              slow-sql-millis: 1000
              merge-sql: true
            wall:
              config:
                multi-statement-allow: true
    server:
      port: 8000
    

    3.3 自定义注解

    @Target({ ElementType.METHOD, ElementType.TYPE })
    @Retention(RetentionPolicy.RUNTIME)
    public @interface MyDataSource {
        /**
         * 切换数据源名称
         */
        public DataSourceType value() default DataSourceType.MASTER;
    }
    

    3.4 自定义枚举类

    public enum DataSourceType {
        /**
         * 主库
         */
        MASTER,
    
        /**
         * 从库
         */
        SLAVE
    }
    

    3.5 自定义aop切面

    @Aspect
    @Component
    @Order(1)
    public class MyDataSourceAsp {
        /**
         * 扫描所有与这个注解有关的
         * :@within:用于匹配所有持有指定注解类型内的方法和类;
         * 也就是说只要有一个类上的有这个,使用@within这个注解,就能拿到下面所有的方法
         *:@annotation:用于匹配当前执行方法持有指定注解的方法,而这个注解只针对方法
         *
         * 不添加扫描路径,应该是根据启动类的扫描范围执行的
         */
        @Pointcut("@annotation(com.shw.dynamic.annotation.MyDataSource) " +
                "|| @within(com.shw.dynamic.annotation.MyDataSource)")
        public void doPointCut() {
        }
    
        @Around("doPointCut()")
        public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
            MyDataSource dataSource = getDataSource(joinPoint);
            if (dataSource != null) {
                RoutingDataSourceContext.setDataSourceRoutingKey(dataSource.value().name());
            }
            try {
                // 继续执行
                return joinPoint.proceed();
            } finally {
                //关闭线程资源 在执行方法之后
                RoutingDataSourceContext.close();
            }
        }
    
        /**
         * 获取类或者方法上的注解
         * 先获取方法上的注解,然后在获取类上的注解,这就实现了方法上数据源切换优先于类上的
         * @param joinPoint 正在执行的连接点
         * @return 注解
         */
        private MyDataSource getDataSource(ProceedingJoinPoint joinPoint) {
            MethodSignature method = (MethodSignature) joinPoint.getSignature();
            // 获取方法上的注解
            MyDataSource annotation = method.getMethod().getAnnotation(MyDataSource.class);
            if (annotation != null) {
                return annotation;
            } else {
                // 获取到这个注解上的类
                Class<?> aClass = joinPoint.getTarget().getClass();
                // 获取到这个类上的注解
                MyDataSource dataSource = aClass.getAnnotation(MyDataSource.class);
                // 返回类上的注解
                return dataSource;
            }
        }
    }
    

    3.6 编写上下文数据源

    public class RoutingDataSourceContext  {
    
        private static Logger logger = LoggerFactory.getLogger(RoutingDataSourceContext.class);
        /**
         * 使用ThreadLocal维护变量,ThreadLocal为每个使用该变量的线程提供独立的变量副本,
         *  所以每一个线程都可以独立地改变自己的副本,而不会影响其它线程所对应的副本。
         */
        private static final ThreadLocal<String> THREAD_LOCAL_DATA_SOURCE_KEY = new ThreadLocal<>();
    
        /**
         * 得到数据源名称
         * @return
         */
        static String getDataSourceRoutingKey() {
            String key = THREAD_LOCAL_DATA_SOURCE_KEY.get();
            return key == null ? DataSourceType.MASTER.name() : key;
        }
    
        /**
         * 设置数据源
         * @param key
         */
        public static void setDataSourceRoutingKey(String key) {
            logger.info("切换到{}数据源",key);
            THREAD_LOCAL_DATA_SOURCE_KEY.set(key);
        }
    
        /**
         * 清空数据源设置
         */
        public static void close() {
            THREAD_LOCAL_DATA_SOURCE_KEY.remove();
        }
    
    }
    

    3.7 druid连接池配置参数

    @Configuration
    public class DruidProperties {
        @Value("${spring.datasource.druid.initialSize}")
        private int initialSize;
    
        @Value("${spring.datasource.druid.minIdle}")
        private int minIdle;
    
        @Value("${spring.datasource.druid.maxActive}")
        private int maxActive;
    
        @Value("${spring.datasource.druid.maxWait}")
        private int maxWait;
    
        @Value("${spring.datasource.druid.timeBetweenEvictionRunsMillis}")
        private int timeBetweenEvictionRunsMillis;
    
        @Value("${spring.datasource.druid.minEvictableIdleTimeMillis}")
        private int minEvictableIdleTimeMillis;
    
        @Value("${spring.datasource.druid.maxEvictableIdleTimeMillis}")
        private int maxEvictableIdleTimeMillis;
    
        @Value("${spring.datasource.druid.validationQuery}")
        private String validationQuery;
    
        @Value("${spring.datasource.druid.testWhileIdle}")
        private boolean testWhileIdle;
    
        @Value("${spring.datasource.druid.testOnBorrow}")
        private boolean testOnBorrow;
    
        @Value("${spring.datasource.druid.testOnReturn}")
        private boolean testOnReturn;
    
        public DruidDataSource dataSource(DruidDataSource datasource) {
            /** 配置初始化大小、最小、最大 */
            datasource.setInitialSize(initialSize);
            datasource.setMaxActive(maxActive);
            datasource.setMinIdle(minIdle);
    
            /** 配置获取连接等待超时的时间 */
            datasource.setMaxWait(maxWait);
    
            /** 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 */
            datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
    
            /** 配置一个连接在池中最小、最大生存的时间,单位是毫秒 */
            datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
            datasource.setMaxEvictableIdleTimeMillis(maxEvictableIdleTimeMillis);
    
            /**
             * 用来检测连接是否有效的sql,要求是一个查询语句,常用select 'x'。如果validationQuery为null,testOnBorrow、testOnReturn、testWhileIdle都不会起作用。
             */
            datasource.setValidationQuery(validationQuery);
            /** 建议配置为true,不影响性能,并且保证安全性。申请连接的时候检测,如果空闲时间大于timeBetweenEvictionRunsMillis,执行validationQuery检测连接是否有效。 */
            datasource.setTestWhileIdle(testWhileIdle);
            /** 申请连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。 */
            datasource.setTestOnBorrow(testOnBorrow);
            /** 归还连接时执行validationQuery检测连接是否有效,做了这个配置会降低性能。 */
            datasource.setTestOnReturn(testOnReturn);
            return datasource;
        }
    }
    

    3.8 数据源配置(重点)

    @Configuration
    @MapperScan(basePackages = DataSourceConfig.BASE_PACKAGES, sqlSessionTemplateRef = "sqlSessionTemplate")
    public class DataSourceConfig {
    
        static final String BASE_PACKAGES = "com.shw.dynamic.mapper";
    
        private static final String MAPPER_LOCATION = "classpath:mybatis/mapper/*.xml";
    
        /***
         * 创建 DruidXADataSource master 用@ConfigurationProperties 自动配置属性
         */
        @Bean(name = "druidDataSourceMaster")
        @ConfigurationProperties("spring.datasource.druid.master")
        public DataSource druidDataSourceMaster(DruidProperties properties) {
            DruidXADataSource druidXADataSource = new DruidXADataSource();
            return properties.dataSource(druidXADataSource);
        }
    
        /***
         * 创建 DruidXADataSource slave
         */
        @Bean(name = "druidDataSourceSlave")
        @ConfigurationProperties("spring.datasource.druid.slave")
        public DataSource druidDataSourceSlave(DruidProperties properties) {
            DruidXADataSource druidXADataSource = new DruidXADataSource();
            return properties.dataSource(druidXADataSource);
        }
    
        /**
         * 创建支持 XA 事务的 Atomikos 数据源 master
         */
        @Bean(name = "dataSourceMaster")
        public DataSource dataSourceMaster(@Qualifier(value = "druidDataSourceMaster") DataSource druidDataSourceMaster) {
            AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
            sourceBean.setXaDataSource((DruidXADataSource) druidDataSourceMaster);
            // 必须为数据源指定唯一标识
            sourceBean.setPoolSize(5);
            sourceBean.setTestQuery("SELECT 1");
            sourceBean.setUniqueResourceName("master");
            return sourceBean;
        }
    
        /**
         * 创建支持 XA 事务的 Atomikos 数据源 slave
         */
        @Bean(name = "dataSourceSlave")
        public DataSource dataSourceSlave(@Qualifier(value = "druidDataSourceSlave") DataSource druidDataSourceSlave) {
            AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
            sourceBean.setXaDataSource((DruidXADataSource) druidDataSourceSlave);
            sourceBean.setPoolSize(5);
            sourceBean.setTestQuery("SELECT 1");
            sourceBean.setUniqueResourceName("slave");
            return sourceBean;
        }
    
        /**
         * @param dataSourceMaster 数据源 master
         * @return 数据源 master 的会话工厂
         */
        @Bean(name = "sqlSessionFactoryMaster")
        @Primary
        public SqlSessionFactory sqlSessionFactoryMaster(@Qualifier(value = "dataSourceMaster") DataSource dataSourceMaster)
                throws Exception {
            return createSqlSessionFactory(dataSourceMaster);
        }
    
        /**
         * @param dataSourceSlave 数据源 slave
         * @return 数据源 slave 的会话工厂
         */
        @Bean(name = "sqlSessionFactorySlave")
        public SqlSessionFactory sqlSessionFactorySlave(@Qualifier(value = "dataSourceSlave") DataSource dataSourceSlave)
                throws Exception {
            return createSqlSessionFactory(dataSourceSlave);
        }
    
        /***
         * sqlSessionTemplate 与 Spring 事务管理一起使用,以确保使用的实际 SqlSession 是与当前 Spring 事务关联的,
         * 此外它还管理会话生命周期,包括根据 Spring 事务配置根据需要关闭,提交或回滚会话
         * @param sqlSessionFactoryMaster 数据源 master
         * @param sqlSessionFactorySlave 数据源 slave
         */
        @Bean(name = "sqlSessionTemplate")
        public MySqlSessionTemplate sqlSessionTemplate(@Qualifier(value = "sqlSessionFactoryMaster") SqlSessionFactory sqlSessionFactoryMaster,
                                                       @Qualifier(value = "sqlSessionFactorySlave") SqlSessionFactory sqlSessionFactorySlave) {
            Map<Object, SqlSessionFactory> sqlSessionFactoryMap = new HashMap<>();
            sqlSessionFactoryMap.put(DataSourceType.MASTER.name(), sqlSessionFactoryMaster);
            sqlSessionFactoryMap.put(DataSourceType.SLAVE.name(), sqlSessionFactorySlave);
            MySqlSessionTemplate customSqlSessionTemplate = new MySqlSessionTemplate(sqlSessionFactoryMaster);
            customSqlSessionTemplate.setTargetSqlSessionFactories(sqlSessionFactoryMap);
            return customSqlSessionTemplate;
        }
    
        /***
         * 自定义会话工厂
         * @param dataSource 数据源
         * @return :自定义的会话工厂
         */
        private SqlSessionFactory createSqlSessionFactory(DataSource dataSource) throws Exception {
            SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
            factoryBean.setDataSource(dataSource);
            org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
            //配置驼峰命名
            configuration.setMapUnderscoreToCamelCase(true);
            //配置sql日志
            configuration.setLogImpl(StdOutImpl.class);
            factoryBean.setConfiguration(configuration);
            ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
            //配置读取mapper.xml路径
            factoryBean.setDataSource(dataSource);
            // 配置别名
            factoryBean.setTypeAliasesPackage("com.shw.**");
    
            factoryBean.setMapperLocations(resolver.getResources(MAPPER_LOCATION));
            return factoryBean.getObject();
        }
    }
    

    3.9 (重点)重写SqlSessionTemplate,也就是把SqlSessionTemplate这个类copy一份,修改getSqlSessionFactory这个方法返回值.

    public class MySqlSessionTemplate extends SqlSessionTemplate {
        private final SqlSessionFactory sqlSessionFactory;
        private final ExecutorType executorType;
        private final SqlSession sqlSessionProxy;
        private final PersistenceExceptionTranslator exceptionTranslator;
        private Map<Object, SqlSessionFactory> targetSqlSessionFactories;
        private SqlSessionFactory defaultTargetSqlSessionFactory;
     
        /**
         * 通过Map传入
         * @param targetSqlSessionFactories
         */
        public void setTargetSqlSessionFactories(Map<Object, SqlSessionFactory> targetSqlSessionFactories) {
            this.targetSqlSessionFactories = targetSqlSessionFactories;
        }
        public void setDefaultTargetSqlSessionFactory(SqlSessionFactory defaultTargetSqlSessionFactory) {
            this.defaultTargetSqlSessionFactory = defaultTargetSqlSessionFactory;
        }
        public MySqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
            this(sqlSessionFactory, sqlSessionFactory.getConfiguration().getDefaultExecutorType());
        }
        public MySqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType) {
            this(sqlSessionFactory, executorType, new MyBatisExceptionTranslator(sqlSessionFactory.getConfiguration()
                    .getEnvironment().getDataSource(), true));
        }
        public MySqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType,
                                    PersistenceExceptionTranslator exceptionTranslator) {
            super(sqlSessionFactory, executorType, exceptionTranslator);
            this.sqlSessionFactory = sqlSessionFactory;
            this.executorType = executorType;
            this.exceptionTranslator = exceptionTranslator;
            this.sqlSessionProxy = (SqlSession) newProxyInstance(
                    SqlSessionFactory.class.getClassLoader(),
                    new Class[] { SqlSession.class },
                    new SqlSessionInterceptor());
            this.defaultTargetSqlSessionFactory = sqlSessionFactory;
        }
        //通过DataSourceContextHolder获取当前的会话工厂
        @Override
        public SqlSessionFactory getSqlSessionFactory() {
            String dataSourceKey = RoutingDataSourceContext.getDataSourceRoutingKey();
            SqlSessionFactory targetSqlSessionFactory = targetSqlSessionFactories.get(dataSourceKey);
            if (targetSqlSessionFactory != null) {
                return targetSqlSessionFactory;
            } else if (defaultTargetSqlSessionFactory != null) {
                return defaultTargetSqlSessionFactory;
            } else {
                Assert.notNull(targetSqlSessionFactories, "Property 'targetSqlSessionFactories' or 'defaultTargetSqlSessionFactory' are required");
                Assert.notNull(defaultTargetSqlSessionFactory, "Property 'defaultTargetSqlSessionFactory' or 'targetSqlSessionFactories' are required");
            }
            return this.sqlSessionFactory;
        }
     
     
        @Override
        public Configuration getConfiguration() {
            return this.getSqlSessionFactory().getConfiguration();
        }
        @Override
        public ExecutorType getExecutorType() {
            return this.executorType;
        }
        @Override
        public PersistenceExceptionTranslator getPersistenceExceptionTranslator() {
            return this.exceptionTranslator;
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public <T> T selectOne(String statement) {
            return this.sqlSessionProxy.<T> selectOne(statement);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public <T> T selectOne(String statement, Object parameter) {
            return this.sqlSessionProxy.<T> selectOne(statement, parameter);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public <K, V> Map<K, V> selectMap(String statement, String mapKey) {
            return this.sqlSessionProxy.<K, V> selectMap(statement, mapKey);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey) {
            return this.sqlSessionProxy.<K, V> selectMap(statement, parameter, mapKey);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public <K, V> Map<K, V> selectMap(String statement, Object parameter, String mapKey, RowBounds rowBounds) {
            return this.sqlSessionProxy.<K, V> selectMap(statement, parameter, mapKey, rowBounds);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public <E> List<E> selectList(String statement) {
            return this.sqlSessionProxy.<E> selectList(statement);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public <E> List<E> selectList(String statement, Object parameter) {
            return this.sqlSessionProxy.<E> selectList(statement, parameter);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
            return this.sqlSessionProxy.<E> selectList(statement, parameter, rowBounds);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void select(String statement, ResultHandler handler) {
            this.sqlSessionProxy.select(statement, handler);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void select(String statement, Object parameter, ResultHandler handler) {
            this.sqlSessionProxy.select(statement, parameter, handler);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void select(String statement, Object parameter, RowBounds rowBounds, ResultHandler handler) {
            this.sqlSessionProxy.select(statement, parameter, rowBounds, handler);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public int insert(String statement) {
            return this.sqlSessionProxy.insert(statement);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public int insert(String statement, Object parameter) {
            return this.sqlSessionProxy.insert(statement, parameter);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public int update(String statement) {
            return this.sqlSessionProxy.update(statement);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public int update(String statement, Object parameter) {
            return this.sqlSessionProxy.update(statement, parameter);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public int delete(String statement) {
            return this.sqlSessionProxy.delete(statement);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public int delete(String statement, Object parameter) {
            return this.sqlSessionProxy.delete(statement, parameter);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public <T> T getMapper(Class<T> type) {
            return getConfiguration().getMapper(type, this);
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void commit() {
            throw new UnsupportedOperationException("Manual commit is not allowed over a Spring managed SqlSession");
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void commit(boolean force) {
            throw new UnsupportedOperationException("Manual commit is not allowed over a Spring managed SqlSession");
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void rollback() {
            throw new UnsupportedOperationException("Manual rollback is not allowed over a Spring managed SqlSession");
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void rollback(boolean force) {
            throw new UnsupportedOperationException("Manual rollback is not allowed over a Spring managed SqlSession");
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void close() {
            throw new UnsupportedOperationException("Manual close is not allowed over a Spring managed SqlSession");
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void clearCache() {
            this.sqlSessionProxy.clearCache();
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public Connection getConnection() {
            return this.sqlSessionProxy.getConnection();
        }
        /**
         * {@inheritDoc}
         * @since 1.0.2
         */
        @Override
        public List<BatchResult> flushStatements() {
            return this.sqlSessionProxy.flushStatements();
        }
        /**
         * Proxy needed to route MyBatis method calls to the proper SqlSession got from Spring's Transaction Manager It also
         * unwraps exceptions thrown by {@code Method#invoke(Object, Object...)} to pass a {@code PersistenceException} to
         * the {@code PersistenceExceptionTranslator}.
         */
        private class SqlSessionInterceptor implements InvocationHandler {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                final SqlSession sqlSession = getSqlSession(
                        MySqlSessionTemplate.this.getSqlSessionFactory(),
                        MySqlSessionTemplate.this.executorType,
                        MySqlSessionTemplate.this.exceptionTranslator);
                try {
                    Object result = method.invoke(sqlSession, args);
                    if (!isSqlSessionTransactional(sqlSession, MySqlSessionTemplate.this.getSqlSessionFactory())) {
                        sqlSession.commit(true);
                    }
                    return result;
                } catch (Throwable t) {
                    Throwable unwrapped = unwrapThrowable(t);
                    if (MySqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) {
                        Throwable translated = MySqlSessionTemplate.this.exceptionTranslator
                                .translateExceptionIfPossible((PersistenceException) unwrapped);
                        if (translated != null) {
                            unwrapped = translated;
                        }
                    }
                    throw unwrapped;
                } finally {
                    closeSqlSession(sqlSession, MySqlSessionTemplate.this.getSqlSessionFactory());
                }
            }
        }
    }
    

    3.10 事务管理器配置

    @Configuration
    @EnableTransactionManagement
    public class XATransactionManagerConfig {
     
        @Bean
        public UserTransaction userTransaction() throws Throwable {
            UserTransactionImp userTransactionImp = new UserTransactionImp();
            userTransactionImp.setTransactionTimeout(10000);
            return userTransactionImp;
        }
     
        @Bean
        public TransactionManager atomikosTransactionManager() {
            UserTransactionManager userTransactionManager = new UserTransactionManager();
            userTransactionManager.setForceShutdown(true);
            return userTransactionManager;
        }
     
        @Bean
        public PlatformTransactionManager transactionManager(UserTransaction userTransaction,
                                                             TransactionManager transactionManager) {
            return new JtaTransactionManager(userTransaction, transactionManager);
        }
    }
    

    4.测试

    @Controller
    @RestController
    public class Hello {
    
        @Autowired
        private HelloMapper helloMapper;
    
        @GetMapping("/hello")
        @Transactional(rollbackFor = Exception.class)
        public List<Map> hello() {
            List<Map> school = helloMapper.getSchool();
            System.out.println(school);
            List<Map> user = helloMapper.getCatalog();
            System.out.println(user);
            return null;
        }
    
        @GetMapping("/hi")
        @Transactional(rollbackFor = Exception.class)
        public List<Map> hi() {
            helloMapper.insertCatalog();
            int i = 1/0;
            helloMapper.insertSchool();
            return null;
        }
    
    }
    
    public interface HelloMapper {
    
        @MyDataSource(DataSourceType.SLAVE)
        List<Map> getCatalog();
    
        List<Map> getSchool();
    
        @MyDataSource(DataSourceType.SLAVE)
        void insertCatalog();
    
        void insertSchool();
    
    }
    

    结论

    在以上代码的情况下,在需要进行数据源切换的时候,在接口上或方法上添加注解@MyDataSource(DataSourceType.SLAVE)切换到slave数据源,如果在方法上添加了事务,数据源依旧可以切换成功,且当添加事务的方法中发生了异常,整个方法都会回滚.至此,多数据源切换分布式事务问题解决成功.

    参考文章:

    https://blog.csdn.net/qq_35387940/article/details/103474353

    git仓库地址:

    https://github.com/sunhuawei0517/dynamicDataSource/tree/jta

    mster分支为多数据源切换,jta分支为多数据源+分布式事务

  • 相关阅读:
    剑指offer-矩形覆盖
    剑指offer-变态跳台阶
    剑指offer-跳台阶
    剑指offer-斐波那契数列
    剑指offer-旋转数组的最小数字
    剑指offer-用俩个栈实现队列
    剑指offer-重建二叉树
    剑指offer-从尾到头打印链表
    http头
    mysql-8.0解压缩版安装配置完整过程
  • 原文地址:https://www.cnblogs.com/sun2020/p/14164962.html
Copyright © 2011-2022 走看看