1、事务的简介
1.1、什么是事务
1.2、事务的特性(ACID)
①:Atomicity【原子性】 原子性表现为事务中的操作不能被分割:这些操作要么全部完成,要么全部不完成;若事务执行出错,那么事务就会回滚,就好像什么都没有发生过
②:Consistency【一致性】 一致性也比较容易理解,也就是说数据库要一直处于一致的状态,事务开始前是一个一致状态, 事务结束后是另一个一致状态, 事务将数据库从一个一致状态转移到另一个一致状态
③:Isolation【隔离性】 所谓的隔离性就是指并发的事务之间不会互相影响,如果一个事务要访问的数据正在被另外一个事务修改,只要另外一个事务还未提交,它所访问的数据就不受未提交事务的影响。换句话说,一个事务的影响在该事务提交前对其它事务是不可见的
④:Durability【持久性】 若事务已经提交了,那么它对数据的修改就会在数据库中永久地保存下来
2、事务三个接口介绍
2.1、PlatformTransactionManager
/**
 * Central transaction-management contract: obtain a transaction status, then
 * commit or roll back the transaction it represents.
 */
public interface PlatformTransactionManager {

    /**
     * Obtains the transaction status for the given definition.
     *
     * @param definition the transaction definition (propagation, isolation, timeout, ...)
     * @return the status object representing the transaction
     * @throws TransactionException declared by the signature; the exact conditions
     *         depend on the implementation
     */
    TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;

    /**
     * Commits the transaction represented by the given status.
     *
     * @param status the status object returned by {@code getTransaction}
     * @throws TransactionException declared by the signature; the exact conditions
     *         depend on the implementation
     */
    void commit(TransactionStatus status) throws TransactionException;

    /**
     * Rolls back the transaction represented by the given status.
     *
     * @param status the status object returned by {@code getTransaction}
     * @throws TransactionException declared by the signature; the exact conditions
     *         depend on the implementation
     */
    void rollback(TransactionStatus status) throws TransactionException;
}
2.2、TransactionDefinition
/**
 * Describes the properties of a transaction: propagation behavior, isolation
 * level, timeout, read-only flag and name.
 */
public interface TransactionDefinition {

    /**
     * Propagation: support the current transaction; create a new one if none exists.
     */
    int PROPAGATION_REQUIRED = 0;

    /**
     * Propagation: support the current transaction; execute non-transactionally if none exists.
     */
    int PROPAGATION_SUPPORTS = 1;

    /**
     * Propagation: run within the current transaction; throw an exception if none exists.
     */
    int PROPAGATION_MANDATORY = 2;

    /**
     * Propagation: create a new transaction, suspending the current transaction if one exists.
     */
    int PROPAGATION_REQUIRES_NEW = 3;

    /**
     * Propagation: execute non-transactionally, suspending the current transaction if one exists.
     */
    int PROPAGATION_NOT_SUPPORTED = 4;

    /**
     * Propagation: execute non-transactionally; throw an exception if a transaction exists.
     */
    int PROPAGATION_NEVER = 5;

    /**
     * Propagation: if a transaction exists, run within a nested transaction that can
     * be committed or rolled back independently of the enclosing one.
     * If no enclosing transaction exists, behave like {@link #PROPAGATION_REQUIRED},
     * i.e. start a new transaction.
     */
    int PROPAGATION_NESTED = 6;

    /**
     * Isolation: use the default level of the underlying datastore
     * (author's note: MySQL defaults to repeatable read, Oracle to read committed).
     */
    int ISOLATION_DEFAULT = -1;

    /**
     * Isolation: read uncommitted (allows dirty reads, non-repeatable reads and phantom reads).
     */
    int ISOLATION_READ_UNCOMMITTED = Connection.TRANSACTION_READ_UNCOMMITTED;

    /**
     * Isolation: read committed (allows non-repeatable reads and phantom reads).
     */
    int ISOLATION_READ_COMMITTED = Connection.TRANSACTION_READ_COMMITTED;

