zoukankan      html  css  js  c++  java
  • 分布式事务(五)源码详解

    正文

    系列目录

    分布式事务(一)原理概览

    分布式事务(二)JTA规范

    分布式事务(三)mysql对XA协议的支持

    分布式事务(四)简单样例

    分布式事务(五)源码详解

    分布式事务(六)总结提高

    引子

    本节我们将会从上一节的”简单样例“入手:Spring Boot+Atomikos(TM)+Mybatis(ORM)+Mysql(DB),深入源码,看看这个分布式事务是怎么定义、执行的。

    先来回忆一下第二节讲的JTA规范,如下图。Atomikos是什么角色?起到什么作用?

    角色:

    Atomikos根本上是一个事务管理器(TM)也就是JTA模型的核心,上图扇形的中心位置。

    作用:

    TM调用 【Resource Manager资源管理器】 的XAResource接口来实现事务操作。

    TM依赖 【Application Server应用服务器】 的TransactionManager接口当然如果服务器不支持事务管理,自然也就只能使用第三方包,例如Atomikos。

    TM依赖 【Application应用程序】 设置事务边界、属性,application调用UserTransaction接口控制事务开始、提交、回滚。

    一、bean定义

    1.1 JtaTransactionManager

    org.springframework.transaction.jta.JtaTransactionManager类是spring提供的分布式事务管理器。

    JtaTransactionManager类图如下:

    实现了接口如下:

    • PlatformTransactionManager :获取事务,提交事务,回滚事务
    • TransactionFactory:创建事务
    • InitializingBean:初始化bean

    JtaTransactionManager实现了InitializingBean接口的afterPropertiesSet()方法,处于bean生命周期的容器初始化->实例化期->初始化中期,如下图:

    下面我们看一下JtaTransactionManager在bean初始化中期InitializingBean接口的afterPropertiesSet()做了什么:

    复制代码
     1 /**
     2  * Initialize the UserTransaction as well as the TransactionManager handle.
     3  * @see #initUserTransactionAndTransactionManager()
     4  */
     5 @Override
     6 public void afterPropertiesSet() throws TransactionSystemException {
     7     initUserTransactionAndTransactionManager();
     8     checkUserTransactionAndTransactionManager();
     9     initTransactionSynchronizationRegistry();
    10 }
    复制代码
    • 1.initUserTransactionAndTransactionManager:初始化UserTransaction和TransactionManager接口。主要是如果没有定义的话,可以支持JNDI。

    • 2.checkUserTransactionAndTransactionManager:校验2个接口是否存在。UserTransaction必须定义,TransactionManager可以不定义。

          源码如下:

          

          对应控制台打印:

    o.s.t.jta.JtaTransactionManager          : Using JTA UserTransaction: com.atomikos.icatch.jta.UserTransactionImp@614aeccc
    o.s.t.jta.JtaTransactionManager          : Using JTA TransactionManager: com.atomikos.icatch.jta.UserTransactionManager@5116ac09
    • 3.initTransactionSynchronizationRegistry:初始化事务同步注册,这个不使用JNDI的话没啥用。

    上一节分布式事务(三)简单样例中我们配置了JtaTransactionManagerConfig类,如下:

    复制代码
     1 package study.config.datasource;
     2 
     3 import com.atomikos.icatch.jta.UserTransactionImp;
     4 import com.atomikos.icatch.jta.UserTransactionManager;
     5 import org.springframework.context.annotation.Bean;
     6 import org.springframework.context.annotation.Configuration;
     7 import org.springframework.transaction.jta.JtaTransactionManager;
     8 
     9 import javax.transaction.UserTransaction;
    10 
    11 /**
    12  * 事务管理器配置类
    13  *
    14  * @author denny
    15  */
    16 @Configuration
    17 public class JtaTransactionManagerConfig {
    18 
    19     @Bean(name = "atomikosTransactionManager")
    20     public JtaTransactionManager regTransactionManager() {
    21         UserTransactionManager userTransactionManager = new UserTransactionManager();
    22         UserTransaction userTransaction = new UserTransactionImp();
    23         return new JtaTransactionManager(userTransaction, userTransactionManager);
    24     }
    25 }
    复制代码

     如上图,我们定义了一个name = "atomikosTransactionManager"的bean,具体类型为JtaTransactionManager。其中构造了2个实现类UserTransactionImp(javax.transaction.UserTransaction接口)、UserTransactionManager(javax.transaction.TransactionManager接口)。并用这2个实现类构造了一个JtaTransactionManager。

    1.UserTransaction接口

    提供给用户操控事务的:开启,提交,回滚等等。源码如下:

    2 TransactionManager接口

    源码如下:

    相比UserTransaction,TransactionManager接口多了接口的挂起、恢复、获取事务3个接口。这3个方法明显是留给系统自己调用的。

    1.2 AtomikosDataSourceBean

    Spring 为Atomikos定制了一个org.springframework.boot.jta.atomikos.AtomikosDataSourceBean,提供了bean生命周期的一些接口:

    1. BeanNameAware:设置bean名称
    2. InitializingBean:初始化bean
    3. DisposableBean:销毁bean

    我们只需要定义这个bean即可轻松使得spring来维护。

    com.atomikos.jdbc.AtomikosDataSourceBean类图如下:

    其中核心接口:

    DataSource接口:getConnection获取数据库连接

    ConnectionPoolProperties接口:用于载入连接池的属性

    二、源码剖析

    2.1 自动配置类

    老套路哈,spring boot就这么点花花肠子,既然使用@Transactional这种注解的方式,那么我们就从springboot 容器启动时的自动配置载入(spring boot容器启动详解)开始看。在/META-INF/spring.factories中配置文件中查找,如下图:

    载入2个关于事务的自动配置类: 

    org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration,
    org.springframework.boot.autoconfigure.transaction.jta.JtaAutoConfiguration,

    由于本文是分布式事务,故2个配置文件都生效了,我们先看JtaAutoConfiguration

    2.2 JtaAutoConfiguration

    复制代码
     1 /**
     2  * {@link EnableAutoConfiguration Auto-configuration} for JTA.
     3  *
     4  * @author Josh Long
     5  * @author Phillip Webb
     6  * @since 1.2.0
     7  */
     8 @ConditionalOnClass(javax.transaction.Transaction.class)
     9 @ConditionalOnProperty(prefix = "spring.jta", value = "enabled", matchIfMissing = true)
    10 @AutoConfigureBefore({ XADataSourceAutoConfiguration.class,
    11         ActiveMQAutoConfiguration.class, ArtemisAutoConfiguration.class,
    12         HibernateJpaAutoConfiguration.class })
    13 @Import({ JndiJtaConfiguration.class, BitronixJtaConfiguration.class,
    14         AtomikosJtaConfiguration.class, NarayanaJtaConfiguration.class })
    15 @EnableConfigurationProperties(JtaProperties.class)
    16 public class JtaAutoConfiguration {
    17 
    18 }
    复制代码

    如上,JtaAutoConfiguration这个类竟然是个空壳,只有一堆注解,挑几个重要的讲一讲:

    1.@ConditionalOnClass(javax.transaction.Transaction.class):代表类路径下存在javax.transaction.Transaction.class这个类,那么JtaAutoConfiguration生效。

    2.@ConditionalOnProperty(prefix = "spring.jta", value = "enabled", matchIfMissing = true),自动开启spring.jta.enabled=true.

    3.@Import({ JndiJtaConfiguration.class, BitronixJtaConfiguration.class, AtomikosJtaConfiguration.class, NarayanaJtaConfiguration.class }),又是spring套路哈,用来导入类。这里导入了4个配置类,可见支持4种第三方事务管理器。AtomikosJtaConfiguration.class自然就是Atomikos了。

    AtomikosJtaConfiguration.class这个配置类

    复制代码
     1 @Configuration
     2 @EnableConfigurationProperties({ AtomikosProperties.class, JtaProperties.class })
     3 @ConditionalOnClass({ JtaTransactionManager.class, UserTransactionManager.class })
     4 @ConditionalOnMissingBean(PlatformTransactionManager.class)
     5 class AtomikosJtaConfiguration {
     6 
     7     private final JtaProperties jtaProperties;
     8 
     9     private final TransactionManagerCustomizers transactionManagerCustomizers;
    10 
    11     AtomikosJtaConfiguration(JtaProperties jtaProperties,
    12             ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers) {
    13         this.jtaProperties = jtaProperties;
    14         this.transactionManagerCustomizers = transactionManagerCustomizers
    15                 .getIfAvailable();
    16     }
    17 
    18     @Bean(initMethod = "init", destroyMethod = "shutdownForce")
    19     @ConditionalOnMissingBean(UserTransactionService.class)
    20     public UserTransactionServiceImp userTransactionService(
    21             AtomikosProperties atomikosProperties) {
    22         Properties properties = new Properties();
    23         if (StringUtils.hasText(this.jtaProperties.getTransactionManagerId())) {
    24             properties.setProperty("com.atomikos.icatch.tm_unique_name",
    25                     this.jtaProperties.getTransactionManagerId());
    26         }
    27         properties.setProperty("com.atomikos.icatch.log_base_dir", getLogBaseDir());
    28         properties.putAll(atomikosProperties.asProperties());
    29         return new UserTransactionServiceImp(properties);
    30     }
    31 
    32     private String getLogBaseDir() {
    33         if (StringUtils.hasLength(this.jtaProperties.getLogDir())) {
    34             return this.jtaProperties.getLogDir();
    35         }
    36         File home = new ApplicationHome().getDir();
    37         return new File(home, "transaction-logs").getAbsolutePath();
    38     }
    39 
    40     @Bean(initMethod = "init", destroyMethod = "close")
    41     @ConditionalOnMissingBean
    42     public UserTransactionManager atomikosTransactionManager(
    43             UserTransactionService userTransactionService) throws Exception {
    44         UserTransactionManager manager = new UserTransactionManager();
    45         manager.setStartupTransactionService(false);
    46         manager.setForceShutdown(true);
    47         return manager;
    48     }
    49 
    50     @Bean
    51     @ConditionalOnMissingBean(XADataSourceWrapper.class)
    52     public AtomikosXADataSourceWrapper xaDataSourceWrapper() {
    53         return new AtomikosXADataSourceWrapper();
    54     }
    55 
    56     @Bean
    57     @ConditionalOnMissingBean
    58     public static AtomikosDependsOnBeanFactoryPostProcessor atomikosDependsOnBeanFactoryPostProcessor() {
    59         return new AtomikosDependsOnBeanFactoryPostProcessor();
    60     }
    61 
    62     @Bean
    63     public JtaTransactionManager transactionManager(UserTransaction userTransaction,
    64             TransactionManager transactionManager) {
    65         JtaTransactionManager jtaTransactionManager = new JtaTransactionManager(
    66                 userTransaction, transactionManager);
    67         if (this.transactionManagerCustomizers != null) {
    68             this.transactionManagerCustomizers.customize(jtaTransactionManager);
    69         }
    70         return jtaTransactionManager;
    71     }
    72 
    73     @Configuration
    74     @ConditionalOnClass(Message.class)
    75     static class AtomikosJtaJmsConfiguration {
    76 
    77         @Bean
    78         @ConditionalOnMissingBean(XAConnectionFactoryWrapper.class)
    79         public AtomikosXAConnectionFactoryWrapper xaConnectionFactoryWrapper() {
    80             return new AtomikosXAConnectionFactoryWrapper();
    81         }
    82 
    83     }
    84 
    85 }
    复制代码

    2.3 TransactionAutoConfiguration

    这里和本地事务分析过程一致,就不再重复,飞机票spring事务详解(三)源码详解,一直看到第二节结束.这里只截个图:

    最终源码调用具体事务管理器的PlatformTransactionManager接口的3个方法:

    复制代码
    复制代码
    1 public interface PlatformTransactionManager {
    2     // 获取事务状态
    3     TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;
    4   // 事务提交
    5     void commit(TransactionStatus status) throws TransactionException;
    6   // 事务回滚
    7     void rollback(TransactionStatus status) throws TransactionException;
    8 }
    复制代码
    复制代码

    三、核心源码

    核心实现类图:

    如上提所示,PlatformTransactionManager顶级接口定义了最核心的事务管理方法,下面一层是AbstractPlatformTransactionManager抽象类,实现了PlatformTransactionManager接口的方法并定义了一些抽象方法,供子类拓展。最下面一层是2个经典事务管理器:

    1.DataSourceTransactionmanager: 即本地单资源事务管理器。

    2.JtaTransactionManager: 即多资源事务管理器(又叫做分布式事务管理器),其实现了JTA规范,使用XA协议进行两阶段提交。

    我们这里自然是JTA分布式环境,我们只需要从JtaTransactionManager这个实现类入手即可。

    3.1 getTransaction获取事务

    AbstractPlatformTransactionManager实现了getTransaction()方法如下:

    复制代码
     1 @Override
     2     public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
     3         Object transaction = doGetTransaction();
     4 
     5         // Cache debug flag to avoid repeated checks.
     6         boolean debugEnabled = logger.isDebugEnabled();
     7 
     8         if (definition == null) {
     9             // Use defaults if no transaction definition given.
    10             definition = new DefaultTransactionDefinition();
    11         }
    12       // 如果当前已经存在事务
    13         if (isExistingTransaction(transaction)) {
    14             // 根据不同传播机制不同处理
    15             return handleExistingTransaction(definition, transaction, debugEnabled);
    16         }
    17 
    18         // 超时不能小于默认值
    19         if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
    20             throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
    21         }
    22 
    23         // 当前不存在事务,传播机制=MANDATORY(支持当前事务,没事务报错),报错
    24         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
    25             throw new IllegalTransactionStateException(
    26                     "No existing transaction found for transaction marked with propagation 'mandatory'");
    27         }// 当前不存在事务,传播机制=REQUIRED/REQUIRED_NEW/NESTED,这三种情况,需要新开启事务,且加上事务同步
    28         else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
    29                 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
    30                 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    31             SuspendedResourcesHolder suspendedResources = suspend(null);
    32             if (debugEnabled) {
    33                 logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
    34             }
    35             try {// 是否需要新开启同步// 开启// 开启
    36                 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    37                 DefaultTransactionStatus status = newTransactionStatus(
    38                         definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    39                 doBegin(transaction, definition);// 开启新事务
    40                 prepareSynchronization(status, definition);//预备同步
    41                 return status;
    42             }
    43             catch (RuntimeException ex) {
    44                 resume(null, suspendedResources);
    45                 throw ex;
    46             }
    47             catch (Error err) {
    48                 resume(null, suspendedResources);
    49                 throw err;
    50             }
    51         }
    52         else {
    53             // 当前不存在事务当前不存在事务,且传播机制=PROPAGATION_SUPPORTS/PROPAGATION_NOT_SUPPORTED/PROPAGATION_NEVER,这三种情况,创建“空”事务:没有实际事务,但可能是同步。警告:定义了隔离级别,但并没有真实的事务初始化,隔离级别被忽略有隔离级别但是并没有定义实际的事务初始化,有隔离级别但是并没有定义实际的事务初始化,
    54             if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
    55                 logger.warn("Custom isolation level specified but no actual transaction initiated; " +
    56                         "isolation level will effectively be ignored: " + definition);
    57             }
    58             boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    59             return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
    60         }
    61     }
    复制代码

    上图核心步骤就是:

    • 1.doGetTransaction():获取事务
    • 2.doBegin:准备工作

    3.1.1 JtaTransactionManager的doGetTransaction()

    其实也就是把UserTransaction封装成一个JtaTransactionObject返回。

    复制代码
     1     @Override
     2     protected Object doGetTransaction() {
     3         UserTransaction ut = getUserTransaction();
     4         if (ut == null) {
     5             throw new CannotCreateTransactionException("No JTA UserTransaction available - " +
     6                     "programmatic PlatformTransactionManager.getTransaction usage not supported");
     7         }
     8         if (!this.cacheUserTransaction) {
     9             ut = lookupUserTransaction(
    10                     this.userTransactionName != null ? this.userTransactionName : DEFAULT_USER_TRANSACTION_NAME);
    11         }
    12         return doGetJtaTransaction(ut);
    13     }
    14 
    15     /**
    16      * Get a JTA transaction object for the given current UserTransaction.
    17      * <p>Subclasses can override this to provide a JtaTransactionObject
    18      * subclass, for example holding some additional JTA handle needed.
    19      * @param ut the UserTransaction handle to use for the current transaction
    20      * @return the JtaTransactionObject holding the UserTransaction
    21      */
    22     protected JtaTransactionObject doGetJtaTransaction(UserTransaction ut) {
    23         return new JtaTransactionObject(ut);
    24     }
    复制代码

    3.1.2 JtaTransactionManager.doBegin

    复制代码
     1 @Override
     2     protected void doBegin(Object transaction, TransactionDefinition definition) {
     3         JtaTransactionObject txObject = (JtaTransactionObject) transaction;
     4         try {
     5             doJtaBegin(txObject, definition);
     6         }
     7         catch (NotSupportedException ex) {
     8             // assume nested transaction not supported
     9             throw new NestedTransactionNotSupportedException(
    10                     "JTA implementation does not support nested transactions", ex);
    11         }
    12         catch (UnsupportedOperationException ex) {
    13             // assume nested transaction not supported
    14             throw new NestedTransactionNotSupportedException(
    15                     "JTA implementation does not support nested transactions", ex);
    16         }
    17         catch (SystemException ex) {
    18             throw new CannotCreateTransactionException("JTA failure on begin", ex);
    19         }
    20     }
    复制代码

    调用JtaTransactionManager.doJtaBegin:

    复制代码
    1 protected void doJtaBegin(JtaTransactionObject txObject, TransactionDefinition definition)
    2             throws NotSupportedException, SystemException {
    3         
    4         applyIsolationLevel(txObject, definition.getIsolationLevel());
    5         int timeout = determineTimeout(definition);
    6         applyTimeout(txObject, timeout);
    7         txObject.getUserTransaction().begin();
    8     }
    复制代码

    UserTransactionImp.begin->TransactionManagerImp.begin

    复制代码
     1 public void begin ( int timeout ) throws NotSupportedException,
     2             SystemException
     3     {
     4         CompositeTransaction ct = null;
     5         ResumePreviousTransactionSubTxAwareParticipant resumeParticipant = null;
     6         
     7         ct = compositeTransactionManager.getCompositeTransaction();
     8         if ( ct != null && ct.getProperty (  JTA_PROPERTY_NAME ) == null ) {
     9             LOGGER.logWarning ( "JTA: temporarily suspending incompatible transaction: " + ct.getTid() +
    10                     " (will be resumed after JTA transaction ends)" );
    11             ct = compositeTransactionManager.suspend();
    12             resumeParticipant = new ResumePreviousTransactionSubTxAwareParticipant ( ct );
    13         }
    14         
    15         try {
    16             ct = compositeTransactionManager.createCompositeTransaction ( ( ( long ) timeout ) * 1000 );
    17             if ( resumeParticipant != null ) ct.addSubTxAwareParticipant ( resumeParticipant );
    18             if ( ct.isRoot () && getDefaultSerial () )
    19                 ct.getTransactionControl ().setSerial ();
    20             ct.setProperty ( JTA_PROPERTY_NAME , "true" );
    21         } catch ( SysException se ) {
    22             String msg = "Error in begin()";
    23             LOGGER.logWarning( msg , se );
    24             throw new ExtendedSystemException ( msg , se
    25                     .getErrors () );
    26         }
    27         recreateCompositeTransactionAsJtaTransaction(ct);
    28     }
    复制代码

    createCompositeTransaction创建混合事务

    复制代码
     1 public CompositeTransaction createCompositeTransaction ( long timeout ) throws SysException
     2     {
     3         Stack errors = new Stack();
     4         CompositeTransaction ct = null , ret = null;
     5         // 获取当前线程绑定的事务
     6         ct = getCurrentTx ();
           // 当前线程不存在事务 7 if ( ct == null ) {
    // 创建组合事务 8 ret = service_.createCompositeTransaction ( timeout ); 9 if(LOGGER.isInfoEnabled()){ 10 LOGGER.logInfo("createCompositeTransaction ( " + timeout + " ): " 11 + "created new ROOT transaction with id " + ret.getTid ()); 12 }
            // 当前线程存在事务 13 } else { 14 if(LOGGER.isInfoEnabled()) LOGGER.logInfo("createCompositeTransaction ( " + timeout + " )");
              // 创建子事务 15 ret = ct.getTransactionControl ().createSubTransaction (); 16 17 } 18 Thread thread = Thread.currentThread ();
           // 绑定当前线程和事务的2个映射map 19 setThreadMappings ( ret, thread ); 20 21 return ret; 22 }
    复制代码

    如果当前线程不存在事务,创建组合事务。如果当前线程存在事务,创建子事务。

    调用TransactionServiceImp的createCompositeTransaction创建混合事务

    复制代码
     1 public CompositeTransaction createCompositeTransaction ( long timeout ) throws SysException
     2     {
     3         if ( !initialized_ ) throw new IllegalStateException ( "Not initialized" );
     4 
     5         if ( maxNumberOfActiveTransactions_ >= 0 && 
     6              tidToTransactionMap_.size () >= maxNumberOfActiveTransactions_ ) {
     7             throw new IllegalStateException ( "Max number of active transactions reached:" + maxNumberOfActiveTransactions_ );
     8         }
     9         
    10         String tid = tidmgr_.get ();
    11         Stack lineage = new Stack ();
    12         //创建协调者
    15         CoordinatorImp cc = createCC ( null, tid, true, false, timeout );
           // 创建组合事务 16 CompositeTransaction ct = createCT ( tid, cc, lineage, false ); 17 return ct; 18 }
    复制代码

    3.2 commit 提交事务

    事务提交流程图如下:

     

    AbstractPlatformTransactionManager的commit源码如下:

    复制代码
    复制代码
     1 @Override
     2     public final void commit(TransactionStatus status) throws TransactionException {
     3         if (status.isCompleted()) {// 如果事务已完结,报错无法再次提交
     4             throw new IllegalTransactionStateException(
     5                     "Transaction is already completed - do not call commit or rollback more than once per transaction");
     6         }
     7 
     8         DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
     9         if (defStatus.isLocalRollbackOnly()) {// 如果事务明确标记为回滚,
    10             if (defStatus.isDebug()) {
    11                 logger.debug("Transactional code has requested rollback");
    12             }
    13             processRollback(defStatus);//执行回滚
    14             return;
    15         }//如果不需要全局回滚时提交 且 全局回滚
    16         if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
    17             if (defStatus.isDebug()) {
    18                 logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
    19             }//执行回滚
    20             processRollback(defStatus);
    21             // 仅在最外层事务边界(新事务)或显式地请求时抛出“未期望的回滚异常”
    23             if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
    24                 throw new UnexpectedRollbackException(
    25                         "Transaction rolled back because it has been marked as rollback-only");
    26             }
    27             return;
    28         }
    29      // 执行提交事务
    30         processCommit(defStatus);
    31     }
    复制代码
    复制代码

    如上图,各种判断:

    • 1.如果事务明确标记为本地回滚,-》执行回滚
    • 2.如果不需要全局回滚时提交 且 全局回滚-》执行回滚
    • 3.提交事务,核心方法processCommit()

    processCommit如下:

    复制代码
    复制代码
     1 private void processCommit(DefaultTransactionStatus status) throws TransactionException {
     2         try {
     3             boolean beforeCompletionInvoked = false;
     4             try {//3个前置操作
     5                 prepareForCommit(status);
     6                 triggerBeforeCommit(status);
     7                 triggerBeforeCompletion(status);
     8                 beforeCompletionInvoked = true;//3个前置操作已调用
     9                 boolean globalRollbackOnly = false;//新事务 或 全局回滚失败
    10                 if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
    11                     globalRollbackOnly = status.isGlobalRollbackOnly();
    12                 }//1.有保存点,即嵌套事务
    13                 if (status.hasSavepoint()) {
    14                     if (status.isDebug()) {
    15                         logger.debug("Releasing transaction savepoint");
    16                     }//释放保存点
    17                     status.releaseHeldSavepoint();
    18                 }//2.新事务
    19                 else if (status.isNewTransaction()) {
    20                     if (status.isDebug()) {
    21                         logger.debug("Initiating transaction commit");
    22                     }//调用事务处理器提交事务
    23                     doCommit(status);
    24                 }
    25                 // 3.非新事务,且全局回滚失败,但是提交时没有得到异常,抛出异常
    27                 if (globalRollbackOnly) {
    28                     throw new UnexpectedRollbackException(
    29                             "Transaction silently rolled back because it has been marked as rollback-only");
    30                 }
    31             }
    32             catch (UnexpectedRollbackException ex) {
    33                 // 触发完成后事务同步,状态为回滚
    34                 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
    35                 throw ex;
    36             }// 事务异常
    37             catch (TransactionException ex) {
    38                 // 提交失败回滚
    39                 if (isRollbackOnCommitFailure()) {
    40                     doRollbackOnCommitException(status, ex);
    41                 }// 触发完成后回调,事务同步状态为未知
    42                 else {
    43                     triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
    44                 }
    45                 throw ex;
    46             }// 运行时异常
    47             catch (RuntimeException ex) {
                // 如果3个前置步骤未完成,调用前置的最后一步操作 48 if (!beforeCompletionInvoked) { 49 triggerBeforeCompletion(status); 50 }// 提交异常回滚 51 doRollbackOnCommitException(status, ex); 52 throw ex; 53 }// 其它异常 54 catch (Error err) {  
                // 如果3个前置步骤未完成,调用前置的最后一步操作 55 if (!beforeCompletionInvoked) { 56 triggerBeforeCompletion(status); 57 }// 提交异常回滚 58 doRollbackOnCommitException(status, err); 59 throw err; 60 } 61 62 // Trigger afterCommit callbacks, with an exception thrown there 63 // propagated to callers but the transaction still considered as committed. 64 try { 65 triggerAfterCommit(status); 66 } 67 finally { 68 triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); 69 } 70 71 } 72 finally { 73 cleanupAfterCompletion(status); 74 } 75 }
    复制代码
    复制代码

    如上图,commit事务时,有6个核心操作,分别是3个前置操作,3个后置操作,如下:

    1.prepareForCommit(status);源码是空的,没有拓展目前。

    2.triggerBeforeCommit(status); 提交前触发操作

    复制代码
    复制代码
    1 protected final void triggerBeforeCommit(DefaultTransactionStatus status) {
    2         if (status.isNewSynchronization()) {
    3             if (status.isDebug()) {
    4                 logger.trace("Triggering beforeCommit synchronization");
    5             }
    6             TransactionSynchronizationUtils.triggerBeforeCommit(status.isReadOnly());
    7         }
    8     }
    复制代码
    复制代码

    triggerBeforeCommit源码如下:

    1 public static void triggerBeforeCommit(boolean readOnly) {
    2         for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {
    3             synchronization.beforeCommit(readOnly);
    4         }
    5     }

     如上图,TransactionSynchronizationManager类定义了多个ThreadLocal(线程本地变量),其中一个用以保存当前线程的事务同步:

    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations");

    遍历事务同步器,把每个事务同步器都执行“提交前”操作,比如咱们用的jdbc事务,那么最终就是SqlSessionUtils.beforeCommit()->this.holder.getSqlSession().commit();提交会话。

    3.triggerBeforeCompletion(status);完成前触发操作,如果是jdbc事务,那么最终就是

    SqlSessionUtils.beforeCompletion->

    TransactionSynchronizationManager.unbindResource(sessionFactory); 解绑当前线程的会话工厂

    this.holder.getSqlSession().close();关闭会话。

    4.triggerAfterCommit(status);提交事务后触发操作。TransactionSynchronizationUtils.triggerAfterCommit();->TransactionSynchronizationUtils.invokeAfterCommit,如下:

    复制代码
    复制代码
    1 public static void invokeAfterCommit(List<TransactionSynchronization> synchronizations) {
    2         if (synchronizations != null) {
    3             for (TransactionSynchronization synchronization : synchronizations) {
    4                 synchronization.afterCommit();
    5             }
    6         }
    7     }
    复制代码
    复制代码

    好吧,一顿找,最后在TransactionSynchronizationAdapter中复写过,并且是空的....SqlSessionSynchronization继承了TransactionSynchronizationAdapter但是没有复写这个方法。

    5. triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);

    TransactionSynchronizationUtils.TransactionSynchronizationUtils.invokeAfterCompletion,如下:

    复制代码
    复制代码
     1 public static void invokeAfterCompletion(List<TransactionSynchronization> synchronizations, int completionStatus) {
     2         if (synchronizations != null) {
     3             for (TransactionSynchronization synchronization : synchronizations) {
     4                 try {
     5                     synchronization.afterCompletion(completionStatus);
     6                 }
     7                 catch (Throwable tsex) {
     8                     logger.error("TransactionSynchronization.afterCompletion threw exception", tsex);
     9                 }
    10             }
    11         }
    12     }
    复制代码
    复制代码

    afterCompletion:对于JDBC事务来说,最终:

    1)如果会话任然活着,关闭会话,

    2)重置各种属性:SQL会话同步器(SqlSessionSynchronization)的SQL会话持有者(SqlSessionHolder)的referenceCount引用计数、synchronizedWithTransaction同步事务、rollbackOnly只回滚、deadline超时时间点。

    6.cleanupAfterCompletion(status);

    1)设置事务状态为已完成。

    2)  如果是新的事务同步,解绑当前线程绑定的数据库资源,重置数据库连接

    3)如果存在挂起的事务(嵌套事务),唤醒挂起的老事务的各种资源:数据库资源、同步器。

    复制代码
    复制代码
     1     private void cleanupAfterCompletion(DefaultTransactionStatus status) {
     2         status.setCompleted();//设置事务状态完成
           //如果是新的同步,清空当前线程绑定的除了资源外的全部线程本地变量:包括事务同步器、事务名称、只读属性、隔离级别、真实的事务激活状态 3 if (status.isNewSynchronization()) { 4 TransactionSynchronizationManager.clear(); 5 }//如果是新的事务同步 6 if (status.isNewTransaction()) { 7 doCleanupAfterCompletion(status.getTransaction()); 8 }//如果存在挂起的资源 9 if (status.getSuspendedResources() != null) { 10 if (status.isDebug()) { 11 logger.debug("Resuming suspended transaction after completion of inner transaction"); 12 }//唤醒挂起的事务和资源(重新绑定之前挂起的数据库资源,唤醒同步器,注册同步器到TransactionSynchronizationManager) 13 resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources()); 14 } 15 }
    复制代码
    复制代码

    对于DataSourceTransactionManager,doCleanupAfterCompletion源码如下:

    复制代码
    复制代码
     1     protected void doCleanupAfterCompletion(Object transaction) {
     2         DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
     3 
     4         // 如果是最新的连接持有者,解绑当前线程绑定的<数据库资源,ConnectionHolder>
     5         if (txObject.isNewConnectionHolder()) {
     6             TransactionSynchronizationManager.unbindResource(this.dataSource);
     7         }
     8 
     9         // 重置数据库连接(隔离级别、只读)
    10         Connection con = txObject.getConnectionHolder().getConnection();
    11         try {
    12             if (txObject.isMustRestoreAutoCommit()) {
    13                 con.setAutoCommit(true);
    14             }
    15             DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
    16         }
    17         catch (Throwable ex) {
    18             logger.debug("Could not reset JDBC Connection after transaction", ex);
    19         }
    20 
    21         if (txObject.isNewConnectionHolder()) {
    22             if (logger.isDebugEnabled()) {
    23                 logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
    24             }// 资源引用计数-1,关闭数据库连接
    25             DataSourceUtils.releaseConnection(con, this.dataSource);
    26         }
    27         // 重置连接持有者的全部属性
    28         txObject.getConnectionHolder().clear();
    29     }
    复制代码
    复制代码

    上面这6个方法是AbstractPlatformTransactionManager做的事,本地事务和分布式事务都会执行。

    doCommit就是调用事务管理器来实现事务提交。分布式事务环境下调用的是:JtaTransactionManager.doCommit()。

    3.2.1 JtaTransactionManager.doCommit

    复制代码
     1 @Override
     2     protected void doCommit(DefaultTransactionStatus status) {
     3         JtaTransactionObject txObject = (JtaTransactionObject) status.getTransaction();
     4         try {
     5             int jtaStatus = txObject.getUserTransaction().getStatus();
     6             if (jtaStatus == Status.STATUS_NO_TRANSACTION) {
     7                 // 事务状态=已完结,非事务状态。一般不会触发
    10                 throw new UnexpectedRollbackException("JTA transaction already completed - probably rolled back");
    11             }
    12             if (jtaStatus == Status.STATUS_ROLLEDBACK) {
    13                 // 回滚
    16                 try {
    17                     txObject.getUserTransaction().rollback();
    18                 }
    19                 catch (IllegalStateException ex) {
    20                     if (logger.isDebugEnabled()) {
    21                         logger.debug("Rollback failure with transaction already marked as rolled back: " + ex);
    22                     }
    23                 }
    24                 throw new UnexpectedRollbackException("JTA transaction already rolled back (probably due to a timeout)");
    25             }//核心操作:提交事务
    26             txObject.getUserTransaction().commit();
    27         }
    28         catch (RollbackException ex) {
    29             throw new UnexpectedRollbackException(
    30                     "JTA transaction unexpectedly rolled back (maybe due to a timeout)", ex);
    31         }
    32         catch (HeuristicMixedException ex) {
    33             throw new HeuristicCompletionException(HeuristicCompletionException.STATE_MIXED, ex);
    34         }
    35         catch (HeuristicRollbackException ex) {
    36             throw new HeuristicCompletionException(HeuristicCompletionException.STATE_ROLLED_BACK, ex);
    37         }
    38         catch (IllegalStateException ex) {
    39             throw new TransactionSystemException("Unexpected internal transaction state", ex);
    40         }
    41         catch (SystemException ex) {
    42             throw new TransactionSystemException("JTA failure on commit", ex);
    43         }
    44     }
    复制代码

    txObject.getUserTransaction().commit()-->调用UserTransactionImp的commit()

    复制代码
    1 public void commit () throws javax.transaction.RollbackException,
    2             javax.transaction.HeuristicMixedException,
    3             javax.transaction.HeuristicRollbackException,
    4             javax.transaction.SystemException, java.lang.IllegalStateException,
    5             java.lang.SecurityException
    6     {
    7         checkSetup ();
    8         txmgr_.commit ();
    9     }
    复制代码

    TransactionManager的commit

    复制代码
     1 public void commit () throws javax.transaction.RollbackException,
     2             javax.transaction.HeuristicMixedException,
     3             javax.transaction.HeuristicRollbackException,
     4             javax.transaction.SystemException, java.lang.IllegalStateException,
     5             java.lang.SecurityException
     6     {
     7         Transaction tx = getTransaction();
     8         if ( tx == null ) raiseNoTransaction();
     9         tx.commit();
    10     }
    复制代码

    最终调用的是TransactionImp的commit()

    复制代码
     1 public void commit() throws javax.transaction.RollbackException,
     2             javax.transaction.HeuristicMixedException,
     3             javax.transaction.HeuristicRollbackException,
     4             javax.transaction.SystemException, java.lang.SecurityException
     5     {
     6         try {
     7             ct_.commit();
     8         } catch ( HeurHazardException hh ) {
     9             rethrowAsJtaHeuristicMixedException ( hh.getMessage () , hh );
    10         } catch ( HeurRollbackException hr ) {
    11             rethrowAsJtaHeuristicRollbackException ( hr.getMessage () , hr );
    12         } catch ( HeurMixedException hm ) {
    13             rethrowAsJtaHeuristicMixedException ( hm.getMessage () , hm );
    14         } catch ( SysException se ) {
    15             LOGGER.logWarning ( se.getMessage() , se );
    16             throw new ExtendedSystemException ( se.getMessage (), se
    17                     .getErrors () );
    18         } catch ( com.atomikos.icatch.RollbackException rb ) {
    19             //see case 29708: all statements have been closed
    20             String msg = rb.getMessage ();
    21             Throwable cause = rb.getCause();
    22             if (cause == null) cause = rb;
    23             rethrowAsJtaRollbackException (msg , cause);        
    24         }
    25     }
    复制代码

    这里就是调用CompositeTransaction接口的实现类CompositeTransactionImp的commit()

    1 public void commit () throws HeurRollbackException, HeurMixedException,
    2             HeurHazardException, SysException, SecurityException,
    3             RollbackException
    4     {
    5         getTerminator().commit();
    6     }

    调用“组合命令行实现类”CompositeTerminatorImp的commit()

    复制代码
     1 public void commit () throws HeurRollbackException, HeurMixedException,
     2             HeurHazardException, SysException, java.lang.SecurityException,
     3             RollbackException
     4     {
     5         Stack errors = new Stack ();
     6         // 1.标记事务状态
     7         transaction_.doCommit ();
     8         setSiblingInfoForIncoming1pcRequestFromRemoteClient();
     9         
    10         if ( transaction_.isRoot () ) {
    11             try {//2.提交事务,核心操作
    12                 coordinator_.terminate ( true );
    13             }
    14 
    15             catch ( RollbackException rb ) {
    16                 throw rb;
    17             } catch ( HeurHazardException hh ) {
    18                 throw hh;
    19             } catch ( HeurRollbackException hr ) {
    20                 throw hr;
    21             } catch ( HeurMixedException hm ) {
    22                 throw hm;
    23             } catch ( SysException se ) {
    24                 throw se;
    25             } catch ( Exception e ) {
    26                 errors.push ( e );
    27                 throw new SysException (
    28                         "Unexpected error: " + e.getMessage (), errors );
    29             }
    30         }
    31 
    32     }
    复制代码

    如上,1.调用“组合事务实现类”CompositeTransactionImp的doCommit()这里只做标记为非活动,没有提交事务。

    2.调用CoordinatorImp的terminate()终结事务。

    复制代码
     1 protected void terminate ( boolean commit ) throws HeurRollbackException,
     2             HeurMixedException, SysException, java.lang.SecurityException,
     3             HeurCommitException, HeurHazardException, RollbackException,
     4             IllegalStateException
     5 
     6     {    
     7         synchronized ( fsm_ ) {
     8             if ( commit ) {// 如果只有一个参与者,直接一阶段提交
     9                 if ( participants_.size () <= 1 ) {
    10                     commit ( true );
    11                 } else {//二阶段提交:prepare阶段
    12                     int prepareResult = prepare ();
    13                     // 二阶段提交:commit阶段。非只读事务,才需要提交事务,
    14                     if ( prepareResult != Participant.READ_ONLY )
    15                         commit ( false );
    16                 }
    17             } else {
    18                 rollback ();
    19             }
    20         }
    21     }
    复制代码

    如上图是源码的优化精华:

    1.根据参与者判断,如果只有一个参与者,直接优化成一阶段提交。

    2.prepare完后,commit阶段如果是只读事务,不用commit。

    咱们是分布式事务,插入2个库,肯定是两阶段提交。

    CoordinatorImp的commit

    复制代码
     1 public HeuristicMessage[] commit ( boolean onePhase )
     2             throws HeurRollbackException, HeurMixedException,
     3             HeurHazardException, java.lang.IllegalStateException,
     4             RollbackException, SysException
     5     {
     6         HeuristicMessage[] ret = null;
     7         synchronized ( fsm_ ) {
     8             ret = stateHandler_.commit(onePhase);
     9         }
    10         return ret;
    11     }
    复制代码

     追踪到IndoubtStateHandler的commit,这个操作加了同步锁。具体实现如下:

    复制代码
     1 protected HeuristicMessage[] commit ( boolean onePhase )
     2             throws HeurRollbackException, HeurMixedException,
     3             HeurHazardException, java.lang.IllegalStateException,
     4             RollbackException, SysException
     5     {
     6 
     7         return commitWithAfterCompletionNotification ( new CommitCallback() {
     8             public HeuristicMessage[] doCommit()
     9                     throws HeurRollbackException, HeurMixedException,
    10                     HeurHazardException, IllegalStateException,
    11                     RollbackException, SysException {
    12                 return commitFromWithinCallback ( false, false );
    13             }              
    14         });
    15 
    16     }
    复制代码

     如上,核心接口就是CommitCallback的doCommit方法,方法体就是commitFromWithinCallback。

    复制代码
      1 protected HeuristicMessage[] commitFromWithinCallback ( boolean heuristic ,
      2             boolean onePhase ) throws HeurRollbackException,
      3             HeurMixedException, HeurHazardException,
      4             java.lang.IllegalStateException, RollbackException, SysException
      5     {
      6         Stack<Exception> errors = new Stack<Exception> ();
      7         CoordinatorStateHandler nextStateHandler = null;
      8 
      9         try {
     10 
     11             Vector<Participant> participants = coordinator_.getParticipants();
     12             int count = (participants.size () - readOnlyTable_.size ());
     13             TerminationResult commitresult = new TerminationResult ( count );
     14 
     15             // cf bug 64546: avoid committed_ being null upon recovery!
     16             committed_ = new Boolean ( true );
     17             // for replaying completion: commit decision was reached
     18             // otherwise, replay requests might only see TERMINATED!
     19 
     20             try {
     21                 coordinator_.setState ( TxState.COMMITTING );
     22             } catch ( RuntimeException error ) {
     23                 //See case 23334
     24                 String msg = "Error in committing: " + error.getMessage() + " - rolling back instead";
     25                 LOGGER.logWarning ( msg , error );
     26                 try {
     27                     rollbackFromWithinCallback(getCoordinator().isRecoverableWhileActive().booleanValue(),false);
     28                     throw new RollbackException ( msg , error );
     29                 } catch ( HeurCommitException e ) {
     30                     LOGGER.logWarning ( "Illegal heuristic commit during rollback:" + e );
     31                     throw new HeurMixedException ( e.getHeuristicMessages() );
     32                 }
     33             }
     34 
     35 
     36             // start messages
     37             Enumeration<Participant> enumm = participants.elements ();
     38             while ( enumm.hasMoreElements () ) {
     39                 Participant p = enumm.nextElement ();
     40                 if ( !readOnlyTable_.containsKey ( p ) ) {
     41                     CommitMessage cm = new CommitMessage ( p, commitresult,
     42                             onePhase );
     43 
     44                     // if onephase: set cascadelist anyway, because if the
     45                     // participant is a REMOTE one, then it might have
     46                     // multiple participants that are not visible here!
     47 
     48                     if ( onePhase && cascadeList_ != null ) { // null for OTS
     49                         Integer sibnum = (Integer) cascadeList_.get ( p );
     50                         if ( sibnum != null ) // null for local participant!
     51                             p.setGlobalSiblingCount ( sibnum.intValue () );
     52                         p.setCascadeList ( cascadeList_ );
     53                     }
     54                     propagator_.submitPropagationMessage ( cm );
     55                 }
     56             } // while
     57 
     58             commitresult.waitForReplies ();
     59             int res = commitresult.getResult ();
     60 
     61             if ( res != TerminationResult.ALL_OK ) {
     62 
     63                 if ( res == TerminationResult.HEUR_MIXED ) {
     64                     Hashtable<Participant,TxState> hazards = commitresult.getPossiblyIndoubts ();
     65                     Hashtable heuristics = commitresult
     66                             .getHeuristicParticipants ();
     67                     addToHeuristicMap ( heuristics );
     68                     enumm = participants.elements ();
     69                     while ( enumm.hasMoreElements () ) {
     70                         Participant p = (Participant) enumm.nextElement ();
     71                         if ( !heuristics.containsKey ( p ) )
     72                             addToHeuristicMap ( p, TxState.TERMINATED );
     73                     }
     74                     nextStateHandler = new HeurMixedStateHandler ( this,
     75                             hazards );
     76 
     77                     coordinator_.setStateHandler ( nextStateHandler );
     78                     throw new HeurMixedException ( getHeuristicMessages () );
     79                 }
     80 
     81                 else if ( res == TerminationResult.ROLLBACK ) {
     82                     // 1PC and rolled back before commit arrived.
     83                     nextStateHandler = new TerminatedStateHandler ( this );
     84                     coordinator_.setStateHandler ( nextStateHandler );
     85                     throw new RollbackException ( "Rolled back already." );
     86                 } else if ( res == TerminationResult.HEUR_ROLLBACK ) {
     87                     nextStateHandler = new HeurAbortedStateHandler ( this );
     88                     coordinator_.setStateHandler ( nextStateHandler );
     89                     // Here, we do NOT need to add extra information, since ALL
     90                     // participants agreed to rollback. 
     91                     // Therefore, we need not worry about who aborted and who committed.
     92                     throw new HeurRollbackException ( getHeuristicMessages () );
     93 
     94                 }
     95 
     96                 else if ( res == TerminationResult.HEUR_HAZARD ) {
     97                     Hashtable hazards = commitresult.getPossiblyIndoubts ();
     98                     Hashtable heuristics = commitresult
     99                             .getHeuristicParticipants ();
    100                     addToHeuristicMap ( heuristics );
    101                     enumm = participants.elements ();
    102                     while ( enumm.hasMoreElements () ) {
    103                         Participant p = (Participant) enumm.nextElement ();
    104                         if ( !heuristics.containsKey ( p ) )
    105                             addToHeuristicMap ( p, TxState.TERMINATED );
    106                     }
    107                     nextStateHandler = new HeurHazardStateHandler ( this,
    108                             hazards );
    109                     coordinator_.setStateHandler ( nextStateHandler );
    110                     throw new HeurHazardException ( getHeuristicMessages () );
    111                 }
    112 
    113             } else {
    114                 // all OK
    115                 if ( heuristic ) {
    116                     nextStateHandler = new HeurCommittedStateHandler ( this );
    117                     // again, here we do NOT need to preserve extra per-participant
    118                     // state mappings, since ALL participants were heur. committed.
    119                 } else
    120                     nextStateHandler = new TerminatedStateHandler ( this );
    121 
    122                 coordinator_.setStateHandler ( nextStateHandler );
    123             }
    124         } catch ( RuntimeException runerr ) {
    125             errors.push ( runerr );
    126             throw new SysException ( "Error in commit: " + runerr.getMessage (), errors );
    127         }
    128 
    129         catch ( InterruptedException intr ) {
    130             // cf bug 67457
    131             InterruptedExceptionHelper.handleInterruptedException ( intr );
    132             errors.push ( intr );
    133             throw new SysException ( "Error in commit" + intr.getMessage (), errors );
    134         }
    135 
    136         return getHeuristicMessages ();
    137 
    138     }
    复制代码

    如上,构造了一个CommitMessage,调用传播者Propagator的submitPropagationMessage()提交传播消息。-》CommitMessage的send()方法-》Participant的commit().

     -》XAResourceTransaction的commit提交XA资源-》XAResource的commit ( xid_, onePhase );饶了一大圈终于到了最最核心的代码了....我们这里XAResource接口的实现类是MysqlXAConnection。

    复制代码
     1 public void commit(Xid xid, boolean onePhase) throws XAException {
     2         StringBuilder commandBuf = new StringBuilder(MAX_COMMAND_LENGTH);
     3         commandBuf.append("XA COMMIT ");
     4         appendXid(commandBuf, xid);
     5 
     6         if (onePhase) {
     7             commandBuf.append(" ONE PHASE");
     8         }
     9 
    10         try {
    11             dispatchCommand(commandBuf.toString());
    12         } finally {
    13             this.underlyingConnection.setInGlobalTx(false);
    14         }
    15     }
    复制代码

    dispatchCommand调度命令如下:

    复制代码
     1 private ResultSet dispatchCommand(String command) throws XAException {
     2         Statement stmt = null;
     3 
     4         try {
     5             if (this.logXaCommands) {
     6                 this.log.logDebug("Executing XA statement: " + command);
     7             }
     8 
     9             // TODO: Cache this for lifetime of XAConnection这里还规划要做缓存 - -!
    10             stmt = this.underlyingConnection.createStatement();
    11        // 核心执行事务提交
    12             stmt.execute(command);
    13 
    14             ResultSet rs = stmt.getResultSet();
    15 
    16             return rs;
    17         } catch (SQLException sqlEx) {
    18             throw mapXAExceptionFromSQLException(sqlEx);
    19         } finally {
    20             if (stmt != null) {
    21                 try {
    22                     stmt.close();
    23                 } catch (SQLException sqlEx) {
    24                 }
    25             }
    26         }
    27     }
    复制代码

    如上就是一个经典的使用jdbc执行sql语句的过程:

    1.使用com.mysql.jdbc.Connection创建Statement。

    2.Statement执行sql命令.

    3.得到结果。

    3.3 rollback回滚事务

    AbstractPlatformTransactionManager中rollback源码如下:

    复制代码
    复制代码
    1     public final void rollback(TransactionStatus status) throws TransactionException {
    2         if (status.isCompleted()) {
    3             throw new IllegalTransactionStateException(
    4                     "Transaction is already completed - do not call commit or rollback more than once per transaction");
    5         }
    6 
    7         DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    8         processRollback(defStatus);
    9     }
    复制代码
    复制代码

     processRollback源码如下:

    复制代码
    复制代码
     1     private void processRollback(DefaultTransactionStatus status) {
     2         try {
     3             try {// 解绑当前线程绑定的会话工厂,并关闭会话
     4                 triggerBeforeCompletion(status);
     5                 if (status.hasSavepoint()) {// 1.如果有保存点,即嵌套式事务
     6                     if (status.isDebug()) {
     7                         logger.debug("Rolling back transaction to savepoint");
     8                     }//回滚到保存点
     9                     status.rollbackToHeldSavepoint();
    10                 }//2.如果就是一个简单事务
    11                 else if (status.isNewTransaction()) {
    12                     if (status.isDebug()) {
    13                         logger.debug("Initiating transaction rollback");
    14                     }//回滚核心方法
    15                     doRollback(status);
    16                 }//3.当前存在事务且没有保存点,即加入当前事务的
    17                 else if (status.hasTransaction()) {//如果已经标记为回滚 或 当加入事务失败时全局回滚(默认true)
    18                     if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
    19                         if (status.isDebug()) {//debug时会打印:加入事务失败-标记已存在事务为回滚
    20                             logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
    21                         }//设置当前connectionHolder:当加入一个已存在事务时回滚
    22                         doSetRollbackOnly(status);
    23                     }
    24                     else {
    25                         if (status.isDebug()) {
    26                             logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
    27                         }
    28                     }
    29                 }
    30                 else {
    31                     logger.debug("Should roll back transaction but cannot - no transaction available");
    32                 }
    33             }
    34             catch (RuntimeException ex) {//关闭会话,重置SqlSessionHolder属性
    35                 triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
    36                 throw ex;
    37             }
    38             catch (Error err) {
    39                 triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
    40                 throw err;
    41             }
    42             triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
    43         }
    44         finally {、、解绑当前线程
    45             cleanupAfterCompletion(status);
    46         }
    47     }
    复制代码
    复制代码

    核心操作是doRollback,分布式环境下调用的是JtaTransactionManager的doRollback。

    复制代码
     1 @Override
     2     protected void doRollback(DefaultTransactionStatus status) {
     3         JtaTransactionObject txObject = (JtaTransactionObject) status.getTransaction();
     4         try {
     5             int jtaStatus = txObject.getUserTransaction().getStatus();
     6             if (jtaStatus != Status.STATUS_NO_TRANSACTION) {
     7                 try {
     8                     txObject.getUserTransaction().rollback();
     9                 }
    10                 catch (IllegalStateException ex) {
    11                     if (jtaStatus == Status.STATUS_ROLLEDBACK) {
    12                         // Only really happens on JBoss 4.2 in case of an early timeout...
    13                         if (logger.isDebugEnabled()) {
    14                             logger.debug("Rollback failure with transaction already marked as rolled back: " + ex);
    15                         }
    16                     }
    17                     else {
    18                         throw new TransactionSystemException("Unexpected internal transaction state", ex);
    19                     }
    20                 }
    21             }
    22         }
    23         catch (SystemException ex) {
    24             throw new TransactionSystemException("JTA failure on rollback", ex);
    25         }
    26     }
    复制代码
    调用的UserTransactionImp的rollback
    1 public void rollback () throws IllegalStateException, SystemException,
    2             SecurityException
    3     {
    4         checkSetup ();
    5         txmgr_.rollback ();
    6     }
    TransactionManagerImp的rollback
    复制代码
    1 public void rollback () throws IllegalStateException, SystemException,
    2             SecurityException
    3     {
    4         Transaction tx = getTransaction();
    5         if ( tx == null ) raiseNoTransaction();
    6         tx.rollback();
    7        
    8     }
    复制代码
    TransactionImp的rollback
    复制代码
     1 public void rollback() throws IllegalStateException, SystemException
     2     {
     3         try {
     4             ct_.rollback();
     5         } catch ( SysException se ) {
     6             LOGGER.logWarning ( se.getMessage() , se );
     7             throw new ExtendedSystemException ( se.getMessage (), se
     8                     .getErrors () );
     9         }
    10 
    11     }
    复制代码
    CompositeTransactionImp的rollback
    1  public void rollback () throws IllegalStateException, SysException
    2     {
    3         getTerminator().rollback();
    4     }
    CompositeTerminatorImp的rollback
    复制代码
     1 public void rollback () throws IllegalStateException, SysException
     2     {
     3         Stack errors = new Stack ();
     4 
     5         transaction_.doRollback ();
     6 
     7         if ( transaction_.isRoot () )
     8             try {
     9                 coordinator_.terminate ( false );
    10             } catch ( Exception e ) {
    11                 errors.push ( e );
    12                 throw new SysException ( "Unexpected error in rollback: " + e.getMessage (), errors );
    13             }
    14     }
    复制代码

    一直追踪到

    RollbackMessage.send

    复制代码
     1 protected Object send () throws PropagationException
     2     {
     3         Participant part = getParticipant ();
     4         HeuristicMessage[] msgs = null;
     5         try {
     6             msgs = part.rollback ();
     7 
     8         } catch ( HeurCommitException heurc ) {
     9             throw new PropagationException ( heurc, false );
    10         } catch ( HeurMixedException heurm ) {
    11             throw new PropagationException ( heurm, false );
    12         }
    13 
    14         catch ( Exception e ) {
    15             // only retry if might be indoubt. Otherwise ignore.
    16             if ( indoubt_ ) {
    17                 // here, participant might be indoubt!
    18                 // fill in exact heuristic msgs by using buffered effect of proxies
    19                 HeurHazardException heurh = new HeurHazardException ( part.getHeuristicMessages () );
    20                 throw new PropagationException ( heurh, true );
    21             }
    22         }
    23         return msgs;
    24     }
    复制代码
    XAResourceTransaction.rollback->MysqlXAConnection.rollback
    复制代码
     1 public void rollback(Xid xid) throws XAException {
     2         StringBuilder commandBuf = new StringBuilder(MAX_COMMAND_LENGTH);
     3         commandBuf.append("XA ROLLBACK ");
     4         appendXid(commandBuf, xid);
     5 
     6         try {
     7             dispatchCommand(commandBuf.toString());
     8         } finally {
     9             this.underlyingConnection.setInGlobalTx(false);
    10         }
    11     }
    复制代码
    dispatchCommand执行sql
    复制代码
     1 private ResultSet dispatchCommand(String command) throws XAException {
     2         Statement stmt = null;
     3 
     4         try {
     5             if (this.logXaCommands) {
     6                 this.log.logDebug("Executing XA statement: " + command);
     7             }
     8 
     9             // TODO: Cache this for lifetime of XAConnection
    10             stmt = this.underlyingConnection.createStatement();
    11 
    12             stmt.execute(command);
    13 
    14             ResultSet rs = stmt.getResultSet();
    15 
    16             return rs;
    17         } catch (SQLException sqlEx) {
    18             throw mapXAExceptionFromSQLException(sqlEx);
    19         } finally {
    20             if (stmt != null) {
    21                 try {
    22                     stmt.close();
    23                 } catch (SQLException sqlEx) {
    24                 }
    25             }
    26         }
    27     }
    复制代码

    debug得到command:XA ROLLBACK 0x3139322e3136382e36302e31312e746d30303030313030303437,0x3139322e3136382e36302e31312e746d31,0x41544f4d
    至此,回滚完毕。

  • 相关阅读:
    【构建二叉树】01根据前序和中序序列构造二叉树【Construct Binary Tree from Preorder and Inorder Traversal】
    PHP 语言需要避免的 10 大误区
    极客编程必备的五大PHP开发应用
    你听说过PHP 的面向方面编程吗?
    8个开发必备的PHP功能
    写给系统管理员的25个PHP安全实践
    PHP输出缓冲控制- Output Control 函数应用详解
    创建高安全性PHP网站的几个实用要点
    简化PHP开发的10个工具
    PHP文件下载原理
  • 原文地址:https://www.cnblogs.com/shoshana-kong/p/14030125.html
Copyright © 2011-2022 走看看