目录
正文
一、引子
1.1 背景
鉴于spring boot满天飞的大环境,本节样例使用Spring Boot+Atomikos(TM)+Mybatis(ORM)+Mysql(DB)的架构。
两张飞机票:
springboot官网文档:(只到docs即可,浏览可查看全部版本文档,不过都是英文的)
springboot中文文档:这个是2.0版本的,没找到1.5.10的。
2.spring boot对jta的支持
翻译自官网:
Spring Boot通过Atomkos或Bitronix的内嵌事务管理器支持跨多个XA资源的分布式JTA事务,当部署到恰当的J2EE应用服务器时也会支持JTA事务。
当发现JTA环境时,Spring Boot将使用Spring的JtaTransactionManager来管理事务。自动配置的JMS,DataSource和JPA beans将被升级以支持XA事务。你可以使用@Transactional,来参与到一个分布式事务中。如果处于JTA环境,但仍想使用本地事务,你可以将spring.jta.enabled属性设置为false来禁用JTA自动配置功能。
Atomikos是一个流行的开源事务管理器,并且可以嵌入到你的Spring Boot应用中。你可以使用spring-boot-starter-jta-atomikos去拉取Atomikos库。
默认情况下,Atomikos事务日志将被记录在应用home目录下的transaction-logs文件夹中。你可以在application.properties文件中通过设置spring.jta.log-dir属性来定义该目录,以spring.jta.atomikos.properties开头的属性能用来定义Atomikos的UserTransactionServiceIml实现,具体参考AtomikosProperties javadoc。
注:为了确保多个事务管理器能够安全地和相应的资源管理器配合,每个Atomikos实例必须设置一个唯一的ID。默认情况下,该ID是Atomikos实例运行的机器上的IP地址。为了确保生产环境中该ID的唯一性,你需要为应用的每个实例设置不同的spring.jta.transaction-manager-id属性值。
二、简单样例
2.1 业务场景

