zoukankan      html  css  js  c++  java
  • Spring源码解析--事务的实现原理和源码解析

    前言

    在上一篇文章 Spring源码解析--事务的详细讲解 中主要对Spring中事务的理论和实践作了详细整理,本文将透过现象看本质,从源码的角度入手分析下Spring中事务的实现原理及相关源码解析。

    一、事务的相关组件

    1.1、事务状态TransactionStatus

    TransactionStatus是表示事务状态的接口,继承之SavepointManager和Flushable接口,源码如下:

     1 public interface TransactionStatus extends SavepointManager, Flushable {
     2 
     3         /**
     4          * 判断事务是否是新事务
     5          */
     6         boolean isNewTransaction();
     7 
     8         /**
     9          * 判断事务是否含有保存点
    10          */
    11         boolean hasSavepoint();
    12 
    13         /**
    14          * 设置只可回滚
    15          * 如果设置了rollback-only为true,那么只要抛异常就肯定会回滚,即使将异常捕获了也不可以提交
    16          */
    17         void setRollbackOnly();
    18 
    19         /**
    20          * 是否标记为只可回滚
    21          * 如果设置了rollback-only为true,那么只要抛异常就肯定会回滚,即使将异常捕获了也不可以提交
    22          */
    23         boolean isRollbackOnly();
    24 
    25         /**
    26          * 刷新
    27          */
    28         @Override
    29         void flush();
    30 
    31         /**
    32          * 事务是否完成
    33          */
    34         boolean isCompleted();
    35 
    36     }

    另外由于TransactionStatus继承之SavepointManager接口,所以实现类同样需要实现SavepointManager接口定义的方法,定义方法如下:

     1 public interface SavepointManager {
     2         /**
     3          * 创建一个保存点
     4          */
     5         Object createSavepoint() throws TransactionException;
     6 
     7         /**
     8          * 回滚到保存点
     9          */
    10         void rollbackToSavepoint(Object savepoint) throws TransactionException;
    11 
    12         /**
    13          * 释放保存点
    14          */
    15         void releaseSavepoint(Object savepoint) throws TransactionException;
    16     }

    TransactionStatus接口的默认实现类为DefaultTransactionStatus类,构造方法及属性如下:

     1 /** 当前事务对象 */
     2         private final Object transaction;
     3 
     4         /** 是否是新事务*/
     5         private final boolean newTransaction;
     6 
     7         /** 是否同步*/
     8         private final boolean newSynchronization;
     9 
    10         /** 是否为只读事务*/
    11         private final boolean readOnly;
    12 
    13         private final boolean debug;
    14 
    15         private final Object suspendedResources;
    16 
    17 
    18         /**
    19          * 构造函数设置自身属性
    20          */
    21         public DefaultTransactionStatus(
    22                 Object transaction, boolean newTransaction, boolean newSynchronization,
    23                 boolean readOnly, boolean debug, Object suspendedResources) {
    24 
    25             this.transaction = transaction;
    26             this.newTransaction = newTransaction;
    27             this.newSynchronization = newSynchronization;
    28             this.readOnly = readOnly;
    29             this.debug = debug;
    30             this.suspendedResources = suspendedResources;
    31         }

    TransactionStatus持有一个事务的引用,并且包含当前事务的状态属性,所以每次创建一个事务时,都会被封装成一个TransactionStatus对象。而事务的创建则是通过事务管理器TransactionManager来管理的。

    1.2、事务管理器TransactionManager

    事务管理器是用于事务的管理,Spring中事务管理器通过接口PlatformTransactionManager定义,源码如下:

     1 public interface PlatformTransactionManager {
     2         /**
     3          * 获取事务状态
     4          */
     5         TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;
     6 
     7         /**
     8          * 提交事务
     9          */
    10         void commit(TransactionStatus status) throws TransactionException;
    11 
    12         /**
    13          * 回滚事务
    14          */
    15         void rollback(TransactionStatus status) throws TransactionException;
    16     }

    只定义了三个方法,分别是获取事务、提交事务和回滚事务,而这三个方法的实现通过AbstractPlatformTransactionManager抽象类来实现

    1.2.1、获取事务状态方法getTransaction源码解析

     1 /**
     2      * 获取当前事务状态
     3      * */
     4     @Override
     5     public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
     6 
     7         /**
     8          * 1.调用子类方法获取事务
     9          * */
    10         Object transaction = doGetTransaction();
    11 
    12         if (definition == null) {
    13             /** 如果没有传入参数,则创建默认事务 DefaultTransactionDefinition*/
    14             definition = new DefaultTransactionDefinition();
    15         }
    16 
    17         /**
    18          * 2.如果当前线程是否已经存在事务
    19          * */
    20         if (isExistingTransaction(transaction)) {
    21             return handleExistingTransaction(definition, transaction, debugEnabled);
    22         }
    23 
    24         /******** 下面的逻辑都是当前线程不存在事务的情况 *******/
    25 
    26         /**
    27          * 3.判断当前事务是否已经超时
    28          * */
    29         if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
    30             throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
    31         }
    32 
    33         /**
    34          *
    35          * 4.判断当前事务的传播机制,如果是MANDATORY类型且当前线程不存在事务,则抛异常处理
    36          */
    37         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
    38             throw new IllegalTransactionStateException(
    39                     "No existing transaction found for transaction marked with propagation 'mandatory'");
    40         }
    41         else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
    42                 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
    43                 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    44             SuspendedResourcesHolder suspendedResources = suspend(null);
    45             try {
    46                 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    47                 /**
    48                  * 5.创建一个新事务
    49                  * */
    50                 DefaultTransactionStatus status = newTransactionStatus(
    51                         definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    52                 /**
    53                  * 6.开始事务
    54                  * */
    55                 doBegin(transaction, definition);
    56                 /**
    57                  * 7.初始化和同步事务
    58                  * */
    59                 prepareSynchronization(status, definition);
    60                 return status;
    61             }
    62             catch (RuntimeException ex) {
    63                 resume(null, suspendedResources);
    64                 throw ex;
    65             }
    66             catch (Error err) {
    67                 resume(null, suspendedResources);
    68                 throw err;
    69             }
    70         }
    71         else {
    72             if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
    73                 logger.warn("Custom isolation level specified but no actual transaction initiated; " +
    74                         "isolation level will effectively be ignored: " + definition);
    75             }
    76             boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    77             return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
    78         }
    79     }

    1.2.2、提交事务方法commit源码解析

     1 @Override
     2     public final void commit(TransactionStatus status) throws TransactionException {
     3         /** 1.如果事务已经完成,抛异常*/
     4         if (status.isCompleted()) {
     5             throw new IllegalTransactionStateException(
     6                     "Transaction is already completed - do not call commit or rollback more than once per transaction");
     7         }
     8 
     9         /**2. 如果事务需要回滚则执行 processRollback进行回滚*/
    10         DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    11         if (defStatus.isLocalRollbackOnly()) {
    12             if (defStatus.isDebug()) {
    13                 logger.debug("Transactional code has requested rollback");
    14             }
    15             processRollback(defStatus);
    16             return;
    17         }
    18         if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
    19             if (defStatus.isDebug()) {
    20                 logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
    21             }
    22             /** 3. 如果事务需要回滚,则执行回滚操作*/
    23             processRollback(defStatus);
    24             if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
    25                 throw new UnexpectedRollbackException(
    26                         "Transaction rolled back because it has been marked as rollback-only");
    27             }
    28             return;
    29         }
    30         /** 4.不需要回滚最终则执行提交逻辑*/
    31         processCommit(defStatus);
    32     }

    最终执行回滚的逻辑是processCommit方法,源码如下:

     1 private void processCommit(DefaultTransactionStatus status) throws TransactionException {
     2         try {
     3             boolean beforeCompletionInvoked = false;
     4             try {
     5                 prepareForCommit(status);
     6                 triggerBeforeCommit(status);
     7                 triggerBeforeCompletion(status);
     8                 beforeCompletionInvoked = true;
     9                 boolean globalRollbackOnly = false;
    10                 if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
    11                     globalRollbackOnly = status.isGlobalRollbackOnly();
    12                 }
    13                 /**
    14                  * 1. 如果事务设置了保存点,则需要释放所有的保存点
    15                  * */
    16                 if (status.hasSavepoint()) {
    17                     if (status.isDebug()) {
    18                         logger.debug("Releasing transaction savepoint");
    19                     }
    20                     /** 释放事务的保存点*/
    21                     status.releaseHeldSavepoint();
    22                 }
    23                 else if (status.isNewTransaction()) {
    24                     if (status.isDebug()) {
    25                         logger.debug("Initiating transaction commit");
    26                     }
    27                     /** 2.提交事务 */
    28                     doCommit(status);
    29                 }
    30                 // Throw UnexpectedRollbackException if we have a global rollback-only
    31                 // marker but still didn't get a corresponding exception from commit.
    32                 if (globalRollbackOnly) {
    33                     throw new UnexpectedRollbackException(
    34                             "Transaction silently rolled back because it has been marked as rollback-only");
    35                 }
    36             }
    37             catch (UnexpectedRollbackException ex) {
    38                 // can only be caused by doCommit
    39                 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
    40                 throw ex;
    41             }
    42             catch (TransactionException ex) {
    43                 // can only be caused by doCommit
    44                 if (isRollbackOnCommitFailure()) {
    45                     doRollbackOnCommitException(status, ex);
    46                 }
    47                 else {
    48                     triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
    49                 }
    50                 throw ex;
    51             }
    52             catch (RuntimeException ex) {
    53                 if (!beforeCompletionInvoked) {
    54                     triggerBeforeCompletion(status);
    55                 }
    56                 doRollbackOnCommitException(status, ex);
    57                 throw ex;
    58             }
    59             catch (Error err) {
    60                 if (!beforeCompletionInvoked) {
    61                     triggerBeforeCompletion(status);
    62                 }
    63                 doRollbackOnCommitException(status, err);
    64                 throw err;
    65             }
    66 
    67             // Trigger afterCommit callbacks, with an exception thrown there
    68             // propagated to callers but the transaction still considered as committed.
    69             try {
    70                 triggerAfterCommit(status);
    71             }
    72             finally {
    73                 triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
    74             }
    75 
    76         }
    77         finally {
    78             cleanupAfterCompletion(status);
    79         }
    80     }

    最终执行doCommit方法,而doCommit方法是一个抽象方法,实际执行是通过AbstractPlatformTransactionManager的子类去实现,常用的子类使用的是DataSourceTransactionManager类,实

     1 @Override
     2     protected void doCommit(DefaultTransactionStatus status) {
     3         DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
     4         /**
     5          * 1.获取事务的数据库连接
     6          * */
     7         Connection con = txObject.getConnectionHolder().getConnection();
     8         if (status.isDebug()) {
     9             logger.debug("Committing JDBC transaction on Connection [" + con + "]");
    10         }
    11         try {
    12             /**
    13              * 2.执行数据库连接的提交方法提交事务
    14              * */
    15             con.commit();
    16         }
    17         catch (SQLException ex) {
    18             throw new TransactionSystemException("Could not commit JDBC transaction", ex);
    19         }
    20     }

    1.2.3、回滚事务方法rollback源码解析

     1 @Override
     2     public final void rollback(TransactionStatus status) throws TransactionException {
     3         if (status.isCompleted()) {
     4             throw new IllegalTransactionStateException(
     5                     "Transaction is already completed - do not call commit or rollback more than once per transaction");
     6         }
     7 
     8         DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
     9    /** 执行processRollback方法回滚事务 */
    10         processRollback(defStatus);
    11     }

    processRollback方法和processCommit方法流程一样,最终调用doRollback方法,调用JDBC的数据库连接的rollback方法进行事务的回滚操作。

    二、编程式事务源码解析

    编程式事务逻辑比较清晰,因为事务是在业务代码中显示执行的,所以比较好理解,最常用的编程式事务采用的是事务模版,也就是TransactionTemplate模版类,TransactionTemplate实现了TransactionDefinition接口,主要作用是保存事务的全局配置,则通过事务管理的getTransaction(TransactionDefinition definition)方法获取到的事务属性都是一样的,使用方式如下:

     1 @Override
     2     public User addAndGet(User user, Long userId) {
     3         return transactionTemplate.execute(new TransactionCallback<User>() {
     4             @Override
     5             public User doInTransaction(TransactionStatus status) {
     6                 userMapper.addUser(user);
     7                 return userMapper.getUserById(userId);
     8             }
     9         });
    10     }

    所以编程式事务的实现主要是通过TransactionTemplate的execute方法来实现的,所以可以直接从execute方法入手开始分析事务的实现方式。

    1、TransactionTemplate的execute方法源码解析

     1 /**
     2      * 执行事务中的业务方法
     3      * */
     4     public <T> T execute(TransactionCallback<T> action) throws TransactionException {
     5         if (this.transactionManager instanceof CallbackPreferringPlatformTransactionManager) {
     6             return ((CallbackPreferringPlatformTransactionManager) this.transactionManager).execute(this, action);
     7         }
     8         else {
     9             /**
    10              * 1.获取当前事务的事务状态,其中transactionManager是配置的事务管理器TransactionManager实例
    11              * */
    12             TransactionStatus status = this.transactionManager.getTransaction(this);
    13             T result;
    14             try {
    15                 /**
    16                  * 2.执行具体的业务逻辑
    17                  * */
    18                 result = action.doInTransaction(status);
    19             }
    20             catch (RuntimeException ex) {
    21                 /** 业务抛RuntimeException异常则执行回滚*/
    22                 rollbackOnException(status, ex);
    23                 throw ex;
    24             }
    25             catch (Error err) {
    26                 /** 业务抛Error异常则执行回滚*/
    27                 rollbackOnException(status, err);
    28                 throw err;
    29             }
    30             catch (Throwable ex) {
    31                 /**
    32                  * 业务抛Throwable异常则执行回滚
    33                  * */
    34                 rollbackOnException(status, ex);
    35                 throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
    36             }
    37             /**
    38              * 如果没有抛异常则执行提交事务
    39              * */
    40             this.transactionManager.commit(status);
    41             return result;
    42         }
    43     }
    44 
    45     /**
    46      * 异常时回滚事务
    47      * */
    48     private void rollbackOnException(TransactionStatus status, Throwable ex) throws TransactionException {
    49         try {
    50             /**
    51              * 执行事务管理器的rollback方法回滚事务
    52              * */
    53             this.transactionManager.rollback(status);
    54         }
    55         catch (TransactionSystemException ex2) {
    56             logger.error("Application exception overridden by rollback exception", ex);
    57             ex2.initApplicationException(ex);
    58             throw ex2;
    59         }
    60         catch (RuntimeException ex2) {
    61             logger.error("Application exception overridden by rollback exception", ex);
    62             throw ex2;
    63         }
    64         catch (Error err) {
    65             logger.error("Application exception overridden by rollback error", ex);
    66             throw err;
    67         }
    68     }

    通过编程式方式的事务流程比较清晰,主要就是通过调用事务管理器transactionManager的方法来实现,首先是调用getTransaction方法获取当前事务,然后尝试执行业务逻辑并捕获error和运行时异常,如果抛异常则执行事务管理器的回滚方法进行事务回滚。

    如果没有抛异常则执行事务管理器的提交方法进行事务的提交。

    三、声明式事务源码解析

    声明式事务是通过Spring的AOP实现的,使用方法如下:

    1     @Transactional
    2     public User addAndGet(User user, Long userId) {
    3         userMapper.addUser(user);
    4         return userMapper.getUserById(userId);
    5     }

    熟悉AOP的实现流程的同学应该能够猜到,声明式事务的实现应该是拦截被@Transaction修饰的类或方法,然后通过创建增强的方式将事务的逻辑织入到业务代码中。

    而解析@Transactional注解的织入事务逻辑在MethodInterceptor的实现类TransactionInterceptor中,所以被@Transactional注解修饰的方法,执行业务逻辑时都会执行TransactionInterceptor的invoke方法,源码如下:

     1 @Override
     2     public Object invoke(final MethodInvocation invocation) throws Throwable {
     3         /**
     4          * 1.获取拦截器的目标类
     5          * */
     6         Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
     7 
     8         /**
     9          * 将事务织入业务逻辑中
    10          * */
    11         return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
    12             @Override
    13             public Object proceedWithInvocation() throws Throwable {
    14                 return invocation.proceed();
    15             }
    16         });
    17     }
     1 protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
     2             throws Throwable {
     3 
     4         /** 1.获取事务设置的属性 */
     5         final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
     6         /** 2.获取事务管理器 */
     7         final PlatformTransactionManager tm = determineTransactionManager(txAttr);
     8         final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
     9 
    10         if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
    11             /** 3.创建TranscationInfo对象,相当于开启事务  */
    12             TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
    13             Object retVal = null;
    14             try {
    15                 /** 4.执行被增强的方法 (业务逻辑)*/
    16                 retVal = invocation.proceedWithInvocation();
    17             }
    18             catch (Throwable ex) {
    19                 /** 5.异常之后回滚事务 */
    20                 completeTransactionAfterThrowing(txInfo, ex);
    21                 throw ex;
    22             }
    23             finally {
    24                 /** 6.finally中清除事务相关信息  */
    25                 cleanupTransactionInfo(txInfo);
    26             }
    27             /** 7. 业务逻辑执行完成之后提交事务 */
    28             commitTransactionAfterReturning(txInfo);
    29             return retVal;
    30         }
    31     }

    声明式事务实现步骤总结如下:

    1、将事务封装成AOP增强,拦截所有被@Transactional注解修饰的方法或类;或者通过标签<tx>配置的事务;将增强封装成方法拦截器TransactionInterceptor

    2、添加事务的方法执行时,会执行增强拦截器TransactionInterceptor的invoke方法

    3、先从配置中获取到事务的配置(传播机制、隔离机制、超时时间等)

    4、从Spring容器中获取事务管理器PlatformTransactionManager的bean

    5、调用事务管理器的getTransaction方法开启新事务

    6、执行被拦截的业务逻辑,并进行try/catch捕获异常

    7、如果捕获到异常,则执行事务管理器的rollback方法回滚事务

    8、如果没有捕获异常,则执行事务管理器的commit方法提交事务

     

    Tips:

    不管是编程式事务还是声明式事务的实现,实际都是通过事务管理器来实现的,都是先通过事务管理器获取当前线程的事务,然后执行业务逻辑,如果异常需要回滚则执行事务管理器的回滚方法进行回滚,如果业务执行成功则执行事务管理器的提交方法提交事务。

    而事务管理器的获取事务、回滚事务、提交事务又都是调用底层JDBC的数据库连接的相对应的方法来进行实现的,只不过是在数据库连接的获取事务、提交和回滚事务的方法之上进行了封装,使得业务代码使用事务时不需要关系底层JDBC的API 

  • 相关阅读:
    Google Kubernetes设计文档之服务篇-转
    基于kubernetes构建Docker集群管理详解-转
    Pass云Docker介绍
    Spring <context:annotation-config/> 解说
    webapp开发需要注意的浏览器内核知识
    koala编译scss文件时不支持中文字体的解决方案
    CSS3硬件加速需要注意的事项
    ios客户端快速滚动和回弹效果的实现
    mui禁止滚动条和禁止滚动
    苹果端禁用左右滑动屏幕返回上级页面
  • 原文地址:https://www.cnblogs.com/jackion5/p/13466937.html
Copyright © 2011-2022 走看看