zoukankan      html  css  js  c++  java
  • Spring源码之spring事务

    Spring事务

    spring声明式事务让我们从复杂的事务处理中得以脱身,我们可以不再去关注获得、关闭连接、事务提交、和回滚操作;简单来说事务可以做到在发生异常时进行回滚。

    事务自定义标签

    自定义标签

    spring事务的开关配置是:tx:annotation-driven/ ,全局搜索:"annotation-driven",可以找到类:

        public class TxNamespaceHandler extends NamespaceHandlerSupport
    
        @Override
    	public void init() {
    		registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
    		// 程序入口 
    		registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
    		registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
    	}
    

    orgspringframework ransactionconfigAnnotationDrivenBeanDefinitionParser.java 然后我们关注parse方法

    public BeanDefinition parse(Element element, ParserContext parserContext) {
    		registerTransactionalEventListenerFactory(parserContext);
    		String mode = element.getAttribute("mode");
    		if ("aspectj".equals(mode)) {// 配置mode属性可以实现通过AOP织入事务 
    			// mode="aspectj"
    			registerTransactionAspect(element, parserContext);
    		}
    		else {
    			// mode="proxy"
    			// 根据配置文件定义工具类的 beanDefinition 并注册 
    			AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
    		}
    		return null;
    	}
    

    解析标签

    找到真正的入口: AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);

    public static void configureAutoProxyCreator(Element element, ParserContext parserContext) {
    			// 注册 或者升级 InfrastructureAdvisorAutoProxyCreator 它实现了接口 BeanPostProcessor,
    			// (最本质的逻辑是:通过 registry 注册了该后处理器,当从 beanFactory 依据 registory 初始化一个bean的时候,会调用该后处理器,对该 bean 进行事务增强)
    			// 保证 bean 实例化时会调用后处理器的 postProcessBeforeInitialization 方法。
    			AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element);
    			// beanName ?? 
    			String txAdvisorBeanName = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME;
    			// 判断默认的beanName是否已经解析
    			if (!parserContext.getRegistry().containsBeanDefinition(txAdvisorBeanName)) {
    				Object eleSource = parserContext.extractSource(element);
    
    				// Create the TransactionAttributeSource definition.
    				// 创建事务属性 beanDefinition 配置 ??
    				RootBeanDefinition sourceDef = new RootBeanDefinition(
    						"org.springframework.transaction.annotation.AnnotationTransactionAttributeSource");
    				sourceDef.setSource(eleSource);
    				sourceDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
    				// 注册 
    				String sourceName = parserContext.getReaderContext().registerWithGeneratedName(sourceDef);
    
    				// Create the TransactionInterceptor definition.
    				// 拦截器 ?? 实际调用时调用其invoke方法 
    				RootBeanDefinition interceptorDef = new RootBeanDefinition(TransactionInterceptor.class);
    				interceptorDef.setSource(eleSource);
    				interceptorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
    				registerTransactionManager(element, interceptorDef);
    				interceptorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
    				String interceptorName = parserContext.getReaderContext().registerWithGeneratedName(interceptorDef);// 注冊
    
    				// Create the TransactionAttributeSourceAdvisor definition.
    				// 切点的 beanDefinition       PointcutAdvisor <- AbstractPointcutAdvisor <- AbstractBeanFactoryPointcutAdvisor <- BeanFactoryTransactionAttributeSourceAdvisor 
    				RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class);
    				advisorDef.setSource(eleSource);
    				advisorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
    				advisorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
    				advisorDef.getPropertyValues().add("adviceBeanName", interceptorName);
    				if (element.hasAttribute("order")) {// 是否配置了order属性 
    					advisorDef.getPropertyValues().add("order", element.getAttribute("order"));
    				}
    				parserContext.getRegistry().registerBeanDefinition(txAdvisorBeanName, advisorDef);// 注册
    
    				CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(), eleSource);
    				compositeDef.addNestedComponent(new BeanComponentDefinition(sourceDef, sourceName));
    				compositeDef.addNestedComponent(new BeanComponentDefinition(interceptorDef, interceptorName));
    				compositeDef.addNestedComponent(new BeanComponentDefinition(advisorDef, txAdvisorBeanName));
    				parserContext.registerComponent(compositeDef);
    			}
    		}
    

    上述代码的主要工作是注册了三个类:

    1. 注册:InfrastructureAdvisorAutoProxyCreator
    AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element);
    
    
    public static void registerAutoProxyCreatorIfNecessary(
    			ParserContext parserContext, Element sourceElement) {
    
    		BeanDefinition beanDefinition = AopConfigUtils.registerAutoProxyCreatorIfNecessary(
    				parserContext.getRegistry(), parserContext.extractSource(sourceElement));
    		useClassProxyingIfNecessary(parserContext.getRegistry(), sourceElement);
    		registerComponentIfNecessary(beanDefinition, parserContext);
    	}
    
    @Nullable
    	public static BeanDefinition registerAutoProxyCreatorIfNecessary(
    			BeanDefinitionRegistry registry, @Nullable Object source) {
    
    		// 注册或升级
    		return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);
    	}
    
    

    InfrastructureAdvisorAutoProxyCreator.java类图
    在这里插入图片描述

    查看它的类图,很快能找打我们需要重点关注的接口:BeanPostProcessor,看到它,就代表后处理器它来了

    public interface BeanPostProcessor {
    	default Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
    		return bean;
    	}
    	default Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    		return bean;
    	}
    }
    

    InfrastructureAdvisorAutoProxyCreator 它实现了接口 BeanPostProcessor,保证 bean 初始化时会调用后处理器中定义的方法。
    (最本质的逻辑是:如果在容器组件中注册了该后处理器,当从 beanFactory 初始化一个bean的时候,就会调用该后处理器,对需要初始化 bean 进行事务增强)

    1. 其中有两个类被注册到了类 BeanFactoryTransactionAttributeSourceAdvisor 中,从它的类图中我们可以看到它继承自:Advisor 故事即将开始;

    BeanFactoryTransactionAttributeSourceAdvisor.java 类图
    在这里插入图片描述

    以它为根基通过AOP增强的方式最终实现事务功能,它以beanName:org.springframework.transaction.config.internalTransactionAdvisor 注册到了容器组件中。

    • 被注册的两个类之一是:AnnotationTransactionAttributeSource 注册的beanName 为:transactionAttributeSource ,(它是增强和方法是否匹配的实际处理类,后续会提到它)

    AnnotationTransactionAttributeSource.java类图

    在这里插入图片描述

    • 另一个类被注册到BeanFactoryTransactionAttributeSourceAdvisor 中的类是:TransactionInterceptor,注册的beanName为:adviceBeanName;
      TransactionInterceptor.java类图:
      在这里插入图片描述

    从它的类图我们可以找到,它实现了接口:MethodInterceptor,看到这个接口我们就该想到方法 invoke();

    bean 的初始化

    InfrastructureAdvisorAutoProxyCreator

    注册的三个类中,第一个登上舞台的是类 InfrastructureAdvisorAutoProxyCreator
    跟随类 InfrastructureAdvisorAutoProxyCreator的继承体系解构,在它上游的类:AbstractAutoProxyCreator 中发现了如下的定义:
    很容发现了我们的老朋友:wrapIfNecessary() 自打学习 AOP 我们就认识了它

    // 初始化后  后处理器<增强>
    	public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) throws BeansException {
    		if (bean != null) {
    			Object cacheKey = getCacheKey(bean.getClass(), beanName);
    			if (this.earlyProxyReferences.remove(cacheKey) != bean) {// 避免循环依赖,提前暴露 ??
    				// 如果需要增强  则需要封装指定的 bean (代理该bean) 
    				return wrapIfNecessary(bean, beanName, cacheKey);
    			}
    		}
    		return bean;
    }
    

    进入该方法的逻辑:

    // 要么返回bean 要么返回被代理后的bean-proxy
    	protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    		if (StringUtils.hasLength(beanName) && this.targetSourcedBeans.contains(beanName)) {
    			return bean;// 已经处理过 返回 
    		}
    		// advisedBeans : 需要被增强的
    		if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {
    			return bean;// 不需要增强返回
    		}
    		// 不需要被代理的类:基础类,或者设置了不需代理的类
    		if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) {
    			this.advisedBeans.put(cacheKey, Boolean.FALSE);
    			return bean;
    		}
    
    		// Create proxy if we have advice.
    		// 获取所有需要被增强的方法 ?? 获取增强切点,(且适用) 
    		Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
    		if (specificInterceptors != DO_NOT_PROXY) {// 存在增强方法,则创建代理 
    			this.advisedBeans.put(cacheKey, Boolean.TRUE);
    			// 对获取到的需要增强的方法进行代理 
    			Object proxy = createProxy(bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));// 单例 
    			this.proxyTypes.put(cacheKey, proxy.getClass());
    			return proxy;
    		}
    
    		this.advisedBeans.put(cacheKey, Boolean.FALSE);
    		return bean;
    	}
    

    获取增强方法

    我们在类AbstractAdvisorAutoProxyCreator:中找到了获取增强方法的定义

    protected Object[] getAdvicesAndAdvisorsForBean(
    			Class<?> beanClass, String beanName, @Nullable TargetSource targetSource) {
    
    		// 根据 Class 以及 beanName 获取增强,(且适用)
    		List<Advisor> advisors = findEligibleAdvisors(beanClass, beanName);
    		if (advisors.isEmpty()) {
    			return DO_NOT_PROXY;
    		}
    		return advisors.toArray();
    	}
    

    在大体实现上根AOP 基本类似:

    protected List<Advisor> findEligibleAdvisors(Class<?> beanClass, String beanName) {
    		// AnnotationAwareAspectJAutoProxyCreator 类覆盖了该方法 
    		List<Advisor> candidateAdvisors = findCandidateAdvisors();// 获取所有的增强 (所有  拦截||切点???)
    		List<Advisor> eligibleAdvisors = findAdvisorsThatCanApply(candidateAdvisors, beanClass, beanName);// 寻找适用于当前 Bean 的增强 
    		extendAdvisors(eligibleAdvisors);
    		if (!eligibleAdvisors.isEmpty()) {
    			eligibleAdvisors = sortAdvisors(eligibleAdvisors);
    		}
    		return eligibleAdvisors;
    	}
    

    在获取所有增强的 findCandidateAdvisors() 方法内部找到了如下代码:

    BeanFactoryUtils.beanNamesForTypeIncludingAncestors(this.beanFactory, Advisor.class, true, false);
    

    我们可看到,它提取了所有的实现了 Advisor 接口的类,到这里我们会议一下,解析事务开关标签的时候,是不是注册了三个类,其中有两个类就注册到了:BeanFactoryTransactionAttributeSourceAdvisor
    而它的继承体系中刚好就有:Advisor,自此我们注册的另外两个类将会通过 BeanFactoryTransactionAttributeSourceAdvisor 一一登录舞台。

    获取所有增强中内适用于当前方法的增强

    跟随方法 findAdvisorsThatCanApply()以及参数candidateAdvisors,找到如下方法:

    public static List<Advisor> findAdvisorsThatCanApply(List<Advisor> candidateAdvisors, Class<?> clazz) {
    		if (candidateAdvisors.isEmpty()) {
    			return candidateAdvisors;
    		}
    		List<Advisor> eligibleAdvisors = new ArrayList<>();
    		// 处理引介增强
    		for (Advisor candidate : candidateAdvisors) {
    			if (candidate instanceof IntroductionAdvisor && canApply(candidate, clazz)) {
    				eligibleAdvisors.add(candidate);// 适合 ?? 拦截条件成立??
    			}
    		}
    		boolean hasIntroductions = !eligibleAdvisors.isEmpty();
    		for (Advisor candidate : candidateAdvisors) {
    			// 跳过引介增强(已处理)
    			if (candidate instanceof IntroductionAdvisor) {
    				// already processed
    				continue;
    			}
    			// 普通 bean 处理
    			if (canApply(candidate, clazz, hasIntroductions)) {
    				eligibleAdvisors.add(candidate);
    			}
    		}
    		return eligibleAdvisors;
    	}
    

    根据 BeanFactoryTransactionAttributeSourceAdvisor 的类图我们可以很明确知道,它的继承结构中并没有:IntroductionAdvisor,那么进入下面的逻辑:
    canApply() 方法判断增强是否适用当前方法:

    public static boolean canApply(Advisor advisor, Class<?> targetClass, boolean hasIntroductions) {
    		if (advisor instanceof IntroductionAdvisor) {
    			return ((IntroductionAdvisor) advisor).getClassFilter().matches(targetClass);
    		}
    		else if (advisor instanceof PointcutAdvisor) {
    			PointcutAdvisor pca = (PointcutAdvisor) advisor;
    			return canApply(pca.getPointcut(), targetClass, hasIntroductions);
    		}
    		else {
    			// It doesn't have a pointcut so we assume it applies.
    			return true;
    		}
    	}
    

    很明显 BeanFactoryTransactionAttributeSourceAdvisor 继承自:PointcutAdvisor

    else if (advisor instanceof PointcutAdvisor) {
    			PointcutAdvisor pca = (PointcutAdvisor) advisor;
    			return canApply(pca.getPointcut(), targetClass, hasIntroductions);
    		}
    

    跟随上述几行代码,我在类 BeanFactoryTransactionAttributeSourceAdvisor 类中找到了getPointcut() 方法:

    private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
    		@Override
    		@Nullable
    		protected TransactionAttributeSource getTransactionAttributeSource() {
    			return transactionAttributeSource; 
    		} 
    };
    

    定睛一看:你说巧不巧,它返回的:TransactionAttributeSourcePointcut,实现了一个回调 getTransactionAttributeSource()方法,
    它返回的就是 transactionAttributeSource。回到开始,解析事务标签注册了三个类,两个被注册到:BeanFactoryTransactionAttributeSourceAdvisor
    transactionAttributeSource 就是那二者之一,它对应的bean就是:AnnotationTransactionAttributeSource。

    继续深入方法:canApply(pca.getPointcut(), targetClass, hasIntroductions);

    public static boolean canApply(Pointcut pc, Class<?> targetClass, boolean hasIntroductions) {
    		
    		..........
    		
    		MethodMatcher methodMatcher = pc.getMethodMatcher();
    		
    		IntroductionAwareMethodMatcher introductionAwareMethodMatcher = null;
            if (methodMatcher instanceof IntroductionAwareMethodMatcher) {
                introductionAwareMethodMatcher = (IntroductionAwareMethodMatcher) methodMatcher;
            }
    		...............
    		
    		for (Class<?> clazz : classes) {
    			Method[] methods = ReflectionUtils.getAllDeclaredMethods(clazz);
    			for (Method method : methods) {
    				if (introductionAwareMethodMatcher != null ?
    						introductionAwareMethodMatcher.matches(method, targetClass, hasIntroductions) :
    						methodMatcher.matches(method, targetClass)) {
    					return true;
    				}
    			}
    		}
    
    		return false;
    	}
    

    经过上文,我们得知:pc的实际类型是: TransactionAttributeSourcePointcut

    TransactionAttributeSourcePointcut.java类图
    在这里插入图片描述

    跟随它的类图,我们在类:StaticMethodMatcherPointcut 中找到了 getMethodMatcher() 方法的实现

    @Override
    	public final MethodMatcher getMethodMatcher() {
    		return this;
    	}
    

    根据它的类型我么可以确定表达式:

    introductionAwareMethodMatcher != null 
        ? introductionAwareMethodMatcher.matches(method, targetClass, hasIntroductions) 
            : methodMatcher.matches(method, targetClass)
    

    最终执行的是后者:methodMatcher.matches(method, targetClass)。
    类 TransactionAttributeSourcePointcut中找到了 matches的实现:

    @Override
    	public boolean matches(Method method, @Nullable Class<?> targetClass) {
    		if (targetClass != null && TransactionalProxy.class.isAssignableFrom(targetClass)) {
    			return false;
    		}
    		// 注册的类定义 transactionAttributeSource AnnotationTransactionAttributeSource
    		TransactionAttributeSource tas = getTransactionAttributeSource();
    		return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
    	}
    

    这里我们见到了回调方法:getTransactionAttributeSource(), 最终:tas 的实际类型为:AnnotationTransactionAttributeSource,
    表达式 methodMatcher.matches(method, targetClass) 最终判定的是:

    (tas == null || tas.getTransactionAttribute(method, targetClass) != null)
    

    跟随 AnnotationTransactionAttributeSource.getTransactionAttribute 继续往下,

    在父类AbstractFallbackTransactionAttributeSource中找到方法:computeTransactionAttribute() ,至此我们终于看到了提取事务声明的方法:

    protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
    		Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
    		// 查看方法中是否存在事务声明
    		TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
    		if (txAttr != null) {
    			return txAttr; }
    		// 查看方法所在类是否存在事务声明 
    		txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
    		if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
    			return txAttr; }
    		// specificMethod != method 代表存在接口,去接口中找 
    		if (specificMethod != method) {
    			// 查找接口实现方法 ??
    			txAttr = findTransactionAttribute(method);
    			if (txAttr != null) {
    				return txAttr;  }
    			// 查找接口实现类 ?? 
    			txAttr = findTransactionAttribute(method.getDeclaringClass());
    			if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
    				return txAttr; }
    		}
    		return null;
    	}
    

    最后在 AnnotationTransactionAttributeSource 类中找到 findTransactionAttribute() 方法的定义:

        protected TransactionAttribute findTransactionAttribute(Class<?> clazz) {
    		return determineTransactionAttribute(clazz);
    	}
    
    	protected TransactionAttribute findTransactionAttribute(Method method) {
    		return determineTransactionAttribute(method);
    	}
    	
    	protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) {
        		// annotationParsers (SpringTransactionAnnotationParser) 
        		for (TransactionAnnotationParser annotationParser : this.annotationParsers) {
        			TransactionAttribute attr = annotationParser.parseTransactionAnnotation(element);
        			if (attr != null) {
        				return attr;
        			}
        		}
        		return null;
        	}
    

    走到此处,你可能懵逼了:this.annotationParsers是个啥,我没有注册过它啊? 查看类 AnnotationTransactionAttributeSource 的构造器:

    public AnnotationTransactionAttributeSource(boolean publicMethodsOnly) {
    		this.publicMethodsOnly = publicMethodsOnly;
    		if (jta12Present || ejb3Present) {
    			this.annotationParsers = new LinkedHashSet<>(4);
    			this.annotationParsers.add(new SpringTransactionAnnotationParser());
    			if (jta12Present) {
    				this.annotationParsers.add(new JtaTransactionAnnotationParser());
    			}
    			if (ejb3Present) {
    				this.annotationParsers.add(new Ejb3TransactionAnnotationParser());
    			}
    		}
    		else {
    			this.annotationParsers = Collections.singleton(new SpringTransactionAnnotationParser());
    		}
    	}
    

    最终集合 this.annotationParsers 被初始化为:SpringTransactionAnnotationParser 的集合;从中,我们来到了此行的终点:

    
    @Override
    	@Nullable
    	public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
    		// 注解工具类,提取注解 ?? 
    		AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(
    				element, Transactional.class, false, false);
    		if (attributes != null) {
    			return parseTransactionAnnotation(attributes);
    		}
    		else {
    			return null;
    		}
    	}
    
    	// 获取注解标记 
    	protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
    		RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
    
    		Propagation propagation = attributes.getEnum("propagation");
    		rbta.setPropagationBehavior(propagation.value());
    		Isolation isolation = attributes.getEnum("isolation");
    		rbta.setIsolationLevel(isolation.value());
    		rbta.setTimeout(attributes.getNumber("timeout").intValue());// 超时 ??
    		rbta.setReadOnly(attributes.getBoolean("readOnly"));// 只读 ??
    		rbta.setQualifier(attributes.getString("value"));// 
    
    		// 回滚 ?? 
    		List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
    		for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
    			rollbackRules.add(new RollbackRuleAttribute(rbRule));
    		}
    		for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
    			rollbackRules.add(new RollbackRuleAttribute(rbRule));
    		}
    		for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
    			rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
    		}
    		for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
    			rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
    		}
    		rbta.setRollbackRules(rollbackRules);
    
    		return rbta;
    	}
    
    

    自此事务标签的解析完成,剩下的是对增强的具体实现

    TransactionInterceptor 事务增强的实现

    应该还记得有两个类被注册到BeanFactoryTransactionAttributeSourceAdvisor 中,我们已经讲到了,AnnotationTransactionAttributeSource,
    它的作用是处理增强是否匹配当前方法。 (换句话说,方法是是否能提取到事务标签即可证明是否适用增强)

    剩下一个还未提到的类是:TransactionInterceptor,
    不多说,关门放类图:
    TransactionInterceptor.java 类图
    在这里插入图片描述

    从继承的体系结构,可以很容易得知他是一个拦截器,那么直接看invoke的定义:

    public Object invoke(MethodInvocation invocation) throws Throwable {
    		// Work out the target class: may be {@code null}.
    		// The TransactionAttributeSource should be passed the target class
    		// as well as the method, which may be from an interface.
    		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    
    		// Adapt to TransactionAspectSupport's invokeWithinTransaction...
    		return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);// 继续 
    	}
    	
    	protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
        			final InvocationCallback invocation) throws Throwable {
        
        		// If the transaction attribute is null, the method is non-transactional.
        		TransactionAttributeSource tas = getTransactionAttributeSource();
        		// 获取事务属性  <增强信息>
        		final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
        		// transactionManager 
        		final PlatformTransactionManager tm = determineTransactionManager(txAttr);
        		// 切点信息 ??  - 构造方法唯一标识:  全限定类名.方法名() 
        		final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
        
        		// 声明式事务处理 
        		if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        			// Standard transaction demarcation with getTransaction and commit/rollback calls.
        			// 创建事务 
        			TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        
        			Object retVal;
        			try {
        				// This is an around advice: Invoke the next interceptor in the chain.
        				// This will normally result in a target object being invoked.
        				// 执行被增强的方法 
        				retVal = invocation.proceedWithInvocation();
        			}
        			catch (Throwable ex) {
        				// 出现异常,处理回滚  默认仅仅回滚 RunTimeException
        				completeTransactionAfterThrowing(txInfo, ex);
        				throw ex;
        			}
        			finally {
        				// clear 
        				cleanupTransactionInfo(txInfo);
        			}
        			// 提交事务  <不论是否回滚都需要提交事务到恰当的状态>  
        			// 对于内嵌事务不会直接提交,会将事务结果设置保存点,当最外层事务也正常执行后,由最外层事务统一提交 
        			commitTransactionAfterReturning(txInfo);
        			return retVal;
        		}
        	}
    	
    	
    

    创建事务:

    // 创建事务
    	protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
    			@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
    
    		// If no name specified, apply method identification as transaction name.
    		if (txAttr != null && txAttr.getName() == null) {
    			// 包装器??
    			txAttr = new DelegatingTransactionAttribute(txAttr) {
    				@Override
    				public String getName() {
    					return joinpointIdentification;
    				}
    			};
    		}
    
    		TransactionStatus status = null;
    		if (txAttr != null) {
    			if (tm != null) {
    				// 事务状态 ??
    				status = tm.getTransaction(txAttr);// DataSourceTransactionManager.getTransaction();
    			}
    			else {
    				if (logger.isDebugEnabled()) {
    					logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
    							"] because no transaction manager has been configured");
    				}
    			}
    		}
    		// 准备并封装事务信息 
    		return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    	}
    

    处理事务准备工作,事务获取以及信息构建:

    
    public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
    		Object transaction = doGetTransaction();// 获取事务 ,基于JDBC创建事务实例,如果当前线程已经记录了连接,可以复用 
    
    		// Cache debug flag to avoid repeated checks.
    		boolean debugEnabled = logger.isDebugEnabled();
    
    		if (definition == null) {
    			// Use defaults if no transaction definition given.
    			definition = new DefaultTransactionDefinition();
    		}
    
    		if (isExistingTransaction(transaction)) {// 当前线程已经存在事务,且连接不为空,连接中的 transactionActive 不为空
    			// Existing transaction found -> check propagation behavior to find out how to behave.
    			return handleExistingTransaction(definition, transaction, debugEnabled);// 当前线程存在事务,转向嵌套事务的处理 
    		}
    
    		// Check definition settings for new transaction.
    		// 事务超时设置验证
    		if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
    			throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
    		}
    
    		// No existing transaction found -> check propagation behavior to find out how to proceed.
    		// 当前线程不存在事务,且 propagationBehavior 被声明为:PROPAGATION_MANDATORY 则抛出异常 
    		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {// 强制性的,但是当前线程没有事务,所以抛出异常 
    			throw new IllegalTransactionStateException(
    					"No existing transaction found for transaction marked with propagation 'mandatory'");
    		}
    		else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||// 需要??
    				definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||// 需要一个新的??
    				definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {// 嵌套
    			// 上述类型,他们都需要新建事务 
    			
    			//  空挂起 ??
    			SuspendedResourcesHolder suspendedResources = suspend(null);
    			if (debugEnabled) {
    				logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
    			}
    			try {
    				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    				DefaultTransactionStatus status = newTransactionStatus(
    						definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    				// 构造事务,设置connectionHolder、隔离级别、超时、如果是新连接则需要绑定到当前线程   (委托给低层的连接完成)
    				doBegin(transaction, definition);// 数据库连接设置,新(新生成,或者记录被清空无记录) 连接记录到当前线程 
    				// 新同步事务的设置??  针对当前线程进行设置 
    				prepareSynchronization(status, definition);
    				return status;
    			}
    			catch (RuntimeException | Error ex) {
    				resume(null, suspendedResources);
    				throw ex;
    			}
    		}
    		else {
    			// Create "empty" transaction: no actual transaction, but potentially synchronization.
    			if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
    				logger.warn("Custom isolation level specified but no actual transaction initiated; " +
    						"isolation level will effectively be ignored: " + definition);
    			}
    			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    			return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
    		}
    	}
    

    数据库连接设置:

    
    protected void doBegin(Object transaction, TransactionDefinition definition) {
    		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    		Connection con = null;
    
    		try {
    			// 当前线程没有绑定连接或者设置了同步事务 
    			if (!txObject.hasConnectionHolder() ||
    					txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
    				Connection newCon = obtainDataSource().getConnection();
    				if (logger.isDebugEnabled()) {
    					logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
    				}
    				txObject.setConnectionHolder(new ConnectionHolder(newCon), true);// 新连接 
    			}
    
    			txObject.getConnectionHolder().setSynchronizedWithTransaction(true);// 修改connectionHolder 的事务标记 
    			con = txObject.getConnectionHolder().getConnection();
    
    			// 设置隔离级别 
    			Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
    			txObject.setPreviousIsolationLevel(previousIsolationLevel);
    
    			// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
    			// so we don't want to do it unnecessarily (for example if we've explicitly
    			// configured the connection pool to set it already).
    			if (con.getAutoCommit()) {// 更改自动提交设置,由spring控制提交
    				txObject.setMustRestoreAutoCommit(true);
    				if (logger.isDebugEnabled()) {
    					logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
    				}
    				con.setAutoCommit(false);
    			}
    
    			prepareTransactionalConnection(con, definition);
    			txObject.getConnectionHolder().setTransactionActive(true);// 设置当前线程存在事务 
    
    			int timeout = determineTimeout(definition);
    			if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {// 设置连接绑定到事务的过期时间 ?? 
    				txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
    			}
    
    			// Bind the connection holder to the thread.
    			if (txObject.isNewConnectionHolder()) {// 连接是否为新的连接 ?? 如果是绑定到当前线程 
    				TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
    			}
    		}
    
    		catch (Throwable ex) {
    			if (txObject.isNewConnectionHolder()) {
    				DataSourceUtils.releaseConnection(con, obtainDataSource());
    				txObject.setConnectionHolder(null, false);
    			}
    			throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    		}
    	}
    
    

    处理已经存在的事务:

    private TransactionStatus handleExistingTransaction(
    			TransactionDefinition definition, Object transaction, boolean debugEnabled)
    			throws TransactionException {// 处理已经存在的事务
    
    		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {// never 抛异常
    			throw new IllegalTransactionStateException(
    					"Existing transaction found for transaction marked with propagation 'never'");
    		}
    
    		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {// supported 
    			if (debugEnabled) {
    				logger.debug("Suspending current transaction");
    			}
    			Object suspendedResources = suspend(transaction);
    			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);// always 
    			return prepareTransactionStatus(
    					definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    		}
    
    		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {// requires_new 
    			if (debugEnabled) {
    				logger.debug("Suspending current transaction, creating new transaction with name [" +
    						definition.getName() + "]");
    			}
    			SuspendedResourcesHolder suspendedResources = suspend(transaction);// 挂起旧事务
    			try {
    				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);// 创建新事务
    				DefaultTransactionStatus status = newTransactionStatus(
    						definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    				doBegin(transaction, definition);
    				prepareSynchronization(status, definition);
    				return status;
    			}
    			catch (RuntimeException | Error beginEx) {
    				resumeAfterBeginException(transaction, suspendedResources, beginEx);
    				throw beginEx;
    			}
    		}
    
    		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {// nested 嵌套式事务处理 
    			if (!isNestedTransactionAllowed()) {
    				throw new NestedTransactionNotSupportedException(
    						"Transaction manager does not allow nested transactions by default - " +
    						"specify 'nestedTransactionAllowed' property with value 'true'");
    			}
    			if (debugEnabled) {
    				logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
    			}
    			if (useSavepointForNestedTransaction()) {
    				// Create savepoint within existing Spring-managed transaction,
    				// through the SavepointManager API implemented by TransactionStatus.
    				// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
    				// 没有保存点,在嵌套式事务建立初始保存点  存档??
    				DefaultTransactionStatus status =
    						prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
    				status.createAndHoldSavepoint();
    				return status;
    			}
    			else {
    				// Nested transaction through nested begin and commit/rollback calls.
    				// Usually only for JTA: Spring synchronization might get activated here
    				// in case of a pre-existing JTA transaction.
    				// 不能使用保存点的时候,新建事务 
    				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    				DefaultTransactionStatus status = newTransactionStatus(
    						definition, transaction, true, newSynchronization, debugEnabled, null);
    				doBegin(transaction, definition);
    				prepareSynchronization(status, definition);
    				return status;
    			}
    		}
    
    		// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    		if (debugEnabled) {
    			logger.debug("Participating in existing transaction");
    		}
    		if (isValidateExistingTransaction()) {
    			if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
    				Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
    				if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
    					Constants isoConstants = DefaultTransactionDefinition.constants;
    					throw new IllegalTransactionStateException("Participating transaction with definition [" +
    							definition + "] specifies isolation level which is incompatible with existing transaction: " +
    							(currentIsolationLevel != null ?
    									isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
    									"(unknown)"));
    				}
    			}
    			if (!definition.isReadOnly()) {
    				if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
    					throw new IllegalTransactionStateException("Participating transaction with definition [" +
    							definition + "] is not marked as read-only but existing transaction is");
    				}
    			}
    		}
    		boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    		return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    	}
    

    回滚

    protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
    		if (txInfo != null && txInfo.getTransactionStatus() != null) {//  判断存在事务 
    			if (logger.isTraceEnabled()) {
    				logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
    						"] after exception: " + ex);
    			}
    			// 默认实现: (ex instanceof RuntimeException || ex instanceof Error) 
    			if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
    				try {
    					txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
    				}
    				catch (TransactionSystemException ex2) {
    					logger.error("Application exception overridden by rollback exception", ex);
    					ex2.initApplicationException(ex);
    					throw ex2;
    				}
    				catch (RuntimeException | Error ex2) {
    					logger.error("Application exception overridden by rollback exception", ex);
    					throw ex2;
    				}
    			}
    			else {
    				// We don't roll back on this exception.
    				// Will still roll back if TransactionStatus.isRollbackOnly() is true.
    				// 当不满足 rollbackOn 的回滚条件时,抛异常也会提交事务 
    				try {
    					txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    				}
    				catch (TransactionSystemException ex2) {
    					logger.error("Application exception overridden by commit exception", ex);
    					ex2.initApplicationException(ex);
    					throw ex2;
    				}
    				catch (RuntimeException | Error ex2) {
    					logger.error("Application exception overridden by commit exception", ex);
    					throw ex2;
    				}
    			}
    		}
    	}
    

    回滚处理:

    private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
    		try {
    			boolean unexpectedRollback = unexpected;
    
    			try {
    				// 自定义触发器的调用
    				// 激活所有 TransactionSynchronization 中的 beforeCompletion方法
    				triggerBeforeCompletion(status);
    
    				if (status.hasSavepoint()) {// 回滚到保存点 ??  常用于嵌套事务,内层事务异常回滚不会影响到外层事务 
    					if (status.isDebug()) {
    						logger.debug("Rolling back transaction to savepoint");
    					}
    					// 保存点依赖于底层的数据库连接
    					status.rollbackToHeldSavepoint();// 如果有保存点,当前事务为单独的线程(单独的事务),会退到保存点 
    				}
    				else if (status.isNewTransaction()) {// 当前事务为独立的新事务,直接回滚 
    					if (status.isDebug()) {
    						logger.debug("Initiating transaction rollback");
    					}
    					doRollback(status);
    				}
    				else {// 没有保存点,且不是独立的新事务,那么标记状态,等到事务链执行完后统一回滚  (都不提交) 
    					// Participating in larger transaction
    					if (status.hasTransaction()) {
    						if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
    							if (status.isDebug()) {
    								logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
    							}
    							doSetRollbackOnly(status);
    						}
    						else {
    							if (status.isDebug()) {
    								logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
    							}
    						}
    					}
    					else {
    						logger.debug("Should roll back transaction but cannot - no transaction available");
    					}
    					// Unexpected rollback only matters here if we're asked to fail early
    					if (!isFailEarlyOnGlobalRollbackOnly()) {
    						unexpectedRollback = false;
    					}
    				}
    			}
    			catch (RuntimeException | Error ex) {
    				triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
    				throw ex;
    			}
    
    			// 自定义触发器的调用
    			// 激活所有 TransactionSynchronization 中的 afterCompletion方法
    			triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
    
    			// Raise UnexpectedRollbackException if we had a global rollback-only marker
    			if (unexpectedRollback) {
    				throw new UnexpectedRollbackException(
    						"Transaction rolled back because it has been marked as rollback-only");
    			}
    		}
    		finally {
    			cleanupAfterCompletion(status);// 清空记录的资源,例如记录到线程的连接 ;   将挂起的资源恢复,例如:嵌套事务。
    		}
    	}
    

    事务提交

    public final void commit(TransactionStatus status) throws TransactionException {
    		if (status.isCompleted()) {// 事务已经完成,在此提交抛出异常 
    			throw new IllegalTransactionStateException(
    					"Transaction is already completed - do not call commit or rollback more than once per transaction");
    		}
    
    		DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    		if (defStatus.isLocalRollbackOnly()) {// 事务链 中标记了回滚,那么直接回滚,不再尝试提交事务 
    			if (defStatus.isDebug()) {
    				logger.debug("Transactional code has requested rollback");
    			}
    			processRollback(defStatus, false);
    			return;
    		}
    
    		if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {// 
    			if (defStatus.isDebug()) {
    				logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
    			}
    			processRollback(defStatus, true);
    			return;
    		}
    
    		// 处理提交逻辑 
    		processCommit(defStatus);
    	}
    

    处理提交逻辑:

    
    private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    		try {
    			boolean beforeCompletionInvoked = false;
    
    			try {
    				boolean unexpectedRollback = false;
    				prepareForCommit(status);// 预留钩子 
    				triggerBeforeCommit(status);// 提交前触发器激活
    				triggerBeforeCompletion(status);// 完成前触发器 ?? 
    				beforeCompletionInvoked = true;
    
    				if (status.hasSavepoint()) {// 存在保存点??
    					if (status.isDebug()) {// 事务正常执行完毕,释放(清除)保存点 
    						logger.debug("Releasing transaction savepoint");
    					}
    					unexpectedRollback = status.isGlobalRollbackOnly();
    					status.releaseHeldSavepoint();
    				}
    				else if (status.isNewTransaction()) {// 独立的新事务,直接提交 
    					if (status.isDebug()) {
    						logger.debug("Initiating transaction commit");
    					}
    					unexpectedRollback = status.isGlobalRollbackOnly();
    					doCommit(status);// 提交事务 调用底层的数据库连接
    				}// 不被spring管理的事务 ?? 无法设置保存点的事务 ?? 设置回滚标识  
    				else if (isFailEarlyOnGlobalRollbackOnly()) {// 事务链中的某个事务被设置了回滚标记,那么直接回滚 
    					unexpectedRollback = status.isGlobalRollbackOnly();
    				}
    
    				// Throw UnexpectedRollbackException if we have a global rollback-only
    				// marker but still didn't get a corresponding exception from commit.
    				if (unexpectedRollback) {// 回滚标识  
    					throw new UnexpectedRollbackException(
    							"Transaction silently rolled back because it has been marked as rollback-only");
    				}
    			}
    			catch (UnexpectedRollbackException ex) {
    				// can only be caused by doCommit
    				triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
    				throw ex;
    			}
    			catch (TransactionException ex) {
    				// can only be caused by doCommit
    				if (isRollbackOnCommitFailure()) {
    					doRollbackOnCommitException(status, ex);
    				}
    				else {
    					triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
    				}
    				throw ex;
    			}
    			catch (RuntimeException | Error ex) {
    				if (!beforeCompletionInvoked) {
    					triggerBeforeCompletion(status);
    				}
    				doRollbackOnCommitException(status, ex);// 提交过程中出现异常,回滚 
    				throw ex;
    			}
    
    			// Trigger afterCommit callbacks, with an exception thrown there
    			// propagated to callers but the transaction still considered as committed.
    			try {
    				triggerAfterCommit(status);// 提交事务后的触发器
    			}
    			finally {
    				// 事务完成后的触发器
    				triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
    			}
    
    		}
    		finally {
    			cleanupAfterCompletion(status);// 事务完成后进行清理 
    		}
    	}
    
    
  • 相关阅读:
    Mesos-DNS
    秒杀系统
    Springboot配置Druid多数据源
    Android vitals 帮您解决应用质量问题
    Android vitals 帮您解决应用质量问题 (下篇)
    Android Sunflower 带您玩转 Jetpack
    Android Smart Linkify 支持机器学习
    Android Pie SDK 与 Kotlin 更合拍
    Android P 中的新文本特性
    Android P Beta 2 及终版 API 强势来袭!
  • 原文地址:https://www.cnblogs.com/bokers/p/14903075.html
Copyright © 2011-2022 走看看