业务场景:
2个DB:
1. test库中有user用户表
2. test2库中有user_msg用户备注表
当插入一条user记录时,同时插入一条user_msg。如果出现异常,2个库中的数据都能回滚。
2.2 简单样例
1. 导包:pom.xml中导入atomikos包:
1 <dependency> 2 <groupId>org.springframework.boot</groupId> 3 <artifactId>spring-boot-starter-jta-atomikos</artifactId> 4 <version>2.1.3.RELEASE</version> 5 </dependency>
2. 主库数据源配置
MasterRepositoryConfig
1 package study.config.datasource;
2
3 import org.apache.ibatis.session.SqlSessionFactory;
4 import org.mybatis.spring.SqlSessionFactoryBean;
5 import org.mybatis.spring.SqlSessionTemplate;
6 import org.mybatis.spring.annotation.MapperScan;
7 import org.springframework.beans.factory.annotation.Qualifier;
8 import org.springframework.boot.context.properties.ConfigurationProperties;
9 import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
10 import org.springframework.context.annotation.Bean;
11 import org.springframework.context.annotation.Configuration;
12 import org.springframework.context.annotation.Primary;
13 import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
14
15 import javax.sql.DataSource;
16 import java.util.Properties;
17
18 /**
19 * @author denny
20 * @Description 主库持久化配置
21 * @date 2019/5/5 下午3:12
22 */
23 @Configuration
24 @MapperScan(basePackages = {MasterRepositoryConfig.MASTER_PACKAGE}, sqlSessionFactoryRef = "masterSqlSessionFactory")
25 public class MasterRepositoryConfig {
26
27 static final String MASTER_PACKAGE = "study.repository.master";
28
29 private static final String MAPPER_LOCATIONS = "classpath*:mybatis/mapper/master/**/*.xml";
30
31 @ConfigurationProperties(prefix = "study.datasource.master")
32 @Bean(name = "masterDataSource")
33 @Primary
34 public DataSource masterDataSource() {
35 // 连接池基本属性
36 Properties p = new Properties();
37 p.setProperty("url", "jdbc:mysql://localhost:3306/" + "test");
38 p.setProperty("user", "root");
39 p.setProperty("password", "12345");
40
41 AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
42 ds.setUniqueResourceName("masterDataSource");
43 ds.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");
44 ds.setXaProperties(p);
45 ds.setPoolSize(5);
46 return ds;
47 }
48
49 @Bean(name = "masterSqlSessionFactory")
50 @Primary
51 public SqlSessionFactory sqlSessionFactory(@Qualifier("masterDataSource") DataSource dataSource) throws Exception {
52 SqlSessionFactoryBean fb = new SqlSessionFactoryBean();
53 fb.setDataSource(dataSource);
54 //指定基包
55 fb.setTypeAliasesPackage(MASTER_PACKAGE);
56 //指定xml文件位置
57 fb.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(MAPPER_LOCATIONS));
58 return fb.getObject();
59 }
60
61 /**
62 * 基于sqlSession的操作模板类
63 *
64 * @param sqlSessionFactory
65 * @return
66 */
67 @Bean(name = "masterSqlSessionTemplate")
68 @Primary
69 public SqlSessionTemplate sqlSessionTemplate(@Qualifier("masterSqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
70 SqlSessionTemplate sqlSessionTemplate = new SqlSessionTemplate(sqlSessionFactory);
71 return sqlSessionTemplate;
72 }
73 }
3.从库数据源配置
SlaveRepositoryConfig
1 package study.config.datasource;
2
3 import org.apache.ibatis.session.SqlSessionFactory;
4 import org.mybatis.spring.SqlSessionFactoryBean;
5 import org.mybatis.spring.SqlSessionTemplate;
6 import org.mybatis.spring.annotation.MapperScan;
7 import org.springframework.beans.factory.annotation.Qualifier;
8 import org.springframework.boot.context.properties.ConfigurationProperties;
9 import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
10 import org.springframework.context.annotation.Bean;
11 import org.springframework.context.annotation.Configuration;
12 import org.springframework.context.annotation.Primary;
13 import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
14
15 import javax.sql.DataSource;
16 import java.util.Properties;
17
18 /**
19 * @author denny
20 * @Description 从库持久化配置
21 * @date 2019/5/5 下午3:12
22 */
23 @Configuration
24 @MapperScan(basePackages = {SlaveRepositoryConfig.MASTER_PACKAGE}, sqlSessionFactoryRef = "slaveSqlSessionFactory")
25 public class SlaveRepositoryConfig {
26
27 static final String MASTER_PACKAGE = "study.repository.slave";
28
29 private static final String MAPPER_LOCATIONS = "classpath*:mybatis/mapper/slave/**/*.xml";
30
31 /**
32 * 从数据源:test2:user_msg表
33 *
34 * @return
35 */
36 @ConfigurationProperties(prefix = "study.datasource.slave")
37 @Bean(name = "slaveDataSource")
38 public DataSource slaveDataSource() {
39 // 连接池基本属性
40 Properties p = new Properties();
41 p.setProperty("url", "jdbc:mysql://localhost:3306/" + "test2");
42 p.setProperty("user", "root");
43 p.setProperty("password", "12345");
44
45 AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
46 ds.setUniqueResourceName("slaveDataSource");
47 ds.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");
48 ds.setXaProperties(p);
49 ds.setPoolSize(5);
50 return ds;
51 }
52
53 /**
54 * 会话工厂
55 *
56 * @return
57 * @throws Exception
58 */
59 @Bean(name = "slaveSqlSessionFactory")
60 @Primary
61 public SqlSessionFactory sqlSessionFactory(@Qualifier("slaveDataSource") DataSource dataSource) throws Exception {
62 SqlSessionFactoryBean fb = new SqlSessionFactoryBean();
63 fb.setDataSource(dataSource);
64 //指定基包
65 fb.setTypeAliasesPackage(MASTER_PACKAGE);
66 //指定xml文件位置
67 fb.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(MAPPER_LOCATIONS));
68 return fb.getObject();
69 }
70
71 /**
72 * 基于sqlSession的操作模板类
73 *
74 * @param sqlSessionFactory
75 * @return
76 * @throws Exception
77 */
78 @Bean(name = "slaveSqlSessionTemplate")
79 @Primary
80 public SqlSessionTemplate sqlSessionTemplate(@Qualifier("slaveSqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
81 SqlSessionTemplate sqlSessionTemplate = new SqlSessionTemplate(sqlSessionFactory);
82 return sqlSessionTemplate;
83 }
84 }
4. 配置分布式事务管理器
1 package study.config.datasource;
2
3 import com.atomikos.icatch.jta.UserTransactionImp;
4 import com.atomikos.icatch.jta.UserTransactionManager;
5 import org.springframework.context.annotation.Bean;
6 import org.springframework.context.annotation.Configuration;
7 import org.springframework.transaction.jta.JtaTransactionManager;
8
9 import javax.transaction.UserTransaction;
10
11 /**
12 * 事务管理器配置类
13 *
14 * @author denny
15 */
16 @Configuration
17 public class JtaTransactionManagerConfig {
18
19 @Bean(name = "atomikosTransactionManager")
20 public JtaTransactionManager regTransactionManager() {
21 UserTransactionManager userTransactionManager = new UserTransactionManager();
22 UserTransaction userTransaction = new UserTransactionImp();
23 return new JtaTransactionManager(userTransaction, userTransactionManager);
24 }
25 }
5. addUser方法
1 package study.service.impl;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.springframework.stereotype.Service;
5 import study.domain.User;
6 import study.repository.master.UserRepository;
7 import study.service.UserService;
8
9 import javax.annotation.Resource;
10
11 /**
12 * @Description
13 * @author denny
14 * @date 2018/8/27 下午5:31
15 */
16 @Slf4j
17 @Service
18 public class UserServiceImpl implements UserService{
19 @Resource
20 private UserRepository userRepository;
21
22 //@Transactional(propagation= Propagation.REQUIRED, rollbackFor = Exception.class)
23 @Override
24 public void addUser(int id, String name) {
25 log.info("[addUser] begin!!!");
26 User user = new User();
27 user.setId(id);
28 user.setName(name);
29 userRepository.insert(user);
30
31 log.info("[addUser] end!!! ");
32 //创造一个异常,看回滚情况
33 //throw new RuntimeException();
34 }
35
36 }
6. addUserMsg方法
1 package study.service.impl;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.springframework.stereotype.Service;
5 import org.springframework.transaction.annotation.Transactional;
6 import study.domain.UserMsg;
7 import study.repository.slave.UserMsgRepository;
8 import study.service.UserMsgService;
9 import study.service.UserService;
10
11 import javax.annotation.Resource;
12
13 /**
14 * @author denny
15 * @Description
16 * @date 2018/8/27 下午5:31
17 */
18 @Slf4j
19 @Service
20 public class UserMsgServiceImpl implements UserMsgService {
21
22 @Resource
23 private UserService userService;
24
25 @Resource
26 private UserMsgRepository userMsgRepository;
27
28 /**
29 * 新增带备注的用户:申明式分布式事务(atomikos)
30 *
31 * @param id
32 * @param name
33 * @param msg
34 */
35 @Transactional(transactionManager = "atomikosTransactionManager", rollbackFor = Exception.class)
36 @Override
37 public void addUserMsg(int id, String name, String msg) {
38 log.info("[addUserMsg] begin!!!");
39
40 // 1.插入用户
41 userService.addUser(id, name);
42
43 UserMsg userMsg = new UserMsg();
44 userMsg.setUserId(id);
45 userMsg.setMsg(msg);
46 // 2.插入用户备注
47 userMsgRepository.insert(userMsg);
48
49 log.info("[addUserMsg] end!!! ");
50 //创造一个异常,看回滚情况
51 //throw new RuntimeException();
52 }
53 }
注:MasterRepositoryConfig中定义了@Bean(name = "masterDataSource"),SlaveRepositoryConfig中定义了@Bean(name = "slaveDataSource"),
返回类型都是javax.sql.DataSource.必须使用@Primary注释一个,否则spring boot启动配置类启动时校验会报错,无法定位唯一的bean,原因:飞机票
2.3 测试验证
2.3.1 测试类
BaseTest测试基类
1 package study;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.junit.runner.RunWith;
5 import org.springframework.boot.test.context.SpringBootTest;
6 import org.springframework.test.context.junit4.SpringRunner;
7
8 @Slf4j
9 @RunWith(SpringRunner.class)
10 @SpringBootTest(classes = StudyDemoApplication.class)
11 public class BaseTest {
12
13 }
业务测试类JtaTest
1 package study.jta;
2
3 import org.junit.Test;
4 import study.BaseTest;
5 import study.service.UserMsgService;
6
7 import javax.annotation.Resource;
8
9 /**
10 * @author denny
11 * @Description 分布式事务管理测试类
12 * @date 2018/5/11 下午6:35
13 */
14 public class JtaTest extends BaseTest {
15
16 @Resource
17 private UserMsgService userMsgService;
18
19 @Test
20 public void jtaTest() {
21 this.userMsgService.addUserMsg(22, "test22", "test22备注");
22 }
23
24 }
2.3.2 结果
看控制台日志可见,大体流程:
- 初始化AtomikosDataSoureBean
- spring 使用JtaTransactionManager来管理分布式事务
- 使用自定义的atomikos包的UserTransactionImpl和UserTransactionManager。
- atomikos包下的XAResourceTransaction来和mysql交互。
下一节介绍源码剖析,深入源码理解做了什么。
