zoukankan      html  css  js  c++  java
  • Spring源码深度解析之事务

    Spring源码深度解析之事务

    目录

    一、JDBC方式下的事务使用示例

      (1)创建数据表结构

      (2)创建对应数据表的PO

      (3)创建表和实体之间的映射

      (4)创建数据操作接口

      (5)创建数据操作接口实现类

      (6)创建Spring配置文件

      (7)测试

    二、事务自定义标签

    (一)注册InfrastructureAdvisorAutoProxyCreator

    (二)获取对应class/method的增强器

      (1)寻找候选增强器

      (2)候选增强器中寻找到匹配项

    三、事务增强器

    (一)创建事务

      (1)获取事务

      (2)处理已经存在的事务

      (3)准备事务信息

    (二)回滚处理

      (1)回滚条件

      (2)回滚处理

      (3)回滚后的信息清除

    (三)事务提交

     

          Spring声明式事务让我们从复杂的事务处理中得到解脱,使我们再也不需要去处理获得连接、关闭连接、事务提交和回滚等操作。再也不需要在与事务相关的方法中处理大量的try…catch…finally代码。Spring中事务的使用虽然已经相对简单得多,但是,还是有很多的使用及配置规划,有兴趣的读者可以自己查阅相关资料进行深入的研究,这里只列举出最常用的使用方法。

          同样,我们还是以最简单的示例来进行直观的介绍。

    一、JDBC方式下事务使用示例

    (1)创建数据表结构

    1 CREATE TABLE 'user'(
    2     'id' int(11) NOT NULL auto_increment,
    3     'name' varchar(255) default null,
    4     'age' int(11) default null,
    5     'sex' varchar(255) default null,
    6     PRIMARY KEY ('id')
    7 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;

    (2)创建对应数据表的PO

    1 public class User {
    2     private int id;
    3     private String name;
    4     private int age;
    5     private String sex;
    6     //省略set/get方法
    7 }

    (3)创建表和实体之间的映射

    1 public class UserRowMapper implements RowMapper {
    2     @Override
    3     public Object mapRow(ResultSet set, int index) throws SQLException {
    4         User person = new User(set.getInt('id'), set.getString("name"), set.getInt("age"), set.getString("sex"));
    5         return person;
    6     }
    7 }

    (4)创建数据操作接口

    1 @Transactional(propagation-Propagation.REQUIRED)
    2 public interface UserService {
    3     public void save(User user) throws Exception;
    4 }

    (5)创建数据操作接口实现类

     1 public class UserServiceImpl implements UserService {
     2     private JdbcTemplate jdbcTemplate;
     3 
     4     //设置数据源
     5     public void setDataSource(DataSource dataSource) {
     6         this.jdbcTemplate = new JdbcTemplate(dataSource);
     7     }
     8 
     9     public void save(User user) throws Exception {
    10         jdbcTemplate.update("insert into user(name, age, sex) value(?, ?, ?)",
    11                 new Object[] {user.getName(), usr.getAge(), user.getSex()},
    12                 new int[] {java.sql.Types.VARCHAR, java.sql.Type.INTEGER, java.sql.Types.VARCHAR});
    13 
    14         //事务测试,加上这句代码则数据不会保存到数据库中
    15         throw new RuntimeException("aa");
    16     }
    17 
    18 }

    (6)创建Spring配置文件

     1 <?xml version="1.0" encoding="UTF-8" ?>
     2 <beans xmlns="http://www.springframework.org/schema/beans"
     3        xmlns:xsi="http://w3.org/2001/XMLSchema-instance"
     4        xmlns:tx="http://www.springframework.org/schema/tx"
     5        xmlns:context="http://www.springframework.org/schema/context"
     6        xsi:schemaLocation="http://www.springframework.org/schema/beans
     7                             http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
     8                             http://www.springframework.org/schema/context
     9                             http://www.springframework.org/schema/context/spring-context-2.5.xsd
    10                             http://www.springframework.org/schema/tx
    11                             http://www.springframework.org/schema/tx/spring-tx-2.5.xsd">
    12 
    13     <tx:annotation-driven transaction-manager="transactionManager"/>
    14 
    15     <bean id="transactionManager" class="org.springframe.jdbc.datasource.DataSourceTransactionManager">
    16         <property name="dataSource" ref="dataSource"/>
    17     </bean>
    18 
    19     //配置数据源
    20     <bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    21         <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    22         <property name="uri" value="jdbc:mysql://localhost:3306/lexueba"/>
    23         <property name="username" value="root"/>
    24         <property name="password" value="haojia0421xixi"/>
    25         <!--连接池启动时的初始值-->
    26         <property name="initialSize" value="1"/>
    27         <!--连接池的最大值-->
    28         <property name="maxActive" value="300"/>
    29         <!--最大空闲值,当经过一个高峰时间后,连接池可以慢慢将已经用不到的连接慢慢释放一部分,一直减少到maxIdle为止-->
    30         <property name="maxIdle" value="2"/>
    31         <!--最小空闲值,当空闲的连接数少于阀值时,连接池就会预申请去一些连接,以免洪峰来时来不及申请-->
    32         <property name="minIdle" value="1"/>
    33     </bean>
    34 
    35     <!--配置业务 bean:PersonServiceBean -->
    36     <bean id="userService" class="service.UserServiceImple">
    37         <!--向属性dataSource注入数据源-->
    38         <property name="dataSource" ref="dataSource"></property>
    39     </bean>
    40 
    41 </beans>

    (7)测试

     1 public static void main(String[] args) throws Exception {
     2     ApplicationContext act = new ClassPathXmlApplicationContext("bean.xml");
     3 
     4     UserService userService = (UserService) act.getBean("userService");
     5     User user = new User();
     6     user.setName("张三CCC");
     7     user.setAge(20);
     8     user.setSex("男");
     9     //保存一条记录
    10     userService.save(user);
    11 }

          上面测测试示例中,UserServiceImpl类对接口UserService中的save函数的实现最后加入了一句抛出异常的代码:throw new RuntimeException(“aa”)。当注释掉这段代码执行测试类,那么会看到数据被成功的保存到了数据库中,但是如果加入这段代码再次运行测试类,发现此处的操作并不会将数据保存到数据库中。

          注意:默认情况下,Spring中的事务处理只对RuntimeException方法进行回滚,所以,如果此处将RuntimeException替换成普通的Exception不会产生回滚效果。

     

    二、事务自定义标签

      对于Spring中事务功能的代码分析。我们首先从配置文件开始入手,在配置文件中有这样一个配置<tx:annotation-driven/>。可以说此处配置是事务的开关,如果没有此处的配置,那么Spring中将不存在事务的功能。那么我们就从这个配置开始分析。

      根据之前的分析。我们因此可以判断,在自定义标签中的解析过程中一定是做了一些辅助性的操作,于是我们先从自定义的标签入手进行分析。

      对于关键字annotation-driven,在TxNamespaceHandler类(package org.springframework.transaction.config)中的init()方法中进行了处理:

    1     public void init() {
    2         registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
    3         registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
    4         registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
    5     }

      根据自定义标签的使用规则及上面的代码,可以知道,在遇到诸如<tx:annotation-driven为开头的配置后,Spring都会使用AnnotationDrivenBeanDefinitionParser类(package org.springframework.transaction.config)的parse()方法进行解析。

     1     public BeanDefinition parse(Element element, ParserContext parserContext) {
     2         registerTransactionalEventListenerFactory(parserContext);
     3         String mode = element.getAttribute("mode");
     4         if ("aspectj".equals(mode)) {
     5             // mode="aspectj"
     6             registerTransactionAspect(element, parserContext);
     7             if (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader())) {
     8                 registerJtaTransactionAspect(element, parserContext);
     9             }
    10         }
    11         else {
    12             // mode="proxy"
    13             AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
    14         }
    15         return null;
    16     }

      在解析中存在对于mode属性的判断,根据代码,如果我们需要使用AspectJ的方式进行事务切入(Spring中的事务是以AOP为基础的),那么可以使用这样的配置:

    <tx:annotation-driven transaction-manager="transactionManager"/ mode="aspectJ">
    (一)注册InfrastructureAdvisorAutoProxyCreator

          我们以默认配置为例子进行分析,进入AopAutoProxyConfigurer类(AnnotationDrivenBeanDefinitionParser类的内部类)的configureAutoProxyCreator函数:

     1         public static void configureAutoProxyCreator(Element element, ParserContext parserContext) {
     2             AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element);
     3 
     4             String txAdvisorBeanName = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME;
     5             if (!parserContext.getRegistry().containsBeanDefinition(txAdvisorBeanName)) {
     6                 Object eleSource = parserContext.extractSource(element);
     7 
     8                 // Create the TransactionAttributeSource definition.
     9                 //创建TransactionAttributeSource的bean
    10                 RootBeanDefinition sourceDef = new RootBeanDefinition(
    11                         "org.springframework.transaction.annotation.AnnotationTransactionAttributeSource");
    12                 sourceDef.setSource(eleSource);
    13                 sourceDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
    14                 //注册bean,并使用Spring中的定义规则生成beanname
    15                 String sourceName = parserContext.getReaderContext().registerWithGeneratedName(sourceDef);
    16 
    17                 // Create the TransactionInterceptor definition.
    18                 //创建TransactionInterceptor的bean
    19                 RootBeanDefinition interceptorDef = new RootBeanDefinition(TransactionInterceptor.class);
    20                 interceptorDef.setSource(eleSource);
    21                 interceptorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
    22                 registerTransactionManager(element, interceptorDef);
    23                 interceptorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
    24                 String interceptorName = parserContext.getReaderContext().registerWithGeneratedName(interceptorDef);
    25 
    26                 // Create the TransactionAttributeSourceAdvisor definition.
    27                 //创建TransactionAttributeSourceAdvisor的bean
    28                 RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class);
    29                 advisorDef.setSource(eleSource);
    30                 advisorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
    31                 //将sourceName的bean注入advisorDef的transactionAttributeSource属性中
    32                 advisorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
    33                 //将interceptorName的bean注入advisorDef的adviceBeanName属性中
    34                 advisorDef.getPropertyValues().add("adviceBeanName", interceptorName);
    35                 //如果配置了order属性,则加入到bean中
    36                 if (element.hasAttribute("order")) {
    37                     advisorDef.getPropertyValues().add("order", element.getAttribute("order"));
    38                 }
    39                 parserContext.getRegistry().registerBeanDefinition(txAdvisorBeanName, advisorDef);
    40 
    41                 //创建CompositeComponentDefinition
    42                 CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(), eleSource);
    43                 compositeDef.addNestedComponent(new BeanComponentDefinition(sourceDef, sourceName));
    44                 compositeDef.addNestedComponent(new BeanComponentDefinition(interceptorDef, interceptorName));
    45                 compositeDef.addNestedComponent(new BeanComponentDefinition(advisorDef, txAdvisorBeanName));
    46                 parserContext.registerComponent(compositeDef);
    47             }
    48         }

      上面的代码注册了代理类及三个bean,很多读者会直接略过,认为只是注册三个bean而已,确实,这里只注册量三个bean,但是这三个bean支撑了整个事务的功能,那么这三个bean是怎么组织起来的呢?

          首先,其中的两个bean被注册到了一个名为advisorDef的bean中,advisorDef使用BeanFactoryTransactionAttributeSourceAdvisor作为其class属性。也就是说BeanFactoryTransactionAttributeSourceAdvisor代表着当前bean,具体代码如下:

    advisorDef.getPropertyValues().add("adviceBeanName", interceptorName);

          那么如此组装的目的是什么呢?先留下一个悬念,接着分析代码。上面configureAutoProxyCreator函数中的第一句貌似很简单但却是很重要的代码:

    AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element);

          进入这个函数,AopNamespaceUtils类(package org.springframework.aop.config)的registerAutoProxyCreatorIfNecessary函数:

    1     public static void registerAutoProxyCreatorIfNecessary(
    2             ParserContext parserContext, Element sourceElement) {
    3 
    4         BeanDefinition beanDefinition = AopConfigUtils.registerAutoProxyCreatorIfNecessary(
    5                 parserContext.getRegistry(), parserContext.extractSource(sourceElement));
    6         useClassProxyingIfNecessary(parserContext.getRegistry(), sourceElement);
    7         registerComponentIfNecessary(beanDefinition, parserContext);
    8     }

      上面函数又调用了AopConfigUtils类(package org.springframework.aop.config)的registerAutoProxyCreatorIfNecessary函数:

    1     public static BeanDefinition registerAutoProxyCreatorIfNecessary(
    2             BeanDefinitionRegistry registry, @Nullable Object source) {
    3 
    4         return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);
    5     }

          对于解析来的额代码流程AOP中已经有所分析,上面两个函数主要的目的是注册InfrastructureAdvisorAutoProxyCreator类型(package org.springframework.aop.framework.autoproxy)的bean,那么注册这个类的目的是什么呢?

      查看该类的层次结构,可以看到InfrastructureAdvisorAutoProxyCreator继承自AbstractAdvisorAutoProxyCreator继承自AbstractAutoProxyCreator继承自SmartInstantiationAwareBeanPostProcessor继承自InstantiationAwareBeanPostProcessor(含postProcessAfterInitialization方法),也就是说在Spring中,所有bean实例化时Spring都会保证调用其postProcessAfterInitialization方法。其实现是在父类AbstractAutoProxyCreator类中实现。

          以之前的示例为例,当实例化userService的bean时便会调用此方法,AbstractAutoProxyCreator类(packageorg.springframework.aop.framework.autoproxy)的postProcessAfterInitialization方法:

     1     public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
     2         if (bean != null) {
     3             //根据给定的bean的class和name构建出key,格式:beanClassName_beanName
     4             Object cacheKey = getCacheKey(bean.getClass(), beanName);
     5             if (this.earlyProxyReferences.remove(cacheKey) != bean) {
     6                 //一个非常核心的方法:wrapIfNecessary(),如果它适合被代理,则需要封装指定的bean。
     7                 return wrapIfNecessary(bean, beanName, cacheKey);
     8             }
     9         }
    10         return bean;
    11     }

      这里实现的主要目的是对指定的bean进行封装,当然首先要确定是否需要封装,检测以及封装的的工作都委托给了wrapIfNecessary函数进行,AbstractAutoProxyCreator类的wrapIfNecessary方法:

     1     protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
     2         //如果已经处理过
     3         if (StringUtils.hasLength(beanName) && this.targetSourcedBeans.contains(beanName)) {
     4             return bean;
     5         }
     6         //这个bean无需增强
     7         if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {
     8             return bean;
     9         }
    10         //判断给定的bean是否是一个基础设施类,基础设施类不应代理,或者配置了指定bean不需要代理。
    11         //所谓InfrastructureClass就是指Advice/PointCut/Advisor等接口的实现类。
    12         if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) {
    13             this.advisedBeans.put(cacheKey, Boolean.FALSE);
    14             return bean;
    15         }
    16 
    17         // 如果存在增强方法则创建代理
    18         //获取这个bean的advice
    19         Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
    20         //如果获取到了增强则需要针对增强创建代理
    21         if (specificInterceptors != DO_NOT_PROXY) {
    22             this.advisedBeans.put(cacheKey, Boolean.TRUE);
    23             //创建代理
    24             Object proxy = createProxy(
    25                     bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
    26             this.proxyTypes.put(cacheKey, proxy.getClass());
    27             return proxy;
    28         }
    29 
    30         this.advisedBeans.put(cacheKey, Boolean.FALSE);
    31         return bean;
    32     }

          WrapIfNecessary函数功能实现起来很复杂,但是逻辑上理解起来还是相对简单的,在wrapIfNecessary函数中主要的工作如下:

          ①找出指定bean对应的增强器。

          ②根据找出的增强器创建代理。

          听起来似乎简单的逻辑,Spring中又做了哪些复杂的工作呢?对于创建代理的部分,通过之前的分析大家已经很熟悉了,但是对于增强器的获取,Spring又是怎么做的呢?

    (二)获取对应的class/method的增强器

          获取指定bean对应的增强器,其中包含两个关键字:增强器和对应。也就是说在getAdvicesAndAdvisorsForBean函数中,不但要找出增强器,而且还需要判断增强器是否满足要求。AbstractAutoProxyCreator类的wrapIfNecessary方法中调用了getAdvicesAndAdvisorsForBean,AbstractAutoProxyCreator类只对该方法进行定义,真正实现在其子类AbstractAdvisorAutoProxyCreator(package org.springframework.aop.framework.autoproxy)中:

    1     protected Object[] getAdvicesAndAdvisorsForBean(
    2             Class<?> beanClass, String beanName, @Nullable TargetSource targetSource) {
    3 
    4         List<Advisor> advisors = findEligibleAdvisors(beanClass, beanName);
    5         if (advisors.isEmpty()) {
    6             return DO_NOT_PROXY;
    7         }
    8         return advisors.toArray();
    9     }

       上述函数调用了该类中的findEligibleAdvisors函数:

    1     protected List<Advisor> findEligibleAdvisors(Class<?> beanClass, String beanName) {
    2         List<Advisor> candidateAdvisors = findCandidateAdvisors();
    3         List<Advisor> eligibleAdvisors = findAdvisorsThatCanApply(candidateAdvisors, beanClass, beanName);
    4         extendAdvisors(eligibleAdvisors);
    5         if (!eligibleAdvisors.isEmpty()) {
    6             eligibleAdvisors = sortAdvisors(eligibleAdvisors);
    7         }
    8         return eligibleAdvisors;
    9     }

          其实我们也渐渐地体会到Spring中代码的优秀,即使是一个很复杂的逻辑,在Spring中也会被拆分为若干个小的逻辑,然后在每个函数中实现,使得每个函数的逻辑简单到我们能快速的理解,而不会像有些人开发的那样,将一大堆的逻辑都罗列在一个函数中,给后期维护文员造成巨大的困扰。

          同样,通过上面的函数,Spring又将任务进行了拆分,分成了获取所有增强器与增强器是否匹配两个功能点。

    (1)寻找候选增强器

          在findCandidateAdvisors函数中完成的就是获取增强器的功能,AbstractAdvisorAutoProxyCreator类的findCandidateAdvisors函数:

    1     protected List<Advisor> findCandidateAdvisors() {
    2         Assert.state(this.advisorRetrievalHelper != null, "No BeanFactoryAdvisorRetrievalHelper available");
    3         return this.advisorRetrievalHelper.findAdvisorBeans();
    4     }

      上面函数调用了advisorRetrievalHelper类(package org.springframework.aop.framework.autoproxy)的findAdvisorBeans函数,进入函数:

     1     public List<Advisor> findAdvisorBeans() {
     2         //cachedAdvisorBeanNames是advisor名称的缓存
     3         String[] advisorNames = this.cachedAdvisorBeanNames;
     4         //如果cachedAdvisorBeanNames为空,则到容器中查找,并设置缓存,后续直接使用缓存即可
     5         if (advisorNames == null) {
     6             // Do not initialize FactoryBeans here: We need to leave all regular beans
     7             // uninitialized to let the auto-proxy creator apply to them!
     8             //从容器中查找Advisor类型的bean的名称
     9             advisorNames = BeanFactoryUtils.beanNamesForTypeIncludingAncestors(
    10                     this.beanFactory, Advisor.class, true, false);
    11             this.cachedAdvisorBeanNames = advisorNames;
    12         }
    13         if (advisorNames.length == 0) {
    14             return new ArrayList<>();
    15         }
    16 
    17         List<Advisor> advisors = new ArrayList<>();
    18         //遍历advisorNames
    19         for (String name : advisorNames) {
    20             if (isEligibleBean(name)) {
    21                 //忽略郑州创建中的advisor bean
    22                 if (this.beanFactory.isCurrentlyInCreation(name)) {
    23                     if (logger.isTraceEnabled()) {
    24                         logger.trace("Skipping currently created advisor '" + name + "'");
    25                     }
    26                 }
    27                 else {
    28                     try {
    29                         //调用getBean方法从容器中获取名称为name的bean,并将bean添加到advisors中
    30                         advisors.add(this.beanFactory.getBean(name, Advisor.class));
    31                     }
    32                     catch (BeanCreationException ex) {
    33                         Throwable rootCause = ex.getMostSpecificCause();
    34                         if (rootCause instanceof BeanCurrentlyInCreationException) {
    35                             BeanCreationException bce = (BeanCreationException) rootCause;
    36                             String bceBeanName = bce.getBeanName();
    37                             if (bceBeanName != null && this.beanFactory.isCurrentlyInCreation(bceBeanName)) {
    38                                 if (logger.isTraceEnabled()) {
    39                                     logger.trace("Skipping advisor '" + name +
    40                                             "' with dependency on currently created bean: " + ex.getMessage());
    41                                 }
    42                                 // Ignore: indicates a reference back to the bean we're trying to advise.
    43                                 // We want to find advisors other than the currently created bean itself.
    44                                 continue;
    45                             }
    46                         }
    47                         throw ex;
    48                     }
    49                 }
    50             }
    51         }
    52         return advisors;
    53     }

          对于上面的函数,你看懂其中的奥秘了吗?首先是通过BeanFactoryUtils类提供的工具方法获取所有对应的Advisor.class的类。

    1 //从容器中查找Advisor类型的bean的名称
    2 advisorNames = BeanFactoryUtils.beanNamesForTypeIncludingAncestors(
    3       this.beanFactory, Advisor.class, true, false);

          进入BeanFactoryUtils类(package org.springframework.beans.factory)的beanNamesForTypeIncludingAncestors函数:

     1     public static String[] beanNamesForTypeIncludingAncestors(
     2             ListableBeanFactory lbf, ResolvableType type, boolean includeNonSingletons, boolean allowEagerInit) {
     3 
     4         Assert.notNull(lbf, "ListableBeanFactory must not be null");
     5         String[] result = lbf.getBeanNamesForType(type, includeNonSingletons, allowEagerInit);
     6         if (lbf instanceof HierarchicalBeanFactory) {
     7             HierarchicalBeanFactory hbf = (HierarchicalBeanFactory) lbf;
     8             if (hbf.getParentBeanFactory() instanceof ListableBeanFactory) {
     9                 String[] parentResult = beanNamesForTypeIncludingAncestors(
    10                         (ListableBeanFactory) hbf.getParentBeanFactory(), type, includeNonSingletons, allowEagerInit);
    11                 result = mergeNamesWithParent(result, parentResult, hbf);
    12             }
    13         }
    14         return result;
    15     }

          从上面代码可以看出,实际调用的是ListableBeanFactory类提供的getBeanNamesForType函数。

          而当我们知道增强器在容器中beanName时,获取增强器已经不是问题了,在BeanFactory中提供了这样的方法,可以帮助我们快速的定位对应的bean实例。

    <T> T getBean(String name, Class<T> requiredType) throws BeansException;

      或许你已经忘了之前留下的悬念,在我们讲解自定义标签时曾经注册了一个类型为BeanFactoryTransactionAttributeSourceAdvisor的bean,(package org.springframework.transaction.config 包中的AnnotationDrivenBeanDefinitionParser类的内部类AopAutoProxyConfigurer类中),而在此bean总我们又注入另外两个Bean,那么此时这个Bean就会被开始使用了。因为BeanFactoryTransactionAttributeSourceAdvisor同样也实现了Advisor接口,那么在获取所有增强器是自然也会将此bean提取出来并随着其他增强器一起在 后续的步骤中被织入代理。

    (2)候选增强器中寻找到匹配项

      当找出对应的增强后,接下来的任务就是看这些增强是否与对应的class匹配了,当然不只是class,class内部的方法如果匹配也可以通过验证,AbstractAdvisorAutoProxyCreator类(packageorg.springframework.aop.framework.autoproxy)的findAdvisorsThatCanApply函数:

     1     protected List<Advisor> findAdvisorsThatCanApply(
     2             List<Advisor> candidateAdvisors, Class<?> beanClass, String beanName) {
     3 
     4         ProxyCreationContext.setCurrentProxiedBeanName(beanName);
     5         try {
     6             return AopUtils.findAdvisorsThatCanApply(candidateAdvisors, beanClass);
     7         }
     8         finally {
     9             ProxyCreationContext.setCurrentProxiedBeanName(null);
    10         }
    11     }
      其中又调用了AopUtils类(package org.springframework.aop.support)的findAdvisorsThatCanApply函数:
     1     public static List<Advisor> findAdvisorsThatCanApply(List<Advisor> candidateAdvisors, Class<?> clazz) {
     2         if (candidateAdvisors.isEmpty()) {
     3             return candidateAdvisors;
     4         }
     5         List<Advisor> eligibleAdvisors = new ArrayList<>();
     6         for (Advisor candidate : candidateAdvisors) {
     7             //刷选IntroductionAdvisor引介类型的通知器
     8             if (candidate instanceof IntroductionAdvisor && canApply(candidate, clazz)) {
     9                 eligibleAdvisors.add(candidate);
    10             }
    11         }
    12         boolean hasIntroductions = !eligibleAdvisors.isEmpty();
    13         for (Advisor candidate : candidateAdvisors) {
    14             if (candidate instanceof IntroductionAdvisor) {
    15                 //引介增强已经处理
    16                 continue;
    17             }
    18             //刷选普通类型的通知器
    19             if (canApply(candidate, clazz, hasIntroductions)) {
    20                 eligibleAdvisors.add(candidate);
    21             }
    22         }
    23         return eligibleAdvisors;
    24     }
      上面函数调用了该类中的canApply函数:
     1     public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {
     2         if (advisor instanceof IntroductionAdvisor) {
     3             /*
     4              * 从通知器中获取类型过滤器 ClassFilter,并调用 matchers 方法进行匹配。
     5              * ClassFilter 接口的实现类 AspectJExpressionPointcut 为例,该类的
     6              * 匹配工作由 AspectJ 表达式解析器负责,具体匹配细节这个就没法分析了,我
     7              * AspectJ 表达式的工作流程不是很熟
     8              */
     9             return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);
    10         }
    11         else if (advisor instanceof PointcutAdvisor) {
    12             PointcutAdvisor pca = (PointcutAdvisor) advisor;
    13             // 对于普通类型的通知器,这里继续调用重载方法进行筛选
    14             return canApply(pca.getPointcut(), targetClass, hasIntroductions);
    15         }
    16         else {
    17             // It doesn't have a pointcut so we assume it applies.
    18             return true;
    19         }
    20     }

      当前我们分析的是对于UserService是否适用于此增强方法,那么当前的advisor就是之前查找出来的类型为BeanFactoryTransactionAttributeSourceAdvisor的bean实例,而通过类的层次几个我们又知道:BeanFactoryTransactionAttributeSourceAdvisor间接实现了PointAdvisor(BeanFactoryTransactionAttributeSourceAdvisor继承自AbstractBeanFactoryPointcutAdvisor继承自AbstractPointcutAdvisor继承自PointcutAdvisor)。因此,在canApply函数中的第二个if判断时就会通过判断,会将BeanFactoryTransactionAttributeSourceAdvisor中的getPointcut()方法返回值作为参数继续调用canApply方法,而getPointcut ()方法返回的是TransactionAttributeSourcePointcut类型的实例。对于transactionAttributeSource这个属性大家还有印象吗?这是在解析自定义标签时注入进去的。BeanFactoryTransactionAttributeSourceAdvisor类中代码:

    1 private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
    2     @Override 
    3     @Nullable 
    4     protected TransactionAttributeSource getTransactionAttributeSource() {
    5         return transactionAttributeSource;
    6     }
    7 };

      那么使用TransactionAttributeSourcePointcut类型的实例作为函数参数继续跟踪canApply,AopUtils类(package org.springframework.aop.support)的CanApply函数:

     1     public static boolean canApply(Pointcut pc, Class<?> targetClass, boolean hasIntroductions) {
     2         Assert.notNull(pc, "Pointcut must not be null");
     3         //使用 ClassFilter 匹配 class
     4         if (!pc.getClassFilter().matches(targetClass)) {
     5             return false;
     6         }
     7 
     8         MethodMatcher methodMatcher = pc.getMethodMatcher();
     9         if (methodMatcher == MethodMatcher.TRUE) {
    10             // No need to iterate the methods if we're matching any method anyway...
    11             return true;
    12         }
    13 
    14         IntroductionAwareMethodMatcher introductionAwareMethodMatcher = null;
    15         if (methodMatcher instanceof IntroductionAwareMethodMatcher) {
    16             introductionAwareMethodMatcher = (IntroductionAwareMethodMatcher) methodMatcher;
    17         }
    18 
    19 
    20         /*
    21          * 查找当前类及其父类(以及父类的父类等等)所实现的接口,由于接口中的方法是 public,
    22          * 所以当前类可以继承其父类,和父类的父类中所有的接口方法
    23          */
    24         Set<Class<?>> classes = new LinkedHashSet<>();
    25         if (!Proxy.isProxyClass(targetClass)) {
    26             classes.add(ClassUtils.getUserClass(targetClass));
    27         }
    28         classes.addAll(ClassUtils.getAllInterfacesForClassAsSet(targetClass));
    29 
    30         for (Class<?> clazz : classes) {
    31             // 获取当前类的方法列表,包括从父类中继承的方法
    32             Method[] methods = ReflectionUtils.getAllDeclaredMethods(clazz);
    33             for (Method method : methods) {
    34                 // 使用 methodMatcher 匹配方法,匹配成功即可立即返回
    35                 if (introductionAwareMethodMatcher != null ?
    36                         introductionAwareMethodMatcher.matches(method, targetClass, hasIntroductions) :
    37                         methodMatcher.matches(method, targetClass)) {
    38                     return true;
    39                 }
    40             }
    41         }
    42 
    43         return false;
    44     }

          通过上面函数大致可以理清大体脉络,首先获取对应类的所有接口并连同类本身一起遍历,遍历过程中又对类中的方法再次遍历,一旦匹配成功便认为这个类适用于当前的增强器。

          到这里我们不禁会有疑问,对于事务的配置不仅仅局限于在函数上配置,我们都知道,在类或接口上的配置可以延续到类中的每个函数。那么,如果针对每个函数进行检测,在类本身上配置的事务属性岂不是检测不到了吗?带着这个疑问,我们继续探求matcher方法。

          做匹配的时候methodMatcher.matches(method, targetClass)会使用TransactionAttributeSourcePointcut类(package org.springframework.transaction.interceptor)的matches方法。

    1     public boolean matches(Method method, Class<?> targetClass) {
    2         //自定义标签解析时注入
    3         TransactionAttributeSource tas = getTransactionAttributeSource();
    4         return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
    5     }

      此时的tas表示AnnotationTransactionAttributeSource类型,而AnnotationTransactionAttributeSource类型的getTransactionAttribute方法如下(该方法在其父类AbstractFallbackTransactionAttributeSource中)。AbstractFallbackTransactionAttributeSource类(package org.springframework.transaction.interceptor)的getTransactionAttribute方法

     1     public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
     2         if (method.getDeclaringClass() == Object.class) {
     3             return null;
     4         }
     5 
     6         // First, see if we have a cached value.
     7         Object cacheKey = getCacheKey(method, targetClass);
     8         TransactionAttribute cached = this.attributeCache.get(cacheKey);
     9         if (cached != null) {
    10             // Value will either be canonical value indicating there is no transaction attribute,
    11             // or an actual transaction attribute.
    12             if (cached == NULL_TRANSACTION_ATTRIBUTE) {
    13                 return null;
    14             }
    15             else {
    16                 return cached;
    17             }
    18         }
    19         else {
    20             // We need to work it out.
    21             TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
    22             // Put it in the cache.
    23             if (txAttr == null) {
    24                 this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
    25             }
    26             else {
    27                 String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
    28                 if (txAttr instanceof DefaultTransactionAttribute) {
    29                     ((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification);
    30                 }
    31                 if (logger.isTraceEnabled()) {
    32                     logger.trace("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr);
    33                 }
    34                 this.attributeCache.put(cacheKey, txAttr);
    35             }
    36             return txAttr;
    37         }
    38     }

          很遗憾,在getTransactionAttribute函数中并没有找到我们想要的代码,这里是指常规的一贯的套路。尝试从缓存加载,如果对应信息没有被缓存的话,工作又委托给了computeTransactionAttribute函数,在computeTransactionAttribute函数中我们看到了事务标签的提取过程。AbstractFallbackTransactionAttributeSource类的computeTransactionAttribute方法:

     1     protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
     2         // Don't allow no-public methods as required.
     3         if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
     4             return null;
     5         }
     6 
     7         // The method may be on an interface, but we need attributes from the target class.
     8         // If the target class is null, the method will be unchanged.
     9         //method代表接口中的方法,specificMethod代表类中的方法
    10         Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
    11 
    12         // First try is the method in the target class.
    13         //查看方法中是否存在事务声明
    14         TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
    15         if (txAttr != null) {
    16             return txAttr;
    17         }
    18 
    19         // Second try is the transaction attribute on the target class.
    20         //查看方法所在的类是否存在事务声明
    21         txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
    22         if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
    23             return txAttr;
    24         }
    25 
    26         //如果存在接口,则到接口中去寻找
    27         if (specificMethod != method) {
    28             // Fallback is to look at the original method.
    29             //查找接口方法
    30             txAttr = findTransactionAttribute(method);
    31             if (txAttr != null) {
    32                 return txAttr;
    33             }
    34             // Last fallback is the class of the original method.
    35             //到接口的类中去寻找
    36             txAttr = findTransactionAttribute(method.getDeclaringClass());
    37             if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
    38                 return txAttr;
    39             }
    40         }
    41 
    42         return null;
    43     }

          对于事物属性的获取规则相信大家已经很清楚,如果方法中存在事务属性,则使用方法上的属性,否则使用方法所在的类上的属性,如果方法所在类的属性上还是没有搜寻到对应的事务属性,那么再搜寻接口中的方法,再没有的话,最后尝试搜寻接口的类上面的声明。对于函数computeTransactionAttribute中的逻辑与我们所认识的规则并无差别,但是上面函数中并没有真正的去做搜寻事务属性的逻辑,而是搭建了一个执行框架,将搜寻事务属性的任务委托给了findTransactionAttribute方法去执行。AnnotationTransactionAttributeSource类(package org.springframework.transaction.annotation)的findTransactionAttribute方法:

    1     protected TransactionAttribute findTransactionAttribute(Method method) {
    2         return determineTransactionAttribute(method);
    3     }

       上面函数调用了本类中的determineTransactionAttribute方法:

    1     protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) {
    2         for (TransactionAnnotationParser parser : this.annotationParsers) {
    3             TransactionAttribute attr = parser.parseTransactionAnnotation(element);
    4             if (attr != null) {
    5                 return attr;
    6             }
    7         }
    8         return null;
    9     }

          this.annotationParsers是在当前类AnnotationTransactionAttributeSource初始化的时候初始化的,其中的值被加入了SpringTransactionAnnotationParser,也就是当进行属性获取的时候其实是使用SpringTransactionAnnotationParser类(package org.springframework.transaction.annotation)的parseTransactionAnnotation方法进行解析的:

     1     public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
     2         AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(
     3                 element, Transactional.class, false, false);
     4         if (attributes != null) {
     5             return parseTransactionAnnotation(attributes);
     6         }
     7         else {
     8             return null;
     9         }
    10     }

      至此,我们终于看到了想看到的获取注解标记的代码。首先会判断当前的类是否含有Transactional注解,这是事务属性的基础,当然如果有的话会继续调用SpringTransactionAnnotationParser类的parseTransactionAnnotation方法解析详细的属性:

     1     protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
     2         RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
     3 
     4         //解析propagation
     5         Propagation propagation = attributes.getEnum("propagation");
     6         rbta.setPropagationBehavior(propagation.value());
     7         //解析isolation
     8         Isolation isolation = attributes.getEnum("isolation");
     9         rbta.setIsolationLevel(isolation.value());
    10         //解析timeout
    11         rbta.setTimeout(attributes.getNumber("timeout").intValue());
    12         //解析readOnly
    13         rbta.setReadOnly(attributes.getBoolean("readOnly"));
    14         //解析value
    15         rbta.setQualifier(attributes.getString("value"));
    16 
    17         //解析rollbackFor
    18         List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
    19         for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
    20             rollbackRules.add(new RollbackRuleAttribute(rbRule));
    21         }
    22         //解析rollbackForClassName
    23         for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
    24             rollbackRules.add(new RollbackRuleAttribute(rbRule));
    25         }
    26         //解析noRollbackFor
    27         for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
    28             rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
    29         }
    30         //noRollbackForClassName
    31         for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
    32             rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
    33         }
    34         rbta.setRollbackRules(rollbackRules);
    35 
    36         return rbta;
    37     }

          上面方法中实现了对对应类或者方法的事务属性解析,你会在这个类中看到任何你常用或者不常用的属性提取。

          至此,我们终于完成了事务标签的解析。我们是不是分析得太远了,似乎已经忘记了从哪里开始了。再回顾一下,我们现在的任务是找出某个增强器是否适合于对应的类,而是否匹配的关键则在于是否从指定的类或类中的方法中找到对应的事务属性,现在,我们以UserServiceImple为例,已经在它的接口UserService中找到了事务属性,所以,它是与事务增强器匹配的,也就是它会被事务功能修饰。

          至此,事务功能的初始化工作便结束了,当判断某个bean适用于事务增强时,也就是适用于增强器BeanFactoryTransactionAttributeSourceAdvisor。没错,还是这个类,所以说,在自定义标签解析时,注入的类成为了整个事务功能的基础。

          BeanFactoryTransactionAttributeSourceAdvisor作为Advisor的实现类,自然要遵从Advisor的处理方式,当代理被调用时会调用这个类的增强方法,也就是此bean的Advisor,又因为在解析事务定义标签时我们把TransactionInterceptor类型的bean注入到了BeanFactoryTransactionAttributeSourceAdvisor中,所以,在调用事务增强器增强的代理类时会首先执行TransactionInterceptor进行增强,同时,也就是TransactionInterceptor类的invoke方法中完成了整个事务的逻辑。

     

    三、事务增强器

          TransactionInterceptor支撑着整个事务功能的架构,逻辑还是相对复杂的,那么现在我们切入正题来分析此拦截器是如何实现事务特性的。TransactionInterceptor类(package org.springframework.transaction.interceptor)继承自MethodInterceptor,所以调用该类是从其invoke方法开始的:

    1     public Object invoke(MethodInvocation invocation) throws Throwable {
    2         // Work out the target class: may be {@code null}.
    3         // The TransactionAttributeSource should be passed the target class
    4         // as well as the method, which may be from an interface.
    5         Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    6 
    7         // Adapt to TransactionAspectSupport's invokeWithinTransaction...
    8         return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
    9     }

          其中又调用了其父类TransactionAspectSupport(package org.springframework.transaction.interceptor)的invokeWithinTransaction方法。

      1     protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
      2             final InvocationCallback invocation) throws Throwable {
      3 
      4         // If the transaction attribute is null, the method is non-transactional.
      5         //获取对应事务属性
      6         TransactionAttributeSource tas = getTransactionAttributeSource();
      7         final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
      8         //获取BeanFactory中的TransactionManager
      9         final TransactionManager tm = determineTransactionManager(txAttr);
     10 
     11         if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
     12             ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
     13                 if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
     14                     throw new TransactionUsageException(
     15                             "Unsupported annotated transaction on suspending function detected: " + method +
     16                             ". Use TransactionalOperator.transactional extensions instead.");
     17                 }
     18                 ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
     19                 if (adapter == null) {
     20                     throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
     21                             method.getReturnType());
     22                 }
     23                 return new ReactiveTransactionSupport(adapter);
     24             });
     25             return txSupport.invokeWithinTransaction(
     26                     method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
     27         }
     28 
     29         PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
     30         //构造方法唯一标识(类.方法 如service.UserServiceImpl.save)
     31         final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
     32 
     33         //声明式事务处理
     34         if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
     35             // Standard transaction demarcation with getTransaction and commit/rollback calls.
     36             //创建TransactionInfo
     37             TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
     38 
     39             Object retVal;
     40             try {
     41                 // This is an around advice: Invoke the next interceptor in the chain.
     42                 // This will normally result in a target object being invoked.
     43                 //执行被增强方法
     44                 retVal = invocation.proceedWithInvocation();
     45             }
     46             catch (Throwable ex) {
     47                 // target invocation exception
     48                 //异常回滚
     49                 completeTransactionAfterThrowing(txInfo, ex);
     50                 throw ex;
     51             }
     52             finally {
     53                 //清除信息
     54                 cleanupTransactionInfo(txInfo);
     55             }
     56 
     57             if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
     58                 // Set rollback-only in case of Vavr failure matching our rollback rules...
     59                 TransactionStatus status = txInfo.getTransactionStatus();
     60                 if (status != null && txAttr != null) {
     61                     retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
     62                 }
     63             }
     64 
     65             //提交事务
     66             commitTransactionAfterReturning(txInfo);
     67             return retVal;
     68         }
     69 
     70         else {
     71             final ThrowableHolder throwableHolder = new ThrowableHolder();
     72 
     73             // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
     74             //编程式事务处理
     75             try {
     76                 Object result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
     77                     TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
     78                     try {
     79                         Object retVal = invocation.proceedWithInvocation();
     80                         if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
     81                             // Set rollback-only in case of Vavr failure matching our rollback rules...
     82                             retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
     83                         }
     84                         return retVal;
     85                     }
     86                     catch (Throwable ex) {
     87                         if (txAttr.rollbackOn(ex)) {
     88                             // A RuntimeException: will lead to a rollback.
     89                             if (ex instanceof RuntimeException) {
     90                                 throw (RuntimeException) ex;
     91                             }
     92                             else {
     93                                 throw new ThrowableHolderException(ex);
     94                             }
     95                         }
     96                         else {
     97                             // A normal return value: will lead to a commit.
     98                             throwableHolder.throwable = ex;
     99                             return null;
    100                         }
    101                     }
    102                     finally {
    103                         cleanupTransactionInfo(txInfo);
    104                     }
    105                 });
    106 
    107                 // Check result state: It might indicate a Throwable to rethrow.
    108                 if (throwableHolder.throwable != null) {
    109                     throw throwableHolder.throwable;
    110                 }
    111                 return result;
    112             }
    113             catch (ThrowableHolderException ex) {
    114                 throw ex.getCause();
    115             }
    116             catch (TransactionSystemException ex2) {
    117                 if (throwableHolder.throwable != null) {
    118                     logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
    119                     ex2.initApplicationException(throwableHolder.throwable);
    120                 }
    121                 throw ex2;
    122             }
    123             catch (Throwable ex2) {
    124                 if (throwableHolder.throwable != null) {
    125                     logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
    126                 }
    127                 throw ex2;
    128             }
    129         }
    130     }

          从上面的函数中,我们尝试整理下事务处理的脉络,在Spring中支持两种事务处理的方式,分别是声明式事务处理和编程式事务处理,两者相对于开发人员来讲差别很大,但是对于Spring中的实现来讲,大同小异。在invoke中我们也可以看到这两种方式的实现。考虑到对事务的应用比声明式的事务处理使用起来方便,也相对流行些,我们就此种方式进行分析。对于声明式的事务处理主要有以下几个步骤。

          ① 获取事务的属性。

          对于事务处理来说,最基础或者最首要的工作便是获取事务的属性了。这是支撑整个事务功能的基石。如果没有事务属性,其他功能也就无从谈起,在分析事务准备阶段我们已经分析了事务属性提取的功能,大家应该有所了解。

          ② 加载配置中配置的TransactionManager。

          ③ 不同的事务处理方式使用不同的逻辑。

          对于声明式事务的处理与编程式事务的处理,第一点区别在于事务属性上,因为编程式的事务处理是不需要有事务属性的,第二点区别就是在TransactionManager上,CallbackPreferringPlatformTransactionManager实现了PlatformTransactionManager接口,暴露出一个方法用于执行事务处理中的回调。所以,这两种方式都可以用作事务处理方式的判断。

          ④ 在目标方法执行前获取事务并收集事务信息。

          事务信息与事务属性并不相同,也就是TransactionInfo与TransactionAttribute并不相同,TransactionInfo包含TransactionAttribute信息,但是,除了TransactionAttribute外还有其他事务信息,例如PlatformTransactionManager以及TransactionStatus相关信息。

          ⑤ 执行目标方法。

          ⑥ 一旦出现异常,尝试异常处理。

          ⑦ 提交事务前的事务信息清除。

          ⑧ 提交事务。

          上面的步骤分析旨在让大家对事务功能与步骤有个大致的了解,具体的功能还需要详细地分析。

    (一)创建事务

          我们先分析事务创建的过程。TransactionAspectSupport类(package org.springframework.transaction.interceptor)的createTransactionIfNecessary方法:

     1     protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
     2             @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
     3 
     4         // If no name specified, apply method identification as transaction name.
     5         //如果没有名称指定则使用方法唯一标识,并使用DelegatingTransactionAttribute封装txAttr
     6         if (txAttr != null && txAttr.getName() == null) {
     7             txAttr = new DelegatingTransactionAttribute(txAttr) {
     8                 @Override
     9                 public String getName() {
    10                     return joinpointIdentification;
    11                 }
    12             };
    13         }
    14 
    15         TransactionStatus status = null;
    16         if (txAttr != null) {
    17             if (tm != null) {
    18                 //获取TransactionStatus
    19                 status = tm.getTransaction(txAttr);
    20             }
    21             else {
    22                 if (logger.isDebugEnabled()) {
    23                     logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
    24                             "] because no transaction manager has been configured");
    25                 }
    26             }
    27         }
    28         //根据指定的属性与status准备一个TransactionInfo
    29         return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    30     }

          对于createTransactionIfNecessary函数主要做了这样几件事情。

          ① 使用DelegatingTransactionAttribute封装传入的TransactionAttribute实例。

          对于传入的TransactionAttribute类型的参数txAttr,当前的实际类型是RuleBasedTransactionAttribute,是由获取事务属性时生成的,主要用于数据承载,而这里之所以使用DelegatingTransactionAttribute进行封装,当时是提供了更多的功能。

          ② 获取事务

          事务处理当然是以事务为核心,那么获取事务就是最重要的事情。

          ③ 构建事务信息

          根据之前几个步骤获取的信息构建TransactionInfo并返回。

          我们分别对以上步骤进行详细的解析。

    (1)获取事务

          Spring中使用getTransaction来处理事务的准备工作,包括事务获取以及信息的构建。getTransaction函数在实现PlatformTransactionManager接口的AbstractPlatformTransactionManager抽象类(package org.springframework.transaction.support)中给出了具体实现方法:

     1     public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
     2             throws TransactionException {
     3 
     4         // Use defaults if no transaction definition given.
     5         TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
     6 
     7         Object transaction = doGetTransaction();
     8         boolean debugEnabled = logger.isDebugEnabled();
     9 
    10         //判断当前线程是否存在事务,判读依据为当前线程记录的连接不为空且连接中(connectionHolder)中的transactionActive属性不为空
    11         if (isExistingTransaction(transaction)) {
    12             // Existing transaction found -> check propagation behavior to find out how to behave.
    13             //当前线程已经存在事务
    14             return handleExistingTransaction(def, transaction, debugEnabled);
    15         }
    16 
    17         // Check definition settings for new transaction.
    18         //事务超时设置验证
    19         if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
    20             throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    21         }
    22 
    23         // No existing transaction found -> check propagation behavior to find out how to proceed.
    24         //如果当前线程不存在事务,但是PropagationBehavior却被声明为PROPAGATION_MANDATORY抛出异常
    25         if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
    26             throw new IllegalTransactionStateException(
    27                     "No existing transaction found for transaction marked with propagation 'mandatory'");
    28         }
    29         else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
    30                 def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
    31                 def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    32             //PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED都需要新疆事务
    33             //空挂起
    34             SuspendedResourcesHolder suspendedResources = suspend(null);
    35             if (debugEnabled) {
    36                 logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
    37             }
    38             try {
    39                 return startTransaction(def, transaction, debugEnabled, suspendedResources);
    40             }
    41             catch (RuntimeException | Error ex) {
    42                 resume(null, suspendedResources);
    43                 throw ex;
    44             }
    45         }
    46         else {
    47             // Create "empty" transaction: no actual transaction, but potentially synchronization.
    48             if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
    49                 logger.warn("Custom isolation level specified but no actual transaction initiated; " +
    50                         "isolation level will effectively be ignored: " + def);
    51             }
    52             boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    53             return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    54         }
    55     }

       上面函数调用了本类中的startTransaction函数:

     1     private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
     2             boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
     3 
     4         boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
     5         DefaultTransactionStatus status = newTransactionStatus(
     6                 definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
     7         //构造transaction,包括设置ConnectionHolder、隔离级别、timeout。如果是新连接,绑定到当前线程
     8         doBegin(transaction, definition);
     9         //新同步事务的设置,针对当前线程的设置
    10         prepareSynchronization(status, definition);
    11         return status;
    12     }

          当然,在Spring中每个负责的功能实现,并不是一次完成的,而是会通过入口函数进行一个框架的搭建,初步构建完整的逻辑,而将实现细节分摊给不同的函数。那么,让我们看看事务的准备工作都包括哪些。

           获取事务

          创建对应的事务实例,这里使用的是DataSourceTransactionManager(继承了AbstractPlatformTransactionManager抽象类)中的doGetTransaction方法,创建基于JDBC的事务实例。如果当前线程中存在关于dataSource的连接,那么直接使用。这里有一个对保存点的设置,是否开启运行保存点取决于是否设置了允许嵌入式事务。DataSourceTransactionManager类(package org.springframework.jdbc.datasource)的doGetTransaction方法:

     1     protected Object doGetTransaction() {
     2         DataSourceTransactionObject txObject = new DataSourceTransactionObject();
     3         txObject.setSavepointAllowed(isNestedTransactionAllowed());
     4         //如果当前线程已经记录数据库连接则使用原有连接
     5         ConnectionHolder conHolder =
     6                 (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
     7         //fasle表示非新创建连接。
     8         txObject.setConnectionHolder(conHolder, false);
     9         return txObject;
    10     }

          ② 如果当前线程存在事务,则转向嵌套事务的处理。

          ③ 事务超时设置验证。

          ④ 事务propagationBehavior属性的设置验证。

          ⑤ 构建DefaultTransactionStatus。

          ⑥ 完善transcation,包括设置ConnectionHolder、隔离级别、timeout,如果是新连接,则绑定到当前线程。

          对于一些隔离级别,timeout等功能的设置并不是由Spring来完成的,而是委托给底层的数据库连接去做的,而对于数据库连接的设置就是在doBegin函数中处理的。DataSourceTransactionManager类的doBegin方法:

        protected void doBegin(Object transaction, TransactionDefinition definition) {
            DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
            Connection con = null;
    
            try {
                if (!txObject.hasConnectionHolder() ||
                        txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                    Connection newCon = obtainDataSource().getConnection();
                    if (logger.isDebugEnabled()) {
                        logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                    }
                    txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
                }
    
                txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
                con = txObject.getConnectionHolder().getConnection();
    
                //设置隔离级别
                Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
                txObject.setPreviousIsolationLevel(previousIsolationLevel);
                txObject.setReadOnly(definition.isReadOnly());
    
                // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
                // so we don't want to do it unnecessarily (for example if we've explicitly
                // configured the connection pool to set it already).
                //更改自动提交设置,由Spring控制提交
                if (con.getAutoCommit()) {
                    txObject.setMustRestoreAutoCommit(true);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                    }
                    con.setAutoCommit(false);
                }
    
                prepareTransactionalConnection(con, definition);
                //设置判断当前线程是否存在事务的依据
                txObject.getConnectionHolder().setTransactionActive(true);
    
                int timeout = determineTimeout(definition);
                if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                    txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
                }
    
                // Bind the connection holder to the thread.
                if (txObject.isNewConnectionHolder()) {
                    //将当前获取到的连接绑定到当前线程
                    TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
                }
            }
    
            catch (Throwable ex) {
                if (txObject.isNewConnectionHolder()) {
                    DataSourceUtils.releaseConnection(con, obtainDataSource());
                    txObject.setConnectionHolder(null, false);
                }
                throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
            }
        }

          可以说事务是从这个函数开始的,因为在这个函数中已经开始尝试了对数据库连接的获取,当然,在获取数据库连接的同时,一些必要的设置也是需要同步设置的。

          1) 尝试获取连接。

          当然并不是每次都会获取新的连接,如果当前线程中的ConnectionHolder已经存在,则没有必要再次获取,或者,对于事务同步表示设置为true的需要重新获取连接。

          2) 设置隔离级别以及只读标识。

          你是否有过这样的错觉?事务中的只读配置是Spring中做了一些处理呢?Spring中确实是针对只读操作做了一些处理,但是核心的实现是设置connection上的readOnly属性。通用,对于隔离级别的控制也是交由connection去控制的。

          3) 更改默认的提交设置。

          如果事务属性是自动提交的,那么需要改变这种设置,而将提交操作委托给Spring来处理。

          4) 设置标志位,标识当前连接已经被事务激活。

          5) 设置过期时间。

          6) 将connectionHolder绑定到当前线程。

          设置隔离级别的prepareConnectionForTransaction函数用于负责对底层数据库连接的设置。当然,只是包含只读标识和隔离级别的设置。由于强大的日志及异常处理,显得函数代码量比较大,但是单从业务角度去看,关键代码其实是不多的。DataSourceUtils类(package org.springframework.jdbc.datasource)的prepareConnectionForTransaction函数:

     1     public static Integer prepareConnectionForTransaction(Connection con, @Nullable TransactionDefinition definition)
     2             throws SQLException {
     3 
     4         Assert.notNull(con, "No Connection specified");
     5 
     6         // Set read-only flag.
     7         //设置数据连接的只读标识
     8         if (definition != null && definition.isReadOnly()) {
     9             try {
    10                 if (logger.isDebugEnabled()) {
    11                     logger.debug("Setting JDBC Connection [" + con + "] read-only");
    12                 }
    13                 con.setReadOnly(true);
    14             }
    15             catch (SQLException | RuntimeException ex) {
    16                 Throwable exToCheck = ex;
    17                 while (exToCheck != null) {
    18                     if (exToCheck.getClass().getSimpleName().contains("Timeout")) {
    19                         // Assume it's a connection timeout that would otherwise get lost: e.g. from JDBC 4.0
    20                         throw ex;
    21                     }
    22                     exToCheck = exToCheck.getCause();
    23                 }
    24                 // "read-only not supported" SQLException -> ignore, it's just a hint anyway
    25                 logger.debug("Could not set JDBC Connection read-only", ex);
    26             }
    27         }
    28 
    29         // Apply specific isolation level, if any.
    30         //设置数据库连接的隔离级别
    31         Integer previousIsolationLevel = null;
    32         if (definition != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
    33             if (logger.isDebugEnabled()) {
    34                 logger.debug("Changing isolation level of JDBC Connection [" + con + "] to " +
    35                         definition.getIsolationLevel());
    36             }
    37             int currentIsolation = con.getTransactionIsolation();
    38             if (currentIsolation != definition.getIsolationLevel()) {
    39                 previousIsolationLevel = currentIsolation;
    40                 con.setTransactionIsolation(definition.getIsolationLevel());
    41             }
    42         }
    43 
    44         return previousIsolationLevel;
    45     }

          ⑦ 将事务信息记录在当前线程中。AbstractPlatformTransactionManager类(package org.springframework.transaction.support)中的prepareSynchronization函数:

     1     protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
     2         if (status.isNewSynchronization()) {
     3             TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
     4             TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
     5                     definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
     6                             definition.getIsolationLevel() : null);
     7             TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
     8             TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
     9             TransactionSynchronizationManager.initSynchronization();
    10         }
    11     }

    (2)处理已经存在的事务

      AbstractPlatformTransactionManager类(package org.springframework.transaction.support)中的handleExistingTransaction函数:

     1     private TransactionStatus handleExistingTransaction(
     2             TransactionDefinition definition, Object transaction, boolean debugEnabled)
     3             throws TransactionException {
     4 
     5         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
     6             throw new IllegalTransactionStateException(
     7                     "Existing transaction found for transaction marked with propagation 'never'");
     8         }
     9 
    10         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
    11             if (debugEnabled) {
    12                 logger.debug("Suspending current transaction");
    13             }
    14             Object suspendedResources = suspend(transaction);
    15             boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    16             return prepareTransactionStatus(
    17                     definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    18         }
    19 
    20         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
    21             if (debugEnabled) {
    22                 logger.debug("Suspending current transaction, creating new transaction with name [" +
    23                         definition.getName() + "]");
    24             }
    25             //新事物的建立
    26             SuspendedResourcesHolder suspendedResources = suspend(transaction);
    27             try {
    28                 return startTransaction(definition, transaction, debugEnabled, suspendedResources);
    29             }
    30             catch (RuntimeException | Error beginEx) {
    31                 resumeAfterBeginException(transaction, suspendedResources, beginEx);
    32                 throw beginEx;
    33             }
    34         }
    35 
    36         //嵌入式事务的处理
    37         if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    38             if (!isNestedTransactionAllowed()) {
    39                 throw new NestedTransactionNotSupportedException(
    40                         "Transaction manager does not allow nested transactions by default - " +
    41                         "specify 'nestedTransactionAllowed' property with value 'true'");
    42             }
    43             if (debugEnabled) {
    44                 logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
    45             }
    46             if (useSavepointForNestedTransaction()) {
    47                 // Create savepoint within existing Spring-managed transaction,
    48                 // through the SavepointManager API implemented by TransactionStatus.
    49                 // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
    50                 //如果没有可以使用保存点的方式控制事务回滚,那么在嵌入式事务的建立初始建立保存点
    51                 DefaultTransactionStatus status =
    52                         prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
    53                 status.createAndHoldSavepoint();
    54                 return status;
    55             }
    56             else {
    57                 // Nested transaction through nested begin and commit/rollback calls.
    58                 // Usually only for JTA: Spring synchronization might get activated here
    59                 // in case of a pre-existing JTA transaction.
    60                 return startTransaction(definition, transaction, debugEnabled, null);
    61             }
    62         }
    63 
    64         // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    65         if (debugEnabled) {
    66             logger.debug("Participating in existing transaction");
    67         }
    68         if (isValidateExistingTransaction()) {
    69             if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
    70                 Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
    71                 if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
    72                     Constants isoConstants = DefaultTransactionDefinition.constants;
    73                     throw new IllegalTransactionStateException("Participating transaction with definition [" +
    74                             definition + "] specifies isolation level which is incompatible with existing transaction: " +
    75                             (currentIsolationLevel != null ?
    76                                     isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
    77                                     "(unknown)"));
    78                 }
    79             }
    80             if (!definition.isReadOnly()) {
    81                 if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
    82                     throw new IllegalTransactionStateException("Participating transaction with definition [" +
    83                             definition + "] is not marked as read-only but existing transaction is");
    84                 }
    85             }
    86         }
    87         boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    88         return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    89     }

          对于已经存在事务的处理过程中,我们看到了很多熟悉的操作,但是,也有不同的地方,函数中对已经存在的事务处理考虑两种情况。

          ① PROPAGATION_REQUIRES_NEW表示当前方法必须在它自己的事务里运行,一个新的事务将被启动,而如果一个事务正在运行的话,则在这个方法运行期间被挂起。而Spring中国对于此种传播方式的处理与新事物建立最大的不同点在于使用suspend方法将原事务挂起。将信息挂起的目的当然是为了在当前事务执行完毕后再将原事务还原。

          ② PROPAGATION_NESTED表示如果当前正有一个事务在运行中,则该方法应该运行在一个嵌套的事务中,被嵌套的事务可以独立于封装事务进行提交或者回滚,如果封装事务不存在,行为就像PROPAGATION_REQUIRES_NEW。对于嵌入式事务的处理,Spring中主要考虑了两种方式的处理。

          1) Spring中允许嵌入事务的时候,则首选设置保存点的方式作为异常处理的回滚。

          2) 对于其他方式,比如JTA(Java Transaction API)无法使用保存点的方式,那么处理方式与PROPAGATION_REQUIRES_NEW相同,而一旦出现异常,则由Spring的事务异常处理机制去完成后续操作。

          对于挂起操作的主要目的是记录原有事务的状态,以便于后续操作对事务的恢复。AbstractPlatformTransactionManager类中的suspend函数:

     1     protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
     2         if (TransactionSynchronizationManager.isSynchronizationActive()) {
     3             List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
     4             try {
     5                 Object suspendedResources = null;
     6                 if (transaction != null) {
     7                     suspendedResources = doSuspend(transaction);
     8                 }
     9                 String name = TransactionSynchronizationManager.getCurrentTransactionName();
    10                 TransactionSynchronizationManager.setCurrentTransactionName(null);
    11                 boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
    12                 TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
    13                 Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
    14                 TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
    15                 boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
    16                 TransactionSynchronizationManager.setActualTransactionActive(false);
    17                 return new SuspendedResourcesHolder(
    18                         suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
    19             }
    20             catch (RuntimeException | Error ex) {
    21                 // doSuspend failed - original transaction is still active...
    22                 doResumeSynchronization(suspendedSynchronizations);
    23                 throw ex;
    24             }
    25         }
    26         else if (transaction != null) {
    27             // Transaction active but no synchronization active.
    28             Object suspendedResources = doSuspend(transaction);
    29             return new SuspendedResourcesHolder(suspendedResources);
    30         }
    31         else {
    32             // Neither transaction nor synchronization active.
    33             return null;
    34         }
    35     }

    (3)准备事务信息

          当已经建立事务连接并完成了事务信息的提取后,我们需要将所有的事务信息统一记录在TransactionInfo。TransactionAspectSupport类(package org.springframework.transaction.interceptor)的prepareTransactionInfo函数:

     1     protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
     2             @Nullable TransactionAttribute txAttr, String joinpointIdentification,
     3             @Nullable TransactionStatus status) {
     4 
     5         TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
     6         if (txAttr != null) {
     7             // We need a transaction for this method...
     8             if (logger.isTraceEnabled()) {
     9                 logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
    10             }
    11             // The transaction manager will flag an error if an incompatible tx already exists.
    12             //记录事务的状态
    13             txInfo.newTransactionStatus(status);
    14         }
    15         else {
    16             // The TransactionInfo.hasTransaction() method will return false. We created it only
    17             // to preserve the integrity of the ThreadLocal stack maintained in this class.
    18             if (logger.isTraceEnabled()) {
    19                 logger.trace("No need to create transaction for [" + joinpointIdentification +
    20                         "]: This method is not transactional.");
    21             }
    22         }
    23 
    24         // We always bind the TransactionInfo to the thread, even if we didn't create
    25         // a new transaction here. This guarantees that the TransactionInfo stack
    26         // will be managed correctly even if no transaction was created by this aspect.
    27         txInfo.bindToThread();
    28         return txInfo;
    29     }

     (二)回滚处理

          之前已经完成了目标方法运行前的事务准备工作,而这些准备工作最大的目的无非是对于程序没有安装我们期待的那样进行,也就是出现特定的错误。那么,当出现错误的时候,Spring是怎么对数据进行恢复的呢?TransactionAspectSupport类(package org.springframework.transaction.interceptor)的completeTransactionAfterThrowing函数:

     1     protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
     2         //当抛出异常时首先判断当前是否存在事务,这是基础依据
     3         if (txInfo != null && txInfo.getTransactionStatus() != null) {
     4             if (logger.isTraceEnabled()) {
     5                 logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
     6                         "] after exception: " + ex);
     7             }
     8             //这里判断是否回滚默认的依据是抛出的异常是否是RuntimeException或者是Error的类型
     9             if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
    10                 try {
    11                     //根据TransactionStatus信息进行回滚处理
    12                     txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
    13                 }
    14                 catch (TransactionSystemException ex2) {
    15                     logger.error("Application exception overridden by rollback exception", ex);
    16                     ex2.initApplicationException(ex);
    17                     throw ex2;
    18                 }
    19                 catch (RuntimeException | Error ex2) {
    20                     logger.error("Application exception overridden by rollback exception", ex);
    21                     throw ex2;
    22                 }
    23             }
    24             else {
    25                 // We don't roll back on this exception.
    26                 // Will still roll back if TransactionStatus.isRollbackOnly() is true.
    27                 //如果不满足回滚条件即使抛出异常也同样会提交
    28                 try {
    29                     txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    30                 }
    31                 catch (TransactionSystemException ex2) {
    32                     logger.error("Application exception overridden by commit exception", ex);
    33                     ex2.initApplicationException(ex);
    34                     throw ex2;
    35                 }
    36                 catch (RuntimeException | Error ex2) {
    37                     logger.error("Application exception overridden by commit exception", ex);
    38                     throw ex2;
    39                 }
    40             }
    41         }
    42     }

          在对目标方法的执行过程中,一旦出现Throwable就会被引导至此方法处理,但是并不代表所有的Throwable都会被回滚处理,比如我们最常用的Exception,默认是不会被处理的。默认情况下,即使出现异常,数据也会被正常提交,而这个关键的地方就是在txInfo.transactionAttribute.rollbackOn(ex)这个函数。

    (1)回滚条件

    1 public boolean rollbackOn(Throwable ex) {
    2    return (new instanceof RuntimeException || ex instanceof Error);
    3 }

      看到了吗?默认情况下Spring中的事务异常机制只对RuntimeException和Error两种情况感兴趣,当然你可以通过扩展来改变,不过,我们最常用的还是使用事务提供的属性设置,利用注解方式的使用,例如:

    @Transactional(propagation=Propagation.REQUIRED, rollbackFor=Exception.class)

    (2)回滚处理

          当然,一旦符合回滚条件,那么Spring就会将程序引导至回滚处理函数中。AbstractPlatformTransactionManager类(package org.springframework.transaction.support)中的rollback函数:

     1     public final void rollback(TransactionStatus status) throws TransactionException {
     2         //如果事务已经完成,那么在此回滚会抛出异常
     3         if (status.isCompleted()) {
     4             throw new IllegalTransactionStateException(
     5                     "Transaction is already completed - do not call commit or rollback more than once per transaction");
     6         }
     7 
     8         DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
     9         processRollback(defStatus, false);
    10     }

       上面函数又调用了本类中的processRollback函数:

     1     private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
     2         try {
     3             boolean unexpectedRollback = unexpected;
     4 
     5             try {
     6                 //激活所有TransactionSynchronizatioin中对应的方法
     7                 triggerBeforeCompletion(status);
     8 
     9                 if (status.hasSavepoint()) {
    10                     if (status.isDebug()) {
    11                         logger.debug("Rolling back transaction to savepoint");
    12                     }
    13                     //如果有保存点,也就是当前事务为单独的线程则会退到保存点
    14                     status.rollbackToHeldSavepoint();
    15                 }
    16                 else if (status.isNewTransaction()) {
    17                     if (status.isDebug()) {
    18                         logger.debug("Initiating transaction rollback");
    19                     }
    20                     //如果当前事务为独立的新事物,则直接回退
    21                     doRollback(status);
    22                 }
    23                 else {
    24                     // Participating in larger transaction
    25                     if (status.hasTransaction()) {
    26                         if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
    27                             if (status.isDebug()) {
    28                                 logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
    29                             }
    30                             //如果当前事务不是独立的事务,那么只能标记状态,等待事务链执行完毕后统一回滚
    31                             doSetRollbackOnly(status);
    32                         }
    33                         else {
    34                             if (status.isDebug()) {
    35                                 logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
    36                             }
    37                         }
    38                     }
    39                     else {
    40                         logger.debug("Should roll back transaction but cannot - no transaction available");
    41                     }
    42                     // Unexpected rollback only matters here if we're asked to fail early
    43                     if (!isFailEarlyOnGlobalRollbackOnly()) {
    44                         unexpectedRollback = false;
    45                     }
    46                 }
    47             }
    48             catch (RuntimeException | Error ex) {
    49                 triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
    50                 throw ex;
    51             }
    52 
    53             //激活所有TransactionSynchronization中对应的方法
    54             triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
    55 
    56             // Raise UnexpectedRollbackException if we had a global rollback-only marker
    57             if (unexpectedRollback) {
    58                 throw new UnexpectedRollbackException(
    59                         "Transaction rolled back because it has been marked as rollback-only");
    60             }
    61         }
    62         finally {
    63             //清空记录的资源并将挂起的资源恢复
    64             cleanupAfterCompletion(status);
    65         }
    66     }

          同样,对于在Spring中的复杂的逻辑处理过程,在入口函数一般都会给出个整体的处理脉络,而吧实现细节委托给其他函数去执行。我们尝试总结下Spring中对于回滚处理的大致脉络如下:

          ① 首先是自定义触发器的调用,包括在回滚前、完成回滚后的调用,当然完成回滚包括正常回滚与回滚过程中出现异常,自定义的触发器会根据这些信息作进一步处理,而对于触发器的注册,常见的是在回调过程中通过TransactionSynchronizationManager(package org.springframework.transaction.support)类中的静态方法registerSynchronization直接注册:

     1     public static void registerSynchronization(TransactionSynchronization synchronization)
     2             throws IllegalStateException {
     3 
     4         Assert.notNull(synchronization, "TransactionSynchronization must not be null");
     5         Set<TransactionSynchronization> synchs = synchronizations.get();
     6         if (synchs == null) {
     7             throw new IllegalStateException("Transaction synchronization is not active");
     8         }
     9         synchs.add(synchronization);
    10     }

         ② 除了触发监听函数外,就是真正的回滚逻辑处理了。

          当之前已经保存的事务信息中有保存点信息的时候,使用保存点信息进行回滚。常用语嵌入式事务。对于嵌入式的事务的处理,内嵌的事务异常并不会引起外部事务的回滚。

          根据保存点回滚的实现方式其实是根据底层的数据库连接进行的。AbstractTransactionStatus类(package org.springframework.transaction.support)的rollbackToHeldSavepoint函数:

     1     public void rollbackToHeldSavepoint() throws TransactionException {
     2         Object savepoint = getSavepoint();
     3         if (savepoint == null) {
     4             throw new TransactionUsageException(
     5                     "Cannot roll back to savepoint - no savepoint associated with current transaction");
     6         }
     7         getSavepointManager().rollbackToSavepoint(savepoint);
     8         getSavepointManager().releaseSavepoint(savepoint);
     9         setSavepoint(null);
    10     }

          1) 这里使用的是JDBC的方式连接数据库,那么getSavepointManager()函数返回的是JdbcTransactionObjectSupport,也就是说上面函数会调用JdbcTransactionObjectSupport类(package org.springframework.jdbc.datasource)中的rollbackToSavepoint方法:

     1     public void rollbackToSavepoint(Object savepoint) throws TransactionException {
     2         ConnectionHolder conHolder = getConnectionHolderForSavepoint();
     3         try {
     4             conHolder.getConnection().rollback((Savepoint) savepoint);
     5             conHolder.resetRollbackOnly();
     6         }
     7         catch (Throwable ex) {
     8             throw new TransactionSystemException("Could not roll back to JDBC savepoint", ex);
     9         }
    10     }

           2) 当之前已经保存的事务信息中的事务为新事物,那么直接回滚。常用语单独事务的处理。对于没有保存点的回滚,Spring同样是使用底层数据库连接提供的API来操作的。由于我们使用的是DataSourceTransactionManager(package org.springframework.jdbc.datasource),那么doRollback函数会使用此类中的实现:

     1     protected void doRollback(DefaultTransactionStatus status) {
     2         DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
     3         Connection con = txObject.getConnectionHolder().getConnection();
     4         if (status.isDebug()) {
     5             logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
     6         }
     7         try {
     8             con.rollback();
     9         }
    10         catch (SQLException ex) {
    11             throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
    12         }
    13     }

         3) 当前事务信息中表明是存在事务的,又不属于以上两种情况,多数用于JTA,只做回滚标识,等到提交的时候统一不提交。

    (3)回滚后的信息清除

          对于回滚逻辑执行结束后,无论回滚是否成功,都必须要做的事情就是事务结束后的收尾工作。AbstractPlatformTransactionManager类(package org.springframework.transaction.support)中的cleanupAfterCompletion函数:

     1     private void cleanupAfterCompletion(DefaultTransactionStatus status) {
     2         //设置完成状态
     3         status.setCompleted();
     4         if (status.isNewSynchronization()) {
     5             TransactionSynchronizationManager.clear();
     6         }
     7         if (status.isNewTransaction()) {
     8             doCleanupAfterCompletion(status.getTransaction());
     9         }
    10         if (status.getSuspendedResources() != null) {
    11             if (status.isDebug()) {
    12                 logger.debug("Resuming suspended transaction after completion of inner transaction");
    13             }
    14             Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
    15             //结束之前事务的挂起状态
    16             resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
    17         }
    18     }

          从函数中得知,事务处理的收尾工作包括如下内容。

          ① 设置状态时对事务信息作完成标识以避免重复调用。

          ② 如果当前事务是新的同步状态,需要将绑定到当前线程的事务信息清除。

          ③ 如果是新事务需要做些清除资源的工作。DataSourceTransactionManager类(package org.springframework.jdbc.datasource)的doRollback方法:

     1     protected void doRollback(DefaultTransactionStatus status) {
     2         DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
     3         Connection con = txObject.getConnectionHolder().getConnection();
     4         if (status.isDebug()) {
     5             logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
     6         }
     7         try {
     8             con.rollback();
     9         }
    10         catch (SQLException ex) {
    11             throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
    12         }
    13     }

          ④ 如果在事务执行前有事务挂起,那么当前事务执行结束后需要将挂起的事务恢复。AbstractPlatformTransactionManager类(package org.springframework.transaction.support)中的resume函数:

     1     protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
     2             throws TransactionException {
     3 
     4         if (resourcesHolder != null) {
     5             Object suspendedResources = resourcesHolder.suspendedResources;
     6             if (suspendedResources != null) {
     7                 doResume(transaction, suspendedResources);
     8             }
     9             List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
    10             if (suspendedSynchronizations != null) {
    11                 TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
    12                 TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
    13                 TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
    14                 TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
    15                 doResumeSynchronization(suspendedSynchronizations);
    16             }
    17         }
    18     }

     (三)事务提交

          之前我们分析了Spring的事务异常处理机制,那么事务的执行并没有出现任何的异常,也就意味着事务可以走正常事务提交的流程了。TransactionAspectSupport类(package org.springframework.transaction.interceptor)的commitTransactionAfterReturning函数:

    1     protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
    2         if (txInfo != null && txInfo.getTransactionStatus() != null) {
    3             if (logger.isTraceEnabled()) {
    4                 logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
    5             }
    6             txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    7         }
    8     }

          在真正的数据提交之前,还需要做个判断。在我们分析事务异常处理规则的时候,当某个事务既没有保存点又不是新事务,Spring对它的处理放肆只是设置一个回滚标识。这个回滚标识在这里就会派上用场。主要的应用场景如下。

          某个事务是另一个事务的嵌入事务,但是,这些事务又不在Spring的管理范围内,或者无法设置保存点,那么Spring会通过设置回滚标识的方式来禁止提交。首先当某个嵌入事务发送回滚的时候会设置回滚标识,而等到外部事务提交时,一旦判断出当前事务流被设置了回滚标识,则由外部事务来统一进行整体事务的回滚。

          所以,当事务没有被异常捕获的时候也并不意味着一定会执行提交的过程。AbstractPlatformTransactionManager类(package org.springframework.transaction.support)的commit函数:

     1     public final void commit(TransactionStatus status) throws TransactionException {
     2         if (status.isCompleted()) {
     3             throw new IllegalTransactionStateException(
     4                     "Transaction is already completed - do not call commit or rollback more than once per transaction");
     5         }
     6 
     7         DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
     8         //如果在事务链中已经被标记回滚,那么不会尝试提交事务,直接回滚
     9         if (defStatus.isLocalRollbackOnly()) {
    10             if (defStatus.isDebug()) {
    11                 logger.debug("Transactional code has requested rollback");
    12             }
    13             processRollback(defStatus, false);
    14             return;
    15         }
    16 
    17         if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
    18             if (defStatus.isDebug()) {
    19                 logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
    20             }
    21             processRollback(defStatus, true);
    22             return;
    23         }
    24 
    25         //处理事务提交
    26         processCommit(defStatus);
    27     }

          而当事务执行一切都正常的时候,便可以真正地进入提交流程了。AbstractPlatformTransactionManager类的processCommit函数:

     1     private void processCommit(DefaultTransactionStatus status) throws TransactionException {
     2         try {
     3             boolean beforeCompletionInvoked = false;
     4 
     5             try {
     6                 boolean unexpectedRollback = false;
     7                 //预留
     8                 prepareForCommit(status);
     9                 //添加的TransactionSynchronization中对应方法的调用
    10                 triggerBeforeCommit(status);
    11                 //添加的TransactionSynchronization中对应方法的调用
    12                 triggerBeforeCompletion(status);
    13                 beforeCompletionInvoked = true;
    14 
    15                 if (status.hasSavepoint()) {
    16                     if (status.isDebug()) {
    17                         logger.debug("Releasing transaction savepoint");
    18                     }
    19                     unexpectedRollback = status.isGlobalRollbackOnly();
    20                     //如果存在保存点则清除保存点信息
    21                     status.releaseHeldSavepoint();
    22                 }
    23                 else if (status.isNewTransaction()) {
    24                     if (status.isDebug()) {
    25                         logger.debug("Initiating transaction commit");
    26                     }
    27                     unexpectedRollback = status.isGlobalRollbackOnly();
    28                     //如果是独立的事务则直接提交
    29                     doCommit(status);
    30                 }
    31                 else if (isFailEarlyOnGlobalRollbackOnly()) {
    32                     unexpectedRollback = status.isGlobalRollbackOnly();
    33                 }
    34 
    35                 // Throw UnexpectedRollbackException if we have a global rollback-only
    36                 // marker but still didn't get a corresponding exception from commit.
    37                 if (unexpectedRollback) {
    38                     throw new UnexpectedRollbackException(
    39                             "Transaction silently rolled back because it has been marked as rollback-only");
    40                 }
    41             }
    42             catch (UnexpectedRollbackException ex) {
    43                 // can only be caused by doCommit
    44                 triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
    45                 throw ex;
    46             }
    47             catch (TransactionException ex) {
    48                 // can only be caused by doCommit
    49                 if (isRollbackOnCommitFailure()) {
    50                     doRollbackOnCommitException(status, ex);
    51                 }
    52                 else {
    53                     triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
    54                 }
    55                 throw ex;
    56             }
    57             catch (RuntimeException | Error ex) {
    58                 if (!beforeCompletionInvoked) {
    59                     //添加的TransactionSynchronization中的对应方法的调用
    60                     triggerBeforeCompletion(status);
    61                 }
    62                 //提交过程中出现异常则回滚
    63                 doRollbackOnCommitException(status, ex);
    64                 throw ex;
    65             }
    66 
    67             // Trigger afterCommit callbacks, with an exception thrown there
    68             // propagated to callers but the transaction still considered as committed.
    69             try {
    70                 //添加的TransactionSynchronization中对应方法的调用
    71                 triggerAfterCommit(status);
    72             }
    73             finally {
    74                 triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
    75             }
    76 
    77         }
    78         finally {
    79             cleanupAfterCompletion(status);
    80         }
    81     }

          在提交过程中也不是直接提交的,而是考虑了诸多的方面,符合提交的条件如下:

          ① 当事务状态中有保存点信息的话便不会去提交事务。

          ② 当事务非新事务的时候也不会去执行提交事务操作。

          此条件主要考虑内嵌事务的情况,对于内嵌事务,在Spring中正常的处理方式是将内嵌事务开始之前设置保存点,一旦内嵌事务出现异常便根据保存点信息进行回滚,但是如果没有出现异常,内嵌事务并不会单独提交,而是根据事务流由最外层事务负责提交。所有如果当前存在保存点信息便不是最外层事务,不做保存操作。对于是否是新事务的判断也是基于此考虑。

          如果程序通过了事务的层层把关,最后顺利进入了提交流程,那么同样,Spring会将事务提交的操作引导到底层数据库连接的API,进行事务提交。DataSourceTransactionManager类(package org.springframework.jdbc.datasource)的doCommit方法:

     1     protected void doCommit(DefaultTransactionStatus status) {
     2         DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
     3         Connection con = txObject.getConnectionHolder().getConnection();
     4         if (status.isDebug()) {
     5             logger.debug("Committing JDBC transaction on Connection [" + con + "]");
     6         }
     7         try {
     8             con.commit();
     9         }
    10         catch (SQLException ex) {
    11             throw new TransactionSystemException("Could not commit JDBC transaction", ex);
    12         }
    13     }

      本文摘自《Spring源码深度解析》事务,作者:郝佳。本文代码基于的Spring版本为5.2.4.BUILD-SNAPSHOT,和原书代码部分会略有不同。

    拓展阅读:

      Spring框架之beans源码完全解析
      Spring框架之AOP源码完全解析
      Spring框架之jdbc源码完全解析
      Spring源码深度解析之数据库连接JDBC
      Spring框架之jms源码完全解析
      Spring框架之事务源码完全解析
      Spring源码深度解析之事务
      Spring源码深度解析之Spring MVC
      Spring框架之websocket源码完全解析
      WebSocket协议中文版
      Spring框架之spring-web web源码完全解析
      Spring框架之spring-web http源码完全解析
      Spring框架之spring-webmvc源码完全解析

    博众家之所长,集群英之荟萃。遴选各IT领域精品雄文!

    欢迎关注“IT架构精选”

  • 相关阅读:
    skywalking简介
    .Net Core微服务——Consul(4):搭建集群
    .Net Core微服务——Consul(3):健康检查
    .Net Core微服务——Consul(2):自动扩展、服务调用
    .Net Core微服务——Consul(1):服务发现
    SpringBoot数据访问之整合Mybatis配置文件
    SpringBoot数据访问之Druid启动器的使用
    SpringBoot数据访问之Druid数据源的自定义使用
    Spring Boot核心技术之Restful映射以及源码的分析
    SpringBoot之yaml语法及静态资源访问
  • 原文地址:https://www.cnblogs.com/xxkj/p/14287924.html
Copyright © 2011-2022 走看看