zoukankan      html  css  js  c++  java
  • SpringAOP——事务实现

    承接上文,<tx:annotation-driven />开启声明式事务时,在SpringIOC容器中初始化了4个Bean

        <!-- 事务管理 -->
        <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
            <property name="dataSource" ref="dataSource"/>
        </bean>
    
        <!-- 启用声明式事务管理 支持注解@Transaction-->
        <tx:annotation-driven proxy-target-class="false" transaction-manager="transactionManager"/>
    TransactionalEventListenerFactory
    AnnotationTransactionAttributeSource
    TransactionInterceptor//SpringAOP方法执行时的责任链拦截器
    BeanFactoryTransactionAttributeSourceAdvisor//直接用bean创建的Advisor

    由于前面没有具体深入了解每个Bean的作用以及实现,所以面试被难住了,补充一下Spring事务的具体实现。弄清楚三点:

    • 每个组件的作用
    • 事务的实现流程
    • 与Mybatis的对接实现

    一、组件初始化

    /*  org.springframework.transaction.config.AnnotationDrivenBeanDefinitionParser#registerTransactionalEventListenerFactory */
        private void registerTransactionalEventListenerFactory(ParserContext parserContext) {
            //创建TransactionEvenListenerFactory的Beandefinition
            RootBeanDefinition def = new RootBeanDefinition();
            def.setBeanClass(TransactionalEventListenerFactory.class);
            parserContext.registerBeanComponent(new BeanComponentDefinition(def,
                    TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME));
        }
    
    /* org.springframework.transaction.config.AnnotationDrivenBeanDefinitionParser.AopAutoProxyConfigurer#configureAutoProxyCreator */
    public static void configureAutoProxyCreator(Element element, ParserContext parserContext) {
                //若未注册,先注册SpringAOP相关Beandefinition
                AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element);
    
                String txAdvisorBeanName = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME;
                if (!parserContext.getRegistry().containsBeanDefinition(txAdvisorBeanName)) {
                    Object eleSource = parserContext.extractSource(element);
    
                    //Source的Beandefinition
                    //beanName = org.springframework.transaction.annotation.AnnotationTransactionAttributeSource#0
                    RootBeanDefinition sourceDef = new RootBeanDefinition("org.springframework.transaction.annotation.AnnotationTransactionAttributeSource");
                    //实际eleSource == null
                    sourceDef.setSource(eleSource);
                    sourceDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
                    //sourceName == org.springframework.transaction.annotation.AnnotationTransactionAttributeSource#0
                    String sourceName = parserContext.getReaderContext().registerWithGeneratedName(sourceDef);
    
                    //Inteceptor的BeanDefinition
                    //beanName=org.springframework.transaction.interceptor.TransactionInterceptor#0
                    RootBeanDefinition interceptorDef = new RootBeanDefinition(TransactionInterceptor.class);
                    interceptorDef.setSource(eleSource);
                    interceptorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
                    //初始化时的重要信息,设置TransactionInterceptor的属性trancationManagerBeanName的beanName,初始化时依赖注入
                    //实际获取<tx:annotation-driven transaction-manager="" >中的transaction-manage属性,默认值:beanName = transactionManager
                    registerTransactionManager(element, interceptorDef);
                    //TransactionInterception.trancationAttributeSource = AnnotationTransactionAttributeSource
                    interceptorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
                    //interceptorName = org.springframework.transaction.interceptor.TransactionInterceptor#0
                    String interceptorName = parserContext.getReaderContext().registerWithGeneratedName(interceptorDef);
                    
                    //advisor的BeanDefinition
                    //beanName = org.springframework.transaction.config.internalTransactionAdvisor
                    RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class);
                    advisorDef.setSource(eleSource);
                    advisorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
                    //设置两个属性
                    //advisor.transactionAttributeSource = AnnotationTransactionAttributeSource的beanName
                    //advisor.adviceBeanName = TransactionInterceptor的beanName
                    advisorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
                    advisorDef.getPropertyValues().add("adviceBeanName", interceptorName);
                    if (element.hasAttribute("order")) {
                        advisorDef.getPropertyValues().add("order", element.getAttribute("order"));
                    }
                    //将组件的BeanDefinition注册到DefaultListableBeanFactory中
                    parserContext.getRegistry().registerBeanDefinition(txAdvisorBeanName, advisorDef);
                    
                    CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(), eleSource);
                    compositeDef.addNestedComponent(new BeanComponentDefinition(sourceDef, sourceName));
                    compositeDef.addNestedComponent(new BeanComponentDefinition(interceptorDef, interceptorName));
                    compositeDef.addNestedComponent(new BeanComponentDefinition(advisorDef, txAdvisorBeanName));
                    parserContext.registerComponent(compositeDef);
                }
            }

    总结下组件的依赖关系:

    //Inteceptor
    TransactionInterceptor.trancationManagerBeanName = "transactionManager"
    TransactionInterceptor.transactionAttributeSource = AnnotationTransactionAttributeSource
    //advisor
    advisor.transactionAttributeSource = AnnotationTransactionAttributeSource
    advisor.advice = TransactionInterceptor 

    二、事务的AOP实现

    SpringAOP实现流程:

    ① <aop:aspectj-autoproxy />开启SpringAOP代理支持后,会初始化AOP相关组件AnnotationAwareAspectJAutoProxyCreator

    ② 所有的Bean在初始化的最后阶段都会调用AnnotationAwareAspectJAutoProxyCreator.postprocessAfterInitialzation,判断是否需要生成代理类

    ③ List<Advisor> candidateAdvisors = findCandidateAdvisors();查找所有Advisor,

    ④ List<Advisor> eligibleAdvisors = findAdvisorsThatCanApply(candidateAdvisors, beanClass, beanName);Bean是否是切面关注点的判断,从所有advisor保留对应的advisor,

    ⑤ advisor不为空则会创建代理实例proxy object,为空不创建代理,直接返回原实例 target object

    ⑥ 创建一个ProxyFactory实例记录advisor,将ProxyFactory放入到InvocationHandler实例(jdk动态代理对应JdkDynamicAopProxy)中

    ⑦ 方法调用时,通过动态代理实现,调用代理类的方法,实际是调用InvocationHandler.invoke()方法(jdk动态代理对应JDKDynamicAopProxy.invoke)

    ⑧ invocationHandler.invoke具体是实现是:从proxyFactory中找到对应的advisor,然后调用advisor.getAdvice获取具体的advice操作侯然先执行advice后执行target object的方法

    事务的实现流程是基于SpringAOP实现的。

    对应流程③查找所有的advisor,会找到BeanFactoryTransactionAttributeSourceAdvisor

    对应流程④Bean是否是切面关注点,根据BeanFactoryTransactionAttributeSourceAdvisor中的Pointcut判断,实际判断是否被@Transaction注解修饰

    /* org.springframework.transaction.interceptor.BeanFactoryTransactionAttributeSourceAdvisor#getPointcut */
        /*
        * BeanFactoryTransactionAttributeSourceAdvisor中的pointCut初始化
        */
        public Pointcut getPointcut() {
            return this.pointcut;
        }
    
        //pointCut实际是一个TransactionAttributeSourcePointcut实例
        private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
            @Override
            @Nullable
            protected TransactionAttributeSource getTransactionAttributeSource() {
                return transactionAttributeSource;
            }
        };
    
        protected TransactionAttributeSourcePointcut() {
            setClassFilter(new TransactionAttributeSourceClassFilter());
        }
    
        private class TransactionAttributeSourceClassFilter implements ClassFilter {
            //判断Bean是否是poincut的一个join point(切面关注点)
            @Override
            public boolean matches(Class<?> clazz) {
                if (TransactionalProxy.class.isAssignableFrom(clazz) ||
                        PlatformTransactionManager.class.isAssignableFrom(clazz) ||
                        PersistenceExceptionTranslator.class.isAssignableFrom(clazz)) {
                    //三种基础类型直接返回false
                    return false;
                }
                //这里tas就是上面设置的AnnotationTrancationAttributeSource
                TransactionAttributeSource tas = getTransactionAttributeSource();
                return (tas == null || tas.isCandidateClass(clazz));
            }
        }

    AnnotationTrancationAttributeSource.isCandidateClass():

    /* org.springframework.transaction.annotation.AnnotationTransactionAttributeSource#isCandidateClass */
        public boolean isCandidateClass(Class<?> targetClass) {
            for (TransactionAnnotationParser parser : this.annotationParsers) {
                if (parser.isCandidateClass(targetClass)) {
                    return true;
                }
            }
            return false;
        }
    
        //annotationParsers初始化
        //annotationParsers.add(new SpringTransactionAnnotationParser())
        //系统支持jta12时annotationParsers.add(new SpringTransactionAnnotationParser())
        //系统支持ejb3时this.annotationParsers.add(new Ejb3TransactionAnnotationParser());
        public AnnotationTransactionAttributeSource(boolean publicMethodsOnly) {
            this.publicMethodsOnly = publicMethodsOnly;
            if (jta12Present || ejb3Present) {
                this.annotationParsers = new LinkedHashSet<>(4);
                this.annotationParsers.add(new SpringTransactionAnnotationParser());
                if (jta12Present) {
                    this.annotationParsers.add(new JtaTransactionAnnotationParser());
                }
                if (ejb3Present) {
                    this.annotationParsers.add(new Ejb3TransactionAnnotationParser());
                }
            }
            else {
                this.annotationParsers = Collections.singleton(new SpringTransactionAnnotationParser());
            }
        }
    
        //默认的SpringTransactionAnnotationParse.isCandidateClass
        //判断类中包含@Transactional注解,
        public boolean isCandidateClass(Class<?> targetClass) {
            return AnnotationUtils.isCandidateClass(targetClass, Transactional.class);
        }

    对应流程⑤ 被@Transactional注解修饰的类的Bean,advisor =  BeanFactoryTransactionAttributeSourceAdvisor,不为空创建代理实例

    对应流程⑥ 将advisor放入到创建ProxFactory实例的advisors容器中

    对应流程⑦ ⑧方法调用时获取BeanFactoryTransactionAttributeSourceAdvisor.getAdvice(),先执行advice.invoke,再执行target object.invoke。即开启事务执行sql提交(失败回退)。这里BeanFactoryTransactionAttributeSourceAdvisor.getAdvice()=TransactionInterceptor

    三、事务的执行(创建、提交、回滚)

    TransactionInterceptor.invoke()

    /* org.springframework.transaction.interceptor.TransactionInterceptor#invoke */
        public Object invoke(MethodInvocation invocation) throws Throwable {
            // target object可能为null,所以这里获取targetClass
            //如果存在targetObject时 targetClass = targetObject.getClass
            //如果不存在targetObject时 targetClass = null
            Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    
            // 事务执行 
            // 第三个参数lamda表达式:invocation.proceed()
            return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
        }

    TransactionAspectSupport.invokeWithinTransaction()

        protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
                final InvocationCallback invocation) throws Throwable {
    
            //获取初始化时的AnnotationTransactionAttributeSource
            TransactionAttributeSource tas = getTransactionAttributeSource();
            //① 获取事务属性==>@Transactional注解中的属性,生成一个new RuleBasedTransactionAttribute()
            //方法、类都有@Transactional时上优先级:方法 > 类
            //TransactionAttribute继承TransactionDefinition,定义事务属性的实例(与事务的关系,类似于BeanDefinition与Bean)
            final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
            //② 获取事务管理器,
            // 先从@Transactional注解中value属性获取,
            // 若value未设置,则<tx:annotationDriven/>中transaction-manager属性获取
            final TransactionManager tm = determineTransactionManager(txAttr);
    
            //③ transactionManager的两个分类
            //反应式事务管理器,顶级接口ReactiveTransactionManager,例如MongoDB
            //响应式事务管理器,顶级接口PlatformTransactionManager,常用的关系型数据库MySQL等
            if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
                //反应式事务管理器,是spring5.2之后加入的新支持,反正用到这个东西的公司我也进不了,直接跳过吧
                ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
                    if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
                        throw new TransactionUsageException(
                                "Unsupported annotated transaction on suspending function detected: " + method +
                                ". Use TransactionalOperator.transactional extensions instead.");
                    }
                    ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
                    if (adapter == null) {
                        throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
                                method.getReturnType());
                    }
                    return new ReactiveTransactionSupport(adapter);
                });
                return txSupport.invokeWithinTransaction(
                        method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
            }
    
            //响应式事务管理器实现
    
            //如果transactionManager是PlatforTransactionManager的实现类,类型强转;如果不是,抛出异常
            PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
            //获取方法的id作为事务的名称
            //joinpointIdentification = method.getDeclaringClass()).getName() + '.' + method.getName()
            final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    
            if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
                //④ 非CallBackPreferringPlatformTransactionManager的TransactionManager的事务管理
                // ⑤ 开启事务
                TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
    
                Object retVal;
                try {
                    // 事务的advice是一个 around advice
                    //⑥执行方法(执行sql)
                    retVal = invocation.proceedWithInvocation();
                }
                catch (Throwable ex) {
                    // target invocation exception
                    //⑦ 异常回滚
                    completeTransactionAfterThrowing(txInfo, ex);
                    throw ex;
                }
                finally {
                    //⑧ 清理资源,ThreadLocal中的TransactionInfo
                    cleanupTransactionInfo(txInfo);
                }
    
                if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                    // Set rollback-only in case of Vavr failure matching our rollback rules...
                    TransactionStatus status = txInfo.getTransactionStatus();
                    if (status != null && txAttr != null) {
                        retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
                    }
                }
                //⑨ 事务提交
                commitTransactionAfterReturning(txInfo);
                return retVal;
            }
    
            else {
                //⑩ CallbackPreferringPlatformTransactionManager
                final ThrowableHolder throwableHolder = new ThrowableHolder();
    
                // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
                try {
                    Object result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
                        TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
                        try {
                            Object retVal = invocation.proceedWithInvocation();
                            if (vavrPresent && VavrDelegate.isVavrTry(retVal)) {
                                // Set rollback-only in case of Vavr failure matching our rollback rules...
                                retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
                            }
                            return retVal;
                        }
                        catch (Throwable ex) {
                            if (txAttr.rollbackOn(ex)) {
                                // A RuntimeException: will lead to a rollback.
                                if (ex instanceof RuntimeException) {
                                    throw (RuntimeException) ex;
                                }
                                else {
                                    throw new ThrowableHolderException(ex);
                                }
                            }
                            else {
                                // A normal return value: will lead to a commit.
                                throwableHolder.throwable = ex;
                                return null;
                            }
                        }
                        finally {
                            cleanupTransactionInfo(txInfo);
                        }
                    });
    
                    // Check result state: It might indicate a Throwable to rethrow.
                    if (throwableHolder.throwable != null) {
                        throw throwableHolder.throwable;
                    }
                    return result;
                }
                catch (ThrowableHolderException ex) {
                    throw ex.getCause();
                }
                catch (TransactionSystemException ex2) {
                    if (throwableHolder.throwable != null) {
                        logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                        ex2.initApplicationException(throwableHolder.throwable);
                    }
                    throw ex2;
                }
                catch (Throwable ex2) {
                    if (throwableHolder.throwable != null) {
                        logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                    }
                    throw ex2;
                }
            }
        }

    1、事务创建——①~⑤

    ① final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null) : 创建一个TransactionDefinition实例

    根据@Transactional注解,创建一个TransactionDefinition实例(事务的属性定义实例,与事务的关系类似于Bean与BeanDefinition)实际类型RuleBasedTransactionAttribute

    方法、类都有@Transactional注解时,优先级 :方法 > 类

    /* org.springframework.transaction.interceptor.AbstractFallbackTransactionAttributeSource#getTransactionAttribute */
        /* 
         * 缓存机制
         */
        public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
            if (method.getDeclaringClass() == Object.class) {
                //Object的方法,返回null
                return null;
            }
    
            // 缓存机制
            Object cacheKey = getCacheKey(method, targetClass);
            TransactionAttribute cached = this.attributeCache.get(cacheKey);
            if (cached != null) {
                // Value will either be canonical value indicating there is no transaction attribute,
                // or an actual transaction attribute.
                if (cached == NULL_TRANSACTION_ATTRIBUTE) {
                    //缓存是没有事务属性的单例,返回null
                    return null;
                }
                else {
                    return cached;
                }
            }
            else {
                // 没有缓存时,根据@Transactional注解生成事务属性
                TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
                // 缓存处理
                if (txAttr == null) {
                    this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
                }
                else {
                    String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
                    if (txAttr instanceof DefaultTransactionAttribute) {
                         //设置事务描述,后面其实以这个属性作为事务的名称
                         //transactionName = desciptor = method.getDeclaringClass().getName() + "." + method.getName()  
                        ((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification);
                    }
                    if (logger.isTraceEnabled()) {
                        logger.trace("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr);
                    }
                    this.attributeCache.put(cacheKey, txAttr);
                }
                return txAttr;
            }
        }
    
        /*
         * 由于代理类的存在,所以需要先找到具体被@Transaction注解的method(targetClass、superClass甚至interface上),然后解析@Transaction生成TransactionDefinition
         * @Transactional注解的优先级:当同时注解方法和类时,方法 > 类
         */
        protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
            if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
                //默认publicMethodsOnly == true,所以如果方法不是public修饰的,返回null
                return null;
            }
    
            //targetClass == null,specificMethod = method
            //targetClass != null,
            // 方法在targetClass声明或重写,
            // Jdk动态代理 specificMethod = method,CgLib动态代理specificMethod = superClass.method
            // 方法在不再targetClass中声明且重写,
            // public方法,specificMethod = method
            // 非public方法,specificMethod = superClass.method或者接口中的method
            Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
    
            // 如果方法被@Transactional注解,解析方法上的@Transactional生成事务属性实例
            TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
            if (txAttr != null) {
                return txAttr;
            }
    
            // 如果方法上没有@Transaction注解,
            // 解析方法的声明类(可能是targetClass,superClass,甚至interface)上的@Transaction生成事务属性实例
            txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
            if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
                return txAttr;
            }
    
            //如果上面txAttr都为空,
            if (specificMethod != method) {
                // 代理实例的method方法上的@Transactional注解,AspectJ
                txAttr = findTransactionAttribute(method);
                if (txAttr != null) {
                    return txAttr;
                }
                // 代理方法的Class类上的@Transactional注解,AspectJ
                txAttr = findTransactionAttribute(method.getDeclaringClass());
                if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
                    return txAttr;
                }
            }
    
            return null;
        }
    
    /*org.springframework.transaction.annotation.SpringTransactionAnnotationParser#parseTransactionAnnotation(java.lang.reflect.AnnotatedElement) */ 
        public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
            AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(
                    element, Transactional.class, false, false);
            if (attributes != null) {
                return parseTransactionAnnotation(attributes);
            }
            else {
                return null;
            }
        }
    
        /*
         * 解析@Transactional生成一个TransactionDefinition实例(事务属性实例),实际类型:RuleBasedTransactionAttribute
         */
        protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
            RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
            Propagation propagation = attributes.getEnum("propagation");
            rbta.setPropagationBehavior(propagation.value());
            Isolation isolation = attributes.getEnum("isolation");
            rbta.setIsolationLevel(isolation.value());
            rbta.setTimeout(attributes.getNumber("timeout").intValue());
            rbta.setReadOnly(attributes.getBoolean("readOnly"));
            rbta.setQualifier(attributes.getString("value"));
    
            List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
            for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
                rollbackRules.add(new RollbackRuleAttribute(rbRule));
            }
            for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
                rollbackRules.add(new RollbackRuleAttribute(rbRule));
            }
            for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
                rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
            }
            for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
                rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
            }
            rbta.setRollbackRules(rollbackRules);
    
            return rbta;
        }

    ② final TransactionManager tm = determineTransactionManager(txAttr):获取事务管理器transactionManager

    • 从@Transactional注解中value属性获取,
    • value未设置,则<tx:annotationDriven/>中transaction-manager属性获取,默认transaction-manager="transactionManager"
    /* org.springframework.transaction.interceptor.TransactionAspectSupport#determineTransactionManager */
        protected TransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
            // Do not attempt to lookup tx manager if no tx attributes are set
            if (txAttr == null || this.beanFactory == null) {
                //事务属性为空 或 IOC容器为空时 返回null
                return getTransactionManager();
            }
    
            String qualifier = txAttr.getQualifier();
            if (StringUtils.hasText(qualifier)) {
                //返回@Transactional中指定的transactionManager
                // @Transactional(value="transactionManager")中的value属性,beanName = value
                return determineQualifiedTransactionManager(this.beanFactory, qualifier);
            }
            else if (StringUtils.hasText(this.transactionManagerBeanName)) {
                //@Transactional注解value为空时
                //返回<tx:annotation-driven transaction-manager=""/>中的transaction-manager属性,
                // beanName = transaction-manager设置的属性,默认为"transactionManager"
                return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
            }
            else {
                //这个分支基本不会走到,上面transactionManagerBeanName有默认值
                TransactionManager defaultTransactionManager = getTransactionManager();
                if (defaultTransactionManager == null) {
                    defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY);
                    if (defaultTransactionManager == null) {
                        defaultTransactionManager = this.beanFactory.getBean(TransactionManager.class);
                        this.transactionManagerCache.putIfAbsent(
                                DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
                    }
                }
                return defaultTransactionManager;
            }
        }

    ③ transactionManager的两个分类

    • 反应式事务管理器,顶级接口ReactiveTransactionManager,例如MongoDB中事务管理器
    • 响应式事务管理器,顶级接口PlatformTransactionManager,例如常用的关系型数据库MySQL等
    if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
                //反应式事务管理器,是spring5.2之后加入的新支持,反正会用到这个东西的公司我也进不了,直接跳过吧
                ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
                    if (KotlinDetector.isKotlinType(method.getDeclaringClass()) && KotlinDelegate.isSuspend(method)) {
                        throw new TransactionUsageException(
                                "Unsupported annotated transaction on suspending function detected: " + method +
                                ". Use TransactionalOperator.transactional extensions instead.");
                    }
                    ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
                    if (adapter == null) {
                        throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " +
                                method.getReturnType());
                    }
                    return new ReactiveTransactionSupport(adapter);
                });
                return txSupport.invokeWithinTransaction(
                        method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
            }

    ④  响应式事务管理器种又细分为两类:

    • 非CallbackPreferringPlatformTransactionManager的事务管理器,
    • CallbackPreferringPlatformTransactionManager

    接下来的⑤-⑨属于第一种事务管理器的事务处理,以常用的DataSourceTransactionManager为例

    ⑤ TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification):创建事务详细信息实例TransactionInfo

    • new TransactionInfo(transactionManager,transactionDefinish,transactionName):记录当前事务的事务管理器、事务属性、事务名称,事务管理器和事务属性前面已经确定了,事务名称是method的全限定名,
    • 返回一个TransactionStatus实例,txInfo.newTransactionStatus(transactionStatus):记录事务的事务状态
    • txInfo.bindToThread():将事务与当前线程绑定,通过ThreadLocal(TransactionAspectSupport.transactionInfoHandler)实现,将事务存储到当前Thread实例中
    /* org.springframework.transaction.interceptor.TransactionAspectSupport#createTransactionIfNecessary */
        protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
                @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
    
            // @Transactional ==> new RuleBasedTransactionAttribute时没有设置name属性
            //所以 事务name = joinpointIdentification = method.getDeclaringClass()).getName() + '.' + method.getName()
            if (txAttr != null && txAttr.getName() == null) {
                txAttr = new DelegatingTransactionAttribute(txAttr) {
                    @Override
                    public String getName() {
                        return joinpointIdentification;
                    }
                };
            }
    
            TransactionStatus status = null;
            if (txAttr != null) {
                if (tm != null) {
                    //返回一个TransactionStatus实例,记录事务状态
                    status = tm.getTransaction(txAttr);
                }
                else {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                                "] because no transaction manager has been configured");
                    }
                }
            }
            //txInfo = new TransactionInfo(tm,txAttr,joinpointIdentification)
            //txInfo.newTransactionStatus(status);
            //txInfo.bindToThread(); //将事务丢到线程中ThreadLocal(TransactionAspectSupport.transactionInfoHandler)实现
    //return txInfo
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }

    status = tm.getTransaction(txAttr):返回一个TransactionStatus实例,这里是应该是Spring事务最复杂的一部分。

    • 事务connection的创建,以及事务与非事务connection的属性区别(自动提交与手动提交),事务connection是怎么传递到MyBatis框架中的(ThreadLocal)
    • 事务隔离级别等属性校验及设置
    • spring特有的7种事务传播行为的实现
    /* org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction */
        public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
                throws TransactionException {
    
            // @Transactional ==> new RuleBasedTransactionAttribute
            //这里 def = RuleBasedTransactionAttribute
            TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
    
            //以DataSourceTransactionManager为例
            //创建一个DataSourceTransactionObject实例,主要记录两个属性:
            // SavepointAllowed:是否支持保存点,保存点可用于事务嵌套
            //connectionHolder:事务连接,从当前线程中获取数据库连接,ThreadLocal实现
            // 由于是从当前线程中获取连接,所以初次开启事务,conncetionHolder == null
            // 事务嵌套时 connectionHolder != null,也就有了事务的传播处理
            Object transaction = doGetTransaction();
            boolean debugEnabled = logger.isDebugEnabled();
    
            if (isExistingTransaction(transaction)) {
                // 根据connectionHolder.isTransactionActive()判断当前线程是否已存在事务时,
                // 当前线程已存在事务,根据事务传播级别,进行相应处理并返回相应的TransactionStatus
                return handleExistingTransaction(def, transaction, debugEnabled);
            }
    
            //超时时间不能小于-1
            if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
                throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
            }
    
            // 传播级别为mandatory且当前线程不存在事务时,直接报错
            if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
                throw new IllegalTransactionStateException(
                        "No existing transaction found for transaction marked with propagation 'mandatory'");
            }
            //传播级别为required、required_new、nested且当前线程不存在事务时,创建一个新的事务
            else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                    def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                    def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
                SuspendedResourcesHolder suspendedResources = suspend(null);
                if (debugEnabled) {
                    logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
                }
                try {
                    //① 初始化一个TransactionStatus实例
                    //② 创建一个"事务连接",并放入到线程中,ThreadLocal实现,TransactionSynchronizationMananger.resource
                    //③ 如果开启事务同步,记录5个属性到线程中,ThreadLocal实现,TransactionSynchronizationManager的其他五个ThreadLocal变量
                    return startTransaction(def, transaction, debugEnabled, suspendedResources);
                }
                catch (RuntimeException | Error ex) {
                    resume(null, suspendedResources);
                    throw ex;
                }
            }
            //传播级别supports、not_supports、never且当前线程不存在事务时,以非事务方式执行
            else {
                // Create "empty" transaction: no actual transaction, but potentially synchronization.
                if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                    logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                            "isolation level will effectively be ignored: " + def);
                }
                boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
                //① 初始化一个TransactionStatus实例
                //② 与上面不同,这里没有创建一个"事务连接",及以非事务方式执行
                //③ 如果开启事务同步(newSynchronization == ture),
                // 记录5个属性到线程中,ThreadLocal实现,TransactionSynchronizationManager的其他五个ThreadLocal变量
                return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
            }
        }
    Object transaction = doGetTransaction():从当前线程中获取事务连接。
    /* org.springframework.jdbc.datasource.DataSourceTransactionManager#doGetTransaction */
        protected Object doGetTransaction() {
            //创建一个DataSourceTransactionObject实例
            DataSourceTransactionObject txObject = new DataSourceTransactionObject();
            //txObject是否允许保存点,事务嵌套
            txObject.setSavepointAllowed(isNestedTransactionAllowed());
            //从当前线程中获取事务连接,ThreadLocal实现
            //ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");
            //connectionHolder = Thread.currentThread().getThreadLocalMap.get("Transactional resources").get(datasource)
            ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
            txObject.setConnectionHolder(conHolder, false);
            return txObject;
        }

    接下来就分为两种场景:

    • 当前线程不存在事务时
    • 当前线程存在事务时

    (1) 当前线程不存在事务时:

    • 检查超时时间不能小于-1
    • 传播级别为mandatory时抛出异常
    • 传播级别为required、required_new、nested时,创建一个新事务。
    • 传播级别为supported、not_supported、never时,不创建事务,以非事务方式执行

    (2) 当前线程存在事务时:

    • 传播级别为never时,抛出异常
    • 传播级别为not_supported时,暂停当前线程事务,以非事务方式执行,执行完后,唤醒原事务
    • 传播级别为required_new时,暂停当前线程事务,创建一个新事务执行,执行完后,唤醒原事务
    • 传播级别为nested时,嵌套事务执行
    • 传播级别为required、supported、mandatory时,隔离级别不同,抛出异常,隔离级别相同,加入到当前线程事务中执行。

    总结上面场景,有五个实现需要弄清楚

    • 创建新事务
    • 不创建事务
    • 暂停当前事务,创建新事务/不创建事务,执行完后还原原事务,怎么实现还原的
    • 创建新事务,嵌套执行,怎样实现嵌套执行的
    • 不暂停当前事务,不创建事务,将当前逻辑加入到原事务中,怎样加入到当前事务

    a、创建新事务:startTransaction(def, transaction, debugEnabled, suspendedResources);

    • 初始化一个TransactionStatus实例;
    • 创建一个事务连接放入到线程中——spring使用datasource获取事务连接,然后放入到线程中;mybatis执行事务时的connection从线程中取的,而不是datasource获取的。
    • 如果开启事务同步,记录5个属性到线程中
        private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
                boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
    
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            //初始化TransactionStatus实例(记录事务状态信息)
            //this.transaction = transaction;  //txObject
            //this.newTransaction = newTransaction; //是否新事务
            //this.newSynchronization = newSynchronization;//
            //this.readOnly = readOnly;//@Transactional中readOnly属性,是否只读事务
            //this.debug = debug;//日志级别
            //this.suspendedResources = suspendedResources;
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            //connection+transaction的一些操作
            // 新连接,建立连接设置传播级别、隔离级别、是否只读事务、超时timeout、
            // 通过ThreadLocal将connection放入到线程中,
            doBegin(transaction, definition);
            // 如果开启事务同步,设置一部分属性到线程中,通过ThreadLocal记录
            // TransactionSynchronizationManager中的5个ThreadLocal
            // actualTransactionActive:当前线程是否存在事务txObject
            // currentTransactionIsolationLevel:当前线程中事务的隔离级别
            // currentTransactionReadOnly:当前线程中事务是否只读事务
            // currentTransactionName:当前线程中事务的事务名称
            // synchronizations :当前线程是否
            prepareSynchronization(status, definition);
            return status;
        }

    doBegin(transaction, definition):创建一个事务连接,(事务连接与非事务连接区别)

    • 初始化一个数据库连接,设置TransactionDefinition中的传播级别+隔离级别+只读+超时

    • connection自动提交切换为手动提交(事务连接与普通连接的本质区别吧)

    • connection放入到线程中(ThreadLocal实现)

        protected void doBegin(Object transaction, TransactionDefinition definition) {
            DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
            Connection con = null;
    
            try {
                if (!txObject.hasConnectionHolder() ||
                        txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                    //当前线程没有连接 或者 有链接但事务与连接同步,需要新建连接
                    Connection newCon = obtainDataSource().getConnection();
                    if (logger.isDebugEnabled()) {
                        logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                    }
                    txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
                }
    
                txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
                con = txObject.getConnectionHolder().getConnection();
    
                //设置connection.setReadOnly(readOnly):@Transactional中的readOnly属性,默认false
                //设置connection.setIsolationLevel(isolatianLevel):@Transaction中的isolation属性,默认default
                //default不是返回-1,返回null,其他隔离级别返回对应的Integer值
                Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
                //txObject设置事务隔离级别+是否只读事务
                txObject.setPreviousIsolationLevel(previousIsolationLevel);
                txObject.setReadOnly(definition.isReadOnly());
    
                if (con.getAutoCommit()) {
                    //MySQL默认自动提交,创建数据源时,也是配置的自动提交
                    //开启事务后,需要切换到手动提交
                    txObject.setMustRestoreAutoCommit(true);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                    }
                    con.setAutoCommit(false);
                }
    
                //readOnly == true时,只读事务,这里执行sql "set transaction read only"
                prepareTransactionalConnection(con, definition);
                //设置connectionHolder属性,正处于事务中
                txObject.getConnectionHolder().setTransactionActive(true);
    
                //设置超时时间,默认TransactionDefinition.TIMEOUT_DEFAULT,采用数据库的超时时间
                //不采用默认时间时,connectionHolder记录超时时间
                int timeout = determineTimeout(definition);
                if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                    txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
                }
    
    
                if (txObject.isNewConnectionHolder()) {
                    // connection是新连接,将connectionHolder放入到线程中
                    // ThreadLocal实现,TransactionSynchronizationManager.resources
                    //泛型是ThreadLocal<Map<dataSource, connectionHolder>>
                    //存储格式currentThread.getThreadLocalMap.get(resources).put(dataSource,connectionHolder)
                    TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
                }
            }
    
            catch (Throwable ex) {
                if (txObject.isNewConnectionHolder()) {
                    //新连接创建失败时,释放连接,清除connectionHolder信息然后抛出错误
                    DataSourceUtils.releaseConnection(con, obtainDataSource());
                    txObject.setConnectionHolder(null, false);
                }
                throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
            }
        }

    b、不创建事务,以非事务执行 :prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);

    • 初始化一个TransactionStatus实例;
    • 不会创建事务连接——Mybatis非事务执行时的connection,不是在线程中取的,而是直接从datasource获取的。
    • 如果开启事务同步,记录5个属性到线程中
        protected final DefaultTransactionStatus prepareTransactionStatus(
                TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
                boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
            //初始化TransactionStatus实例(记录事务状态信息)
            //this.transaction = transaction;  //txObject
            //this.newTransaction = newTransaction; //是否新事务
            //this.newSynchronization = newSynchronization;//
            //this.readOnly = readOnly;//@Transactional中readOnly属性,是否只读事务
            //this.debug = debug;//日志级别
            //this.suspendedResources = suspendedResources;
            DefaultTransactionStatus status = newTransactionStatus(definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
            // 如果开启事务同步,设置一部分属性到线程中,通过ThreadLocal记录
            // TransactionSynchronizationManager中的5个ThreadLocal
            // actualTransactionActive:当前线程是否存在事务txObject
            // currentTransactionIsolationLevel:当前线程中事务的隔离级别
            // currentTransactionReadOnly:当前线程中事务是否只读事务
            // currentTransactionName:当前线程中事务的事务名称
            // synchronizations :当前线程是否
            prepareSynchronization(status, definition);
            return status;
        }

    c、暂停事务后,创建新事务/不创建事务,执行完后还原事务,以not_supported隔离级别为例

    /* org.springframework.transaction.support.AbstractPlatformTransactionManager#handleExistingTransaction */
        private TransactionStatus handleExistingTransaction(
                TransactionDefinition definition, Object transaction, boolean debugEnabled)
                throws TransactionException {
         //...
            if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
                //传播级别为not_supported且当前线程已存在事务时,暂停事务,以非事务方式执行完后,还原事务
                if (debugEnabled) {
                    logger.debug("Suspending current transaction");
                }
                //清除线程中的事务信息(ThreadLocal.remove)
                //记录清除的事务信息到TransactionStatus的suspendedResources属性中,
                //非事务执行完后,拿出suspendedResources继续执行事务
                Object suspendedResources = suspend(transaction);
                boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
                //2中以非事务方式执行的实现
                return prepareTransactionStatus(definition, null, false, newSynchronization, debugEnabled, suspendedResources);
            }
            //...
        }

    事务还原——保存当前线程事务:Object suspendedResources = suspend(transaction);

    d、事务嵌套

    /* org.springframework.transaction.support.AbstractPlatformTransactionManager#handleExistingTransaction */
        private TransactionStatus handleExistingTransaction(
                TransactionDefinition definition, Object transaction, boolean debugEnabled)
                throws TransactionException {
            //...
            if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
                //传播级别为nested且当前线程已存在事务时,嵌套事务执行
                if (!isNestedTransactionAllowed()) {
                    throw new NestedTransactionNotSupportedException(
                            "Transaction manager does not allow nested transactions by default - " +
                            "specify 'nestedTransactionAllowed' property with value 'true'");
                }
                if (debugEnabled) {
                    logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
                }
                if (useSavepointForNestedTransaction()) {
                    // 使用保存点实现事务嵌套
                    DefaultTransactionStatus status =
                            prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                    status.createAndHoldSavepoint();
                    return status;
                }
                else {
                    // 直接通过嵌套begin-commit-rollback实现
                    return startTransaction(definition, transaction, debugEnabled, null);
                }
            }
            //...
        }

    e、加入到事务中,其实就是不开启事务,mybatis直接使用原事务的连接执行新sql。

    /* org.springframework.transaction.support.AbstractPlatformTransactionManager#handleExistingTransaction */
    private TransactionStatus handleExistingTransaction(
                TransactionDefinition definition, Object transaction, boolean debugEnabled)
                throws TransactionException {
         //...
            // 传播级别为nrequired、supported、mandatory级别且当前线程已存在事务时,
            // 隔离级别相同时,加入到事务中执行
            // 隔离级别不同时,抛出异常
            if (isValidateExistingTransaction()) {
                if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                    Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                    if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                        Constants isoConstants = DefaultTransactionDefinition.constants;
                        throw new IllegalTransactionStateException("Participating transaction with definition [" +
                                definition + "] specifies isolation level which is incompatible with existing transaction: " +
                                (currentIsolationLevel != null ?
                                        isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                        "(unknown)"));
                    }
                }
                if (!definition.isReadOnly()) {
                    if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                        throw new IllegalTransactionStateException("Participating transaction with definition [" +
                                definition + "] is not marked as read-only but existing transaction is");
                    }
                }
            }
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
        }

    2、事务的提交与回滚

    ⑦ 异常回滚:completeTransactionAfterThrowing(txInfo,ex)

    看异常回滚之前:现弄清楚@Transactional有关异常的几个参数

    • rollbackFor:指定哪些异常(包括子类)必须导致事务回滚,类型指定。例如rollbackFor=NullPointerException;注意除指定异常必须回滚外,RuntimeException || Error也会回滚
    • rollbackForClassName:指定哪些异常(包括子类)必须导致事务回滚,名称指定(类名或全限定名)。例如rollbackFor="NullPointerException"或者"java.lang.NullPointerException"
    • noRollbackFor:指定哪些异常(包括子类)不会导致事务回滚,类型指定。
    • noRollbackForCalssName:指定哪些异常(包括子类)不会导致事务回滚,名称指定。

    对应前面解析@Transactional时代码,然后看异常回滚的实现completeTransactionAfterThrowing(txInfo,ex)

    /* org.springframework.transaction.annotation.SpringTransactionAnnotationParser#parseTransactionAnnotation(org.springframework.core.annotation.AnnotationAttributes) */
            //RollbackRuleAttribute对应必须回滚的异常
            //NoRollbackRuleAttribute对应不需要回滚的异常
            for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
                rollbackRules.add(new RollbackRuleAttribute(rbRule));
            }
            for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
                rollbackRules.add(new RollbackRuleAttribute(rbRule));
            }
            for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
                rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
            }
            for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
                rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
            }
            rbta.setRollbackRules(rollbackRules);
    
    /* org.springframework.transaction.interceptor.TransactionAspectSupport#completeTransactionAfterThrowing */
        protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
            if (txInfo != null && txInfo.getTransactionStatus() != null) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                            "] after exception: " + ex);
                }
                if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
                    //TransactionDefinition不为空,且出现rollbackFor,rollbackForClassName+RuntimeException+Error异常时回滚
                    try {
                        txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
                    }
                    catch (TransactionSystemException ex2) {
                        logger.error("Application exception overridden by rollback exception", ex);
                        ex2.initApplicationException(ex);
                        throw ex2;
                    }
                    catch (RuntimeException | Error ex2) {
                        logger.error("Application exception overridden by rollback exception", ex);
                        throw ex2;
                    }
                }
                else {
                    // NoRollbackFor+NoRollbackForClassName指定的异常,不会回滚,依然提交
                    // 但当TransactionStatus.isRollbackOnly()==true时,会回滚
                    try {
                        txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
                    }
                    catch (TransactionSystemException ex2) {
                        logger.error("Application exception overridden by commit exception", ex);
                        ex2.initApplicationException(ex);
                        throw ex2;
                    }
                    catch (RuntimeException | Error ex2) {
                        logger.error("Application exception overridden by commit exception", ex);
                        throw ex2;
                    }
                }
            }
        }

    ⑧ 资源清理:cleanupTransactionInfo(txInfo):清除线程中的TranscationInfo实例

        protected void cleanupTransactionInfo(@Nullable TransactionInfo txInfo) {
            if (txInfo != null) {
                            //(ThreadLocal)清除线程中的TransactionInfo 
                txInfo.restoreThreadLocalStatus();
            }
        }
    
        private void restoreThreadLocalStatus() {
            //oldTransactionInfo 默认 == null
            transactionInfoHolder.set(this.oldTransactionInfo);
        }

    ⑨ 事务提交:commitTransactionAfterReturning(txInfo),无异常正常返回时,事务提交

        protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
            if (txInfo != null && txInfo.getTransactionStatus() != null) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
                }
                //无异常时,事务提交
                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
            }
        }

    补充:

    1、事务连接创建时,会在当前线程中存储connection+transactionSynchronization信息,所以

    事务提交和事务回滚最终都会清理线程中保存的事务信息(finally中)并唤醒暂停的事务:cleanupAfterCompletion(status)

    private void cleanupAfterCompletion(DefaultTransactionStatus status) {
            //设置事务状态,结束
            status.setCompleted();
            if (status.isNewSynchronization()) {
                //清理线程中事务同步器信息,5个ThreadLocal
                //synchronizations.remove();
                //currentTransactionName.remove();
                //currentTransactionReadOnly.remove();
                //currentTransactionIsolationLevel.remove();
                //actualTransactionActive.remove();
                TransactionSynchronizationManager.clear();
            }
            if (status.isNewTransaction()) {
                //如果是一个新事务(无事务传播行为)
                //重置connection中传播级别+隔离级别+可读事务标志+超时时间+autoCommit(true),并将连接归还给datasource
                //(重置连接属性,以便释放到数据库连接池中复用)
                //清除线程中的connection,(ThreadLocal)resources.remove()
                doCleanupAfterCompletion(status.getTransaction());
            }
            if (status.getSuspendedResources() != null) {
                if (status.isDebug()) {
                    logger.debug("Resuming suspended transaction after completion of inner transaction");
                }
                Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
                //还原暂停的事务
                resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
            }
        }

    2、Transactional的所有参数含义:

    //可注解在Class上,也可注解在method上
    @Target({ElementType.TYPE, ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @Inherited
    @Documented
    public @interface Transactional {
        //事务的事务管理器,默认""
        @AliasFor("transactionManager")
        String value() default "";
    
        //事务的事务管理器,默认""
        @AliasFor("value")
        String transactionManager() default "";
    
        //事务的传播行为,默认required
        Propagation propagation() default Propagation.REQUIRED;
    
        //事务的隔离级别,默认default(数据库的隔离级别)
        Isolation isolation() default Isolation.DEFAULT;
    
        //事务超时时间,默认timeout_default(数据库的超时时间)
        int timeout() default TransactionDefinition.TIMEOUT_DEFAULT;
    
        //是否只读事务,默认false(读写事务)
        boolean readOnly() default false;
    
        //必须回滚的异常类型数组,默认{} || RuntimeException || Error
        Class<? extends Throwable>[] rollbackFor() default {};
    
        //必须回滚的异常类型ClassName的数组,默认{} || RuntimeException || Error
        String[] rollbackForClassName() default {};
    
        //不需回滚的异常类型数组,默认为{}
        Class<? extends Throwable>[] noRollbackFor() default {};
    
        //不需回滚的异常类型ClassName的数组,默认为{}
        String[] noRollbackForClassName() default {};
    
    }

     3、TransactionSynchronizationManager中的六个ThreadLocal变量

    public abstract class TransactionSynchronizationManager {
    
        //当前线程ThreadLocalMap中—— <"Transactional resources",Map<datasource,connectionHolder>>
        private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");
        //??
        private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = new NamedThreadLocal<>("Transaction synchronizations");
        //<"Current transaction name",transactionName>
        private static final ThreadLocal<String> currentTransactionName = new NamedThreadLocal<>("Current transaction name");
        //<"Current transaction read-only status",readOnly>
        private static final ThreadLocal<Boolean> currentTransactionReadOnly = new NamedThreadLocal<>("Current transaction read-only status");
        //<"Current transaction isolation level",isolationLevel>
        private static final ThreadLocal<Integer> currentTransactionIsolationLevel = new NamedThreadLocal<>("Current transaction isolation level");
        //<"Actual transaction active",是否存在transaction实例:txObject>
        private static final ThreadLocal<Boolean> actualTransactionActive = new NamedThreadLocal<>("Actual transaction active");
  • 相关阅读:
    js中用setTimeout写定时炸弹
    javascript函数的形参和实参
    在Mac OS X中配置Apache+PHP+MySQL(转)
    600多万用户密码使用次数统计分析程序
    pgsql数据库创建以及压缩包形式导入导出
    apache中的.htaccess配置示例收集整理
    收集常用的正则表达式及其应用
    采用htpasswd设置验证机制
    用apache做代理时候,可设置请求头信息
    ubuntu在混合环境中创建和配置文件共享
  • 原文地址:https://www.cnblogs.com/wqff-biubiu/p/12540147.html
Copyright © 2011-2022 走看看