zoukankan      html  css  js  c++  java
  • JPA+atomikos实现分布式事务

    一、整合JTA(atomikos)

    通过maven坐标引入JTA atomikos

    <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-jta-atomikos</artifactId>
    </dependency>
    

    把jdbc-url改成url,然后在spring前缀下面加(注意这里是jta配置,而不是jpa配置,jta表示分布式事务):

    spring:
      jta:
        atomikos:
          datasource:
            max-pool-size: 20
            borrow-connection-timeout: 60
          connectionfactory:
            max-pool-size: 20
            borrow-connection-timeout: 60
    

    max-pool-size表示数据连接池最大连接数,请根据自己的应用规模进行调整
    borrow-connection-timeout表示连接从连接池“借出”之后的超时时间,超时将抛出异常

    二、分布式事务管理器



    以下AtomikosJtaPlatform为固定代码不需修改,代表上图中深蓝色部分:

    package com.dj.config;
    
    import org.hibernate.engine.transaction.jta.platform.internal.AbstractJtaPlatform;
    
    import javax.transaction.TransactionManager;
    import javax.transaction.UserTransaction;
    
    public class AtomikosJtaPlatform extends AbstractJtaPlatform {
    
        private static final long serialVersionUID = 1L;
    
        static TransactionManager transactionManager;
        static UserTransaction transaction;
    
        @Override
        protected TransactionManager locateTransactionManager() {
            return transactionManager;
        }
    
        @Override
        protected UserTransaction locateUserTransaction() {
            return transaction;
        }
    }
    

    事务管理器的配置JPAAtomikosTransactionConfig ,以下除了设置JPA特性的部分,为固定代码不需修改,代表上图中的粉色部分:

    package com.dj.config;
    
    import com.atomikos.icatch.jta.UserTransactionImp;
    import com.atomikos.icatch.jta.UserTransactionManager;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.DependsOn;
    import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
    import org.springframework.orm.jpa.JpaVendorAdapter;
    import org.springframework.orm.jpa.vendor.Database;
    import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
    import org.springframework.transaction.PlatformTransactionManager;
    import org.springframework.transaction.annotation.EnableTransactionManagement;
    import org.springframework.transaction.jta.JtaTransactionManager;
    
    import javax.transaction.TransactionManager;
    import javax.transaction.UserTransaction;
    
    @Configuration
    @ComponentScan
    @EnableTransactionManagement
    public class JPAAtomikosTransactionConfig {
    
        @Bean
        public PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
            return new PropertySourcesPlaceholderConfigurer();
        }
    
        //设置JPA特性
        @Bean
        public JpaVendorAdapter jpaVendorAdapter() {
            HibernateJpaVendorAdapter hibernateJpaVendorAdapter = new HibernateJpaVendorAdapter();
            //显示sql
            hibernateJpaVendorAdapter.setShowSql(true);
            //自动生成/更新表
            hibernateJpaVendorAdapter.setGenerateDdl(true);
            //设置数据库类型
            hibernateJpaVendorAdapter.setDatabase(Database.MYSQL);
            return hibernateJpaVendorAdapter;
        }
    
        @Bean(name = "userTransaction")
        public UserTransaction userTransaction() throws Throwable {
            UserTransactionImp userTransactionImp = new UserTransactionImp();
            userTransactionImp.setTransactionTimeout(10000);
            return userTransactionImp;
        }
    
        @Bean(name = "atomikosTransactionManager", initMethod = "init", destroyMethod = "close")
        public TransactionManager atomikosTransactionManager() throws Throwable {
            UserTransactionManager userTransactionManager = new UserTransactionManager();
            userTransactionManager.setForceShutdown(false);
            AtomikosJtaPlatform.transactionManager = userTransactionManager;
            return userTransactionManager;
        }
    
        @Bean(name = "transactionManager")
        @DependsOn({"userTransaction", "atomikosTransactionManager"})
        public PlatformTransactionManager transactionManager() throws Throwable {
            UserTransaction userTransaction = userTransaction();
            AtomikosJtaPlatform.transaction = userTransaction;
            TransactionManager atomikosTransactionManager = atomikosTransactionManager();
            return new JtaTransactionManager(userTransaction, atomikosTransactionManager);
        }
    
    }
    
    

    三、数据源及实体管理配置(多数据源多份)

    设置第一个数据库的primary数据源及实体扫描管理(扫描testdb目录),实体管理器、数据源都要加上primary,以示区分:
    需要改的地方是第一个数据源的实体类位置和持久层数据位置

    package com.dj.config;
    
    import com.atomikos.jdbc.AtomikosDataSourceBean;
    import com.mysql.cj.jdbc.MysqlXADataSource;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.DependsOn;
    import org.springframework.context.annotation.Primary;
    import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
    import org.springframework.orm.jpa.JpaVendorAdapter;
    import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
    
    import javax.sql.DataSource;
    import java.sql.SQLException;
    import java.util.HashMap;
    
    @Configuration
    @DependsOn("transactionManager")
    @EnableJpaRepositories(basePackages = "com.dj.dao.testssm",  //注意这里  放的是持久层@Repository
            entityManagerFactoryRef = "primaryEntityManager",
            transactionManagerRef = "transactionManager")
    public class JPAPrimaryConfig {
        @Autowired
        private JpaVendorAdapter jpaVendorAdapter;
    
        //primary
        @Primary
        @Bean(name = "primaryDataSourceProperties")
        @ConfigurationProperties(prefix = "spring.datasource.primary")     //注意这里
        public DataSourceProperties primaryDataSourceProperties() {
            return new DataSourceProperties();
        }
    
        @Primary
        @Bean(name = "primaryDataSource", initMethod = "init", destroyMethod = "close")
        @ConfigurationProperties(prefix = "spring.datasource.primary")
        public DataSource primaryDataSource() throws SQLException {
            MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
            mysqlXaDataSource.setUrl(primaryDataSourceProperties().getUrl());
            mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
            mysqlXaDataSource.setPassword(primaryDataSourceProperties().getPassword());
            mysqlXaDataSource.setUser(primaryDataSourceProperties().getUsername());
            AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
            xaDataSource.setXaDataSource(mysqlXaDataSource);
            xaDataSource.setUniqueResourceName("primary");
            xaDataSource.setBorrowConnectionTimeout(60);
            xaDataSource.setMaxPoolSize(20);
            return xaDataSource;
        }
    
        @Primary
        @Bean(name = "primaryEntityManager")
        @DependsOn("transactionManager")
        public LocalContainerEntityManagerFactoryBean primaryEntityManager() throws Throwable {
    
            HashMap<String, Object> properties = new HashMap<String, Object>();
            properties.put("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName());
            properties.put("javax.persistence.transactionType", "JTA");
            LocalContainerEntityManagerFactoryBean entityManager = new LocalContainerEntityManagerFactoryBean();
            entityManager.setJtaDataSource(primaryDataSource());
            entityManager.setJpaVendorAdapter(jpaVendorAdapter);
            //这里要修改成主数据源的扫描包
            entityManager.setPackagesToScan("com.dj.model.modelssm");
            entityManager.setPersistenceUnitName("primaryPersistenceUnit");
            entityManager.setJpaPropertyMap(properties);
            return entityManager;
        }
    }
    

    设置第二个数据库的数据源及实体扫描管理

    package com.dj.config;
    
    import com.atomikos.jdbc.AtomikosDataSourceBean;
    import com.mysql.cj.jdbc.MysqlXADataSource;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.DependsOn;
    import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
    import org.springframework.orm.jpa.JpaVendorAdapter;
    import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
    
    import javax.sql.DataSource;
    import java.sql.SQLException;
    import java.util.HashMap;
    
    @Configuration
    @DependsOn("transactionManager")
    @EnableJpaRepositories(basePackages = "com.dj.dao.testssm2",   //注意这里
            entityManagerFactoryRef = "secondaryEntityManager",
            transactionManagerRef = "transactionManager")
    public class JPASecondaryConfig {
        @Autowired
        private JpaVendorAdapter jpaVendorAdapter;
    
    
        @Bean(name = "secondaryDataSourceProperties")
        @ConfigurationProperties(prefix = "spring.datasource.secondary")    //注意这里
        public DataSourceProperties masterDataSourceProperties() {
            return new DataSourceProperties();
        }
    
    
        @Bean(name = "secondaryDataSource", initMethod = "init", destroyMethod = "close")
        @ConfigurationProperties(prefix = "spring.datasource.secondary")
        public DataSource masterDataSource() throws SQLException {
            MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
            mysqlXaDataSource.setUrl(masterDataSourceProperties().getUrl());
            mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
            mysqlXaDataSource.setPassword(masterDataSourceProperties().getPassword());
            mysqlXaDataSource.setUser(masterDataSourceProperties().getUsername());
            AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
            xaDataSource.setXaDataSource(mysqlXaDataSource);
            xaDataSource.setUniqueResourceName("secondary");
            xaDataSource.setBorrowConnectionTimeout(60);
            xaDataSource.setMaxPoolSize(20);
            return xaDataSource;
        }
    
        @Bean(name = "secondaryEntityManager")
        @DependsOn("transactionManager")
        public LocalContainerEntityManagerFactoryBean masterEntityManager() throws Throwable {
    
            HashMap<String, Object> properties = new HashMap<String, Object>();
            properties.put("hibernate.transaction.jta.platform", AtomikosJtaPlatform.class.getName());
            properties.put("javax.persistence.transactionType", "JTA");
            LocalContainerEntityManagerFactoryBean entityManager = new LocalContainerEntityManagerFactoryBean();
            entityManager.setJtaDataSource(masterDataSource());
            entityManager.setJpaVendorAdapter(jpaVendorAdapter);
            //这里要修改成主数据源的扫描包
            entityManager.setPackagesToScan("com.dj.model.modelssm2");
            entityManager.setPersistenceUnitName("secondaryPersistenceUnit");
            entityManager.setJpaPropertyMap(properties);
            return entityManager;
        }
    }
    

    DataSourceProperties数据源配置、Datasource数据源、EntityManager实体管理器都是2套。分别是primary和secondary
    实体和Repository的扫描目录也是2组,分别是testdb和testdb2
    但是事务管理器只有一个,那就是transactionManager,是基于atomikos实现的。事务管理器只有一个,决定了不同的数据源使用同一个事务管理器,从而实现分布式事务。

    四、测试

    service里面分别向testdb插入article,testdb2插入message,数据插入都成功。人为制造一个被除数为0的异常,数据插入都失败。

    package com.dj;
    
    import com.dj.dao.testssm.ArticleRepository;
    import com.dj.dao.testssm2.UserRepository;
    import com.dj.model.modelssm.Article;
    import com.dj.model.modelssm2.User;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    import org.springframework.transaction.annotation.Transactional;
    
    import java.util.Date;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringBootjpaTest {
    
        @Autowired
        private ArticleRepository articleRepository;
        @Autowired
        private UserRepository userRepository;
    
        @Test
        @Transactional
        public void jpaTest(){
            Article article = Article.builder()
                    .id(25)
                    .author("0zimug6666321")
                    .content("spring boot 从青铜到王者")
                    .createTime(new Date())
                    //.reader(readers)
                    .title("t133333").build();
    
            User user = User.builder()
                    .user_name("056zimug321")
                    .address("ok")
                    .birthday(new Date())
                    .sex("南")
                    .build();
    
            //先构造一个Article对象article,这个操作针对testdb
            articleRepository.save(article);
            //在构造一个Message对象message,这个操作针对testdb2
            userRepository.save(user);
            int a = 2/0;
    
        }
    }
    
    

    controller中加入测试

    package com.dj.controller;
    
    import com.dj.dao.testssm.ArticleRepository;
    import com.dj.dao.testssm2.UserRepository;
    import com.dj.model.modelssm.Article;
    import com.dj.model.modelssm2.User;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.transaction.annotation.Transactional;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.Date;
    
    @RestController
    public class springboottest {
    
        @Autowired
        private ArticleRepository articleRepository;
        @Autowired
        private UserRepository userRepository;
    
        @RequestMapping("test")
        public String test(){
            return "112s";
        }
    
        @Transactional
        @RequestMapping("addtest")
        public  String  jpsTest(){
            Article article = Article.builder()
                    .id(25)
                    .author("550zimug6666321")
                    .content("spring boot 从青铜到王者")
                    .createTime(new Date())
                    //.reader(readers)
                    .title("t133333").build();
    
            User user = User.builder()
                    .user_name("55056zimug321")
                    .address("ok")
                    .birthday(new Date())
                    .sex("南")
                    .build();
    
            //先构造一个Article对象article,这个操作针对testdb
            articleRepository.save(article);
            //在构造一个Message对象message,这个操作针对testdb2
            userRepository.save(user);
            return "success";
        }
    
    }
    

    源码地址

    参考链接:https://www.kancloud.cn/hanxt/springboot2/1177607
    https://my.oschina.net/u/3066875/blog/3055188
    https://my.oschina.net/u/3066875/blog/3055190

  • 相关阅读:
    sqlldr、sqluldr2_w64案例
    查看oracle的sid和sevice_name
    杂记
    GAN学习
    Leetcode 第 217 场周赛
    牛客编程巅峰赛S2第4场
    SAR图像变化检测的一点想法
    Fire! UVA
    HDU
    HDU
  • 原文地址:https://www.cnblogs.com/weidaijie/p/14346707.html
Copyright © 2011-2022 走看看