    /**
     * Isolation: repeatable read (allows phantom reads).
     */
    int ISOLATION_REPEATABLE_READ = Connection.TRANSACTION_REPEATABLE_READ;

    /**
     * Isolation: serializable.
     */
    int ISOLATION_SERIALIZABLE = Connection.TRANSACTION_SERIALIZABLE;

    /**
     * Marker value for the default timeout.
     */
    int TIMEOUT_DEFAULT = -1;

    /**
     * Returns the propagation behavior (one of the {@code PROPAGATION_*} constants).
     */
    int getPropagationBehavior();

    /**
     * Returns the isolation level (one of the {@code ISOLATION_*} constants).
     */
    int getIsolationLevel();

    /**
     * Returns the transaction timeout.
     */
    int getTimeout();

    /**
     * Returns whether the transaction is read-only.
     */
    boolean isReadOnly();

    /**
     * Returns the name of the transaction.
     */
    String getName();
}
2.3、TransactionStatus
/**
 * Represents the status of a transaction; also exposes savepoint management
 * (via {@code SavepointManager}) and flushing (via {@code Flushable}).
 */
public interface TransactionStatus extends SavepointManager, Flushable {

    /**
     * Returns whether this is a new transaction.
     */
    boolean isNewTransaction();

    /**
     * Returns whether this transaction carries a savepoint.
     */
    boolean hasSavepoint();

    /**
     * Marks the transaction as rollback-only.
     */
    void setRollbackOnly();

    /**
     * Returns whether the transaction has been marked as rollback-only.
     */
    boolean isRollbackOnly();

    /**
     * Redeclares {@code Flushable#flush()} without a checked exception.
     */
    @Override
    void flush();

    /**
     * Returns whether this transaction has already completed.
     */
    boolean isCompleted();
}
3、源码分析
/**
 * Sample configuration used by the walkthrough: enables annotation-driven
 * transaction management, scans {@code com.pjf}, and exposes a Druid data
 * source together with a JDBC transaction manager built on top of it.
 */
@EnableTransactionManagement
@ComponentScan(basePackages = {"com.pjf"})
public class MainConfig {

    /**
     * Builds the Druid connection pool. Credentials and URL are left as the
     * placeholders of the original notes and must be filled in before use.
     *
     * @return the configured data source
     */
    @Bean
    public DataSource dataSource() {
        DruidDataSource druid = new DruidDataSource();
        druid.setUsername("");
        druid.setPassword("");
        druid.setUrl("jdbc:mysql://localhost:3306/");
        druid.setDriverClassName("com.mysql.jdbc.Driver");
        return druid;
    }

    /**
     * Registers the transaction manager that the transaction interceptor will use.
     *
     * @param dataSource the data source whose connections are managed
     * @return a {@code DataSourceTransactionManager} bound to {@code dataSource}
     */
    @Bean
    public PlatformTransactionManager transactionManager(DataSource dataSource) {
        PlatformTransactionManager manager = new DataSourceTransactionManager(dataSource);
        return manager;
    }
}
3.1 @EnableTransactionManagement
/**
 * Enables annotation-driven transaction management by importing
 * {@code TransactionManagementConfigurationSelector}.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {

    /**
     * Whether class-based proxying is forced; defaults to {@code false}.
     */
    boolean proxyTargetClass() default false;

    /**
     * How advice is applied; defaults to {@code AdviceMode.PROXY}.
     */
    AdviceMode mode() default AdviceMode.PROXY;

    /**
     * Order value for the transaction advisor; defaults to {@code Ordered.LOWEST_PRECEDENCE}.
     */
    int order() default Ordered.LOWEST_PRECEDENCE;
}
public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> { @Override protected String[] selectImports(AdviceMode adviceMode) { switch (adviceMode) { case PROXY:
//注册了两个类 return new String[] {AutoProxyRegistrar.class.getName(), ProxyTransactionManagementConfiguration.class.getName()}; case ASPECTJ: return new String[] { TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME}; default: return null; } }
3.1.1、AutoProxyRegistrar
public class AutoProxyRegistrar implements ImportBeanDefinitionRegistrar { @Override public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { boolean candidateFound = false; Set<String> annoTypes = importingClassMetadata.getAnnotationTypes(); for (String annoType : annoTypes) { AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annoType); if (candidate == null) { continue; } Object mode = candidate.get("mode"); Object proxyTargetClass = candidate.get("proxyTargetClass"); if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() && Boolean.class == proxyTargetClass.getClass()) { candidateFound = true;
//获取mode的类型,默认的为PROXY if (mode == AdviceMode.PROXY) { AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry); if ((Boolean) proxyTargetClass) { AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry); return; } } } } if (!candidateFound && logger.isWarnEnabled()) { String name = getClass().getSimpleName(); logger.warn(String.format("%s was imported but no annotations were found " + "having both 'mode' and 'proxyTargetClass' attributes of type " + "AdviceMode and boolean respectively. This means that auto proxy " + "creator registration and configuration may not have occurred as " + "intended, and components may not be proxied as expected. Check to " + "ensure that %s has been @Import'ed on the same class where these " + "annotations are declared; otherwise remove the import of %s " + "altogether.", name, name, name)); } } }
public static BeanDefinition registerAutoProxyCreatorIfNecessary(BeanDefinitionRegistry registry) { return registerAutoProxyCreatorIfNecessary(registry, null); } public static BeanDefinition registerAutoProxyCreatorIfNecessary(BeanDefinitionRegistry registry, Object source) { //注册InfrastructureAdvisorAutoProxyCreator类 return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source); } private static BeanDefinition registerOrEscalateApcAsRequired(Class<?> cls, BeanDefinitionRegistry registry, Object source) { Assert.notNull(registry, "BeanDefinitionRegistry must not be null"); //注册InfrastructureAdvisorAutoProxyCreator类,实现了AbstractAutoProxyCreator //上一节aop源码是注册了AnnotationAwareAspectJAutoProxyCreator,同样也实现了AbstractAutoProxyCreator //两个类的beanName都是AUTO_PROXY_CREATOR_BEAN_NAME,所以如果也需要aop配置,那么这里判断包含就会为true, //然后AnnotationAwareAspectJAutoProxyCreator实现的Order优先级高,所以会注入这个类,InfrastructureAdvisorAutoProxyCreator就不会被注入 //其实上一节aop分析中也可以看出AnnotationAwareAspectJAutoProxyCreator是有findCandidateAdvisors方法的,所以advisor切面也会被注入进来的 if (registry.containsBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME)) { BeanDefinition apcDefinition = registry.getBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME); if (!cls.getName().equals(apcDefinition.getBeanClassName())) { int currentPriority = findPriorityForClass(apcDefinition.getBeanClassName()); int requiredPriority = findPriorityForClass(cls); if (currentPriority < requiredPriority) { apcDefinition.setBeanClassName(cls.getName()); } } return null; } RootBeanDefinition beanDefinition = new RootBeanDefinition(cls); beanDefinition.setSource(source); beanDefinition.getPropertyValues().add("order", Ordered.HIGHEST_PRECEDENCE); beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE); registry.registerBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME, beanDefinition); return beanDefinition; }
InfrastructureAdvisorAutoProxyCreator同样也实现了BeanPostProcessor和InstantiationAwareBeanPostProcessor
和之前aop的AnnotationAwareAspectJAutoProxyCreator逻辑一样,就不重复分析了,唯一的区别就是这里
如果到这里用的是InfrastructureAdvisorAutoProxyCreator,那么postProcessBeforeInstantiation()方法其实没做什么事:aop需要一个类一个类地查找切面,所以要在这里把切面缓存起来以提高性能;而事务的切面是直接实例化的,也就是下面的BeanFactoryTransactionAttributeSourceAdvisor
其余的postProcessAfterInitialization()逻辑是一样的,不过内部调用到的ifelse分支不一样
3.1.2、ProxyTransactionManagementConfiguration
@Configuration public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration { @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) //注册了事务增强器 public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() { BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor(); advisor.setTransactionAttributeSource(transactionAttributeSource()); advisor.setAdvice(transactionInterceptor()); advisor.setOrder(this.enableTx.<Integer>getNumber("order")); return advisor; } //定义了一个事务属性源对象 //这里面处理了@Transctional的切点 @Bean @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public TransactionAttributeSource transactionAttributeSource() { return new AnnotationTransactionAttributeSource(); } //事务拦截器对象 @Bean @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public TransactionInterceptor transactionInterceptor() { TransactionInterceptor interceptor = new TransactionInterceptor(); interceptor.setTransactionAttributeSource(transactionAttributeSource()); if (this.txManager != null) { //需要配置事务管理器 interceptor.setTransactionManager(this.txManager); } return interceptor; } }
3.2、事务的调用流程
这里看下jdk动态代理的调用
3.2.1 、JdkDynamicAopProxy#invoke
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { MethodInvocation invocation; Object oldProxy = null; boolean setProxyContext = false; TargetSource targetSource = this.advised.targetSource; Class<?> targetClass = null; Object target = null; try { //equal()方法 if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) { // The target does not implement the equals(Object) method itself. return equals(args[0]); } //hashcode()方法 else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) { // The target does not implement the hashCode() method itself. return hashCode(); } //方法的类是不是DecoratingProxy else if (method.getDeclaringClass() == DecoratingProxy.class) { // There is only getDecoratedClass() declared -> dispatch to proxy config. return AopProxyUtils.ultimateTargetClass(this.advised); } //有没有实现Advised else if (!this.advised.opaque && method.getDeclaringClass().isInterface() && method.getDeclaringClass().isAssignableFrom(Advised.class)) { // Service invocations on ProxyConfig with the proxy config... return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args); } Object retVal; //是不是有配置exposeProxy,為true的放入threadLocal中,然后就可以通过AopContext.currentProxy()拿到代理对象 if (this.advised.exposeProxy) { // Make invocation available if necessary. oldProxy = AopContext.setCurrentProxy(proxy); setProxyContext = true; } // May be null. Get as late as possible to minimize the time we "own" the target, // in case it comes from a pool. //获取被代理对象 target = targetSource.getTarget(); if (target != null) { targetClass = target.getClass(); } // Get the interception chain for this method. //把增强器转为方法拦截器链
//这里返回上面定义的 TransactionInterceptor List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass); // Check whether we have any advice. If we don't, we can fallback on direct // reflective invocation of the target, and avoid creating a MethodInvocation. //若方法拦截器为空,则直接反射调用目标方法 if (chain.isEmpty()) { // We can skip creating a MethodInvocation: just invoke the target directly // Note that the final invoker must be an InvokerInterceptor so we know it does // nothing but a reflective operation on the target, and no hot swapping or fancy proxying. Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args); retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse); } else { // We need to create a method invocation... //创建方法拦截器调用链 invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain); // Proceed to the joinpoint through the interceptor chain. //执行方法拦截器调用链 retVal = invocation.proceed(); } // Massage return value if necessary. //获取方法返回值 Class<?> returnType = method.getReturnType(); if (retVal != null && retVal == target && returnType != Object.class && returnType.isInstance(proxy) && !RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) { // Special case: it returned "this" and the return type of the method // is type-compatible. Note that we can't help if the target sets // a reference to itself in another returned object. //如果方法返回值是this,则将代理对象赋值给retVal retVal = proxy; } //如果方法返回值为基础类型并且不是void,但是返回了null,则报错 else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) { throw new AopInvocationException( "Null return value from advice does not match primitive return type for: " + method); } return retVal; } finally { if (target != null && !targetSource.isStatic()) { // Must have come from TargetSource. targetSource.releaseTarget(target); } if (setProxyContext) { // Restore old proxy. AopContext.setCurrentProxy(oldProxy); } } }
3.2.2、proceed()
public Object proceed() throws Throwable { // We start with an index of -1 and increment early. //责任链模式,这里其实和上一节aop调用也是一样的 if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) { return invokeJoinpoint(); } //获取拦截器 Object interceptorOrInterceptionAdvice = this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex); if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) { // Evaluate dynamic method matcher here: static part will already have // been evaluated and found to match. InterceptorAndDynamicMethodMatcher dm = (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice; if (dm.methodMatcher.matches(this.method, this.targetClass, this.arguments)) { return dm.interceptor.invoke(this); } else { // Dynamic matching failed. // Skip this interceptor and invoke the next in the chain. return proceed(); } } else { // It's an interceptor, so we just invoke it: The pointcut will have // been evaluated statically before this object was constructed. //在这里调用 return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this); } }
3.2.3、TransactionInterceptor#invoke
public Object invoke(final MethodInvocation invocation) throws Throwable { // Work out the target class: may be {@code null}. // The TransactionAttributeSource should be passed the target class // as well as the method, which may be from an interface. //获取代理的目标 Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction... //使用事务调用 return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() { @Override //触发调用目标方法 public Object proceedWithInvocation() throws Throwable { return invocation.proceed(); } }); }
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // If the transaction attribute is null, the method is non-transactional. //上面定义的事务源属性对象,获取事务的一些属性,比如隔离级别,传播级别等等 final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass); //获取事务管理器 final PlatformTransactionManager tm = determineTransactionManager(txAttr); //获取需要切入的方法 final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. //调用目标方法 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // target invocation exception //异常回滚事务,点进去看会发现只会处理runTimeException和error的错误 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { //清除事务信息 cleanupTransactionInfo(txInfo); } //提交事务 commitTransactionAfterReturning(txInfo); return retVal; } else { final ThrowableHolder throwableHolder = new ThrowableHolder(); // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in. try { Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, new TransactionCallback<Object>() { @Override public Object doInTransaction(TransactionStatus status) { TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); try { return invocation.proceedWithInvocation(); } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { // A RuntimeException: will lead to a rollback. 
if (ex instanceof RuntimeException) { throw (RuntimeException) ex; } else { throw new ThrowableHolderException(ex); } } else { // A normal return value: will lead to a commit. throwableHolder.throwable = ex; return null; } } finally { cleanupTransactionInfo(txInfo); } } }); // Check result state: It might indicate a Throwable to rethrow. if (throwableHolder.throwable != null) { throw throwableHolder.throwable; } return result; } catch (ThrowableHolderException ex) { throw ex.getCause(); } catch (TransactionSystemException ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); ex2.initApplicationException(throwableHolder.throwable); } throw ex2; } catch (Throwable ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); } throw ex2; } } }
3.2.3.1、TransactionAspectSupport#createTransactionIfNecessary
protected TransactionInfo createTransactionIfNecessary( PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) { // If no name specified, apply method identification as transaction name.
//包装下事务的属性,加了个名字 if (txAttr != null && txAttr.getName() == null) { txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } //获取一个事务状态 TransactionStatus status = null; if (txAttr != null) { if (tm != null) { status = tm.getTransaction(txAttr); } else { if (logger.isDebugEnabled()) { logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); } } }
//准备事务信息 return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }
AbstractPlatformTransactionManager#getTransaction
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException { //先尝试开启一个事务 Object transaction = doGetTransaction(); // Cache debug flag to avoid repeated checks. boolean debugEnabled = logger.isDebugEnabled(); //传进来的事务定义为空,就是用默认的 if (definition == null) { // Use defaults if no transaction definition given. definition = new DefaultTransactionDefinition(); } //判断是否存在事务 if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. return handleExistingTransaction(definition, transaction, debugEnabled); } //下面都是不存在事务的处理逻辑 // Check definition settings for new transaction. //判断事务是否超时 if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } // No existing transaction found -> check propagation behavior to find out how to proceed. //不存在事务,但是传播行为Mandatory是不存在就报错 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { //挂起当前事务,但是当前是不存在的 SuspendedResourcesHolder suspendedResources = suspend(null); if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } try { //第一次进来的时候会走到这,创建一个新事务 boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); //开启一个事务 doBegin(transaction, definition); //准备事务同步 
prepareSynchronization(status, definition); return status; } catch (RuntimeException ex) { resume(null, suspendedResources); throw ex; } catch (Error err) { resume(null, suspendedResources); throw err; } } else { // Create "empty" transaction: no actual transaction, but potentially synchronization. if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + definition); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } }
protected Object doGetTransaction() { //创建一个数据库事务管理器 DataSourceTransactionManager.DataSourceTransactionObject txObject = new DataSourceTransactionManager.DataSourceTransactionObject(); //设置一个事务保存点 txObject.setSavepointAllowed(this.isNestedTransactionAllowed()); //从事务管理器中获取持有器 ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(this.dataSource); txObject.setConnectionHolder(conHolder, false); //返回一个事务 return txObject; } protected boolean isExistingTransaction(Object transaction) { DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction; //持有器不为空,并且事务激活 return txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive(); } protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction; Connection con = null; try { if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { Connection newCon = this.dataSource.getConnection(); if (this.logger.isDebugEnabled()) { this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); } //将数据库连接封装为一个持有器对象并设置待对象中 txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } //设置开始同步标志 txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection(); //设置事务的隔离级别 Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); if (this.logger.isDebugEnabled()) { this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); } //关闭事务的自动提交 con.setAutoCommit(false); } this.prepareTransactionalConnection(con, definition); 
//激活,上面判断是否存在事务有判断这个字段 txObject.getConnectionHolder().setTransactionActive(true); int timeout = this.determineTimeout(definition); if (timeout != -1) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } //把数据源和事务持有器保存到事务管理器中 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(this.getDataSource(), txObject.getConnectionHolder()); } } catch (Throwable var7) { if (txObject.isNewConnectionHolder()) { //抛出异常,释放资源 DataSourceUtils.releaseConnection(con, this.dataSource); txObject.setConnectionHolder((ConnectionHolder)null, false); } throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7); } }
//把当前的事务设置到同步管理器中 protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) { if (status.isNewSynchronization()) { TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction()); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel( definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? definition.getIsolationLevel() : null); TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly()); TransactionSynchronizationManager.setCurrentTransactionName(definition.getName()); TransactionSynchronizationManager.initSynchronization(); } }
//已经存在事务的处理逻辑 private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); } //不同的事务传播行为级别的处理 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction"); } //挂起当前事务 Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]"); } SuspendedResourcesHolder suspendedResources = suspend(transaction); try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } catch (Error beginErr) { resumeAfterBeginException(transaction, suspendedResources, beginErr); throw beginErr; } } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 
'true'"); } if (debugEnabled) { logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } if (useSavepointForNestedTransaction()) { // Create savepoint within existing Spring-managed transaction, // through the SavepointManager API implemented by TransactionStatus. // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization. DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); status.createAndHoldSavepoint(); return status; } else { // Nested transaction through nested begin and commit/rollback calls. // Usually only for JTA: Spring synchronization might get activated here // in case of a pre-existing JTA transaction. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, null); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } } // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED. if (debugEnabled) { logger.debug("Participating in existing transaction"); } if (isValidateExistingTransaction()) { if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { Constants isoConstants = DefaultTransactionDefinition.constants; throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? 
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)")); } } if (!definition.isReadOnly()) { if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is"); } } } boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); }