zoukankan      html  css  js  c++  java
  • spring源码解析四( spring事务)

    1、事务的简介

    1.1、什么是事务

        事务是逻辑上的一组执行单元,要么都执行,要么都不执行

    1.2、事务的特性(ACID)

    ①:Atomicity【原子性】 原子性表现为事务中的操作不能被分割:例如一个事务包含两个操作,那么这两个操作要么同时完成,要么全部不完成;若事务出错了,那么事务就会回滚,好像什么都没有发生过

    ②:Consistency【一致性】 一致性也比较容易理解,也就是说数据库要一直处于一致的状态,事务开始前是一个一致状态, 事务结束后是另一个一致状态, 事务将数据库从一个一致状态转移到另一个一致状态

    ③:Isolation【隔离性】 所谓的隔离性就是指并发的事务之间不会互相影响,如果一个事务要访问的数据正在被另外一个事务修改,只要另外一个事务还未提交,它所访问的数据就不受未提交事务的影响。换句话说,一个事务的影响在该事务提交前对其它事务是不可见的

    ④:Durability【持久性】 若事务已经提交了,那么就会在数据库中永久地保存下来

    2、事务三个接口介绍

    2.1、PlatformTransactionManager

    事务管理器
    /**
     * Transaction manager abstraction: obtains a transaction status for a
     * definition and later commits or rolls back using that status.
     */
    public interface PlatformTransactionManager {
    
        /**
         * Obtains the transaction status for the given transaction definition.
         */
        TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;
    
        /**
         * Commits the transaction identified by the given status.
         */
        void commit(TransactionStatus status) throws TransactionException;
    
        /**
         * Rolls back the transaction identified by the given status.
         */
        void rollback(TransactionStatus status) throws TransactionException;
    
    }

    2.2、TransactionDefinition

    事务属性的定义
    /**
     * Describes the attributes of a transaction: propagation behavior,
     * isolation level, timeout, read-only flag and name.
     */
    public interface TransactionDefinition {

        /**
         * Propagation: support the current transaction; create a new one if
         * none exists.
         */
        int PROPAGATION_REQUIRED = 0;

        /**
         * Propagation: support the current transaction; execute
         * non-transactionally if none exists.
         */
        int PROPAGATION_SUPPORTS = 1;

        /**
         * Propagation: run within the current transaction; throw an exception
         * if none exists.
         */
        int PROPAGATION_MANDATORY = 2;

        /**
         * Propagation: create a new transaction, suspending the current
         * transaction if one exists.
         */
        int PROPAGATION_REQUIRES_NEW = 3;

        /**
         * Propagation: execute non-transactionally, suspending the current
         * transaction if one exists.
         */
        int PROPAGATION_NOT_SUPPORTED = 4;

        /**
         * Propagation: execute non-transactionally; throw an exception if a
         * transaction exists.
         */
        int PROPAGATION_NEVER = 5;

        /**
         * Propagation: if a transaction exists, run in a nested transaction.
         * A nested transaction can be rolled back on its own (back to its
         * savepoint) without rolling back the enclosing transaction, but it is
         * only committed together with the enclosing transaction. If no
         * transaction exists, this behaves like {@link #PROPAGATION_REQUIRED}.
         */
        int PROPAGATION_NESTED = 6;

        /**
         * Isolation: use the default isolation level of the underlying
         * database (for example, MySQL defaults to repeatable read and Oracle
         * to read committed).
         */
        int ISOLATION_DEFAULT = -1;

        /**
         * Isolation: read uncommitted (dirty reads, non-repeatable reads and
         * phantom reads can occur).
         */
        int ISOLATION_READ_UNCOMMITTED = Connection.TRANSACTION_READ_UNCOMMITTED;

        /**
         * Isolation: read committed (non-repeatable reads and phantom reads
         * can occur).
         */
        int ISOLATION_READ_COMMITTED = Connection.TRANSACTION_READ_COMMITTED;

        /**
         * Isolation: repeatable read (phantom reads can occur).
         */
        int ISOLATION_REPEATABLE_READ = Connection.TRANSACTION_REPEATABLE_READ;

        /**
         * Isolation: serializable.
         */
        int ISOLATION_SERIALIZABLE = Connection.TRANSACTION_SERIALIZABLE;

        /**
         * Default timeout value.
         */
        int TIMEOUT_DEFAULT = -1;

        /**
         * Returns the propagation behavior (one of the PROPAGATION_* constants).
         */
        int getPropagationBehavior();

        /**
         * Returns the isolation level (one of the ISOLATION_* constants).
         */
        int getIsolationLevel();

        /**
         * Returns the transaction timeout.
         */
        int getTimeout();

        /**
         * Returns whether this is a read-only transaction.
         */
        boolean isReadOnly();

        /**
         * Returns the name of the transaction.
         */
        String getName();

    }

    2.3、TransactionStatus

    事务的状态
    /**
     * Represents the status of a transaction. Extends SavepointManager and
     * Flushable.
     */
    public interface TransactionStatus extends SavepointManager, Flushable {
    
        /**
         * Returns whether this is a new transaction.
         */
        boolean isNewTransaction();
    
        /**
         * Returns whether this transaction has a savepoint.
         */
        boolean hasSavepoint();
    
        /**
         * Marks the transaction as rollback-only.
         */
        void setRollbackOnly();
    
        /**
         * Returns whether the transaction has been marked rollback-only.
         */
        boolean isRollbackOnly();
    
        /**
         * Redeclares Flushable's flush() without a checked exception.
         * NOTE(review): presumably flushes pending state to the underlying
         * store where applicable -- confirm against the implementation.
         */
        @Override
        void flush();
    
        /**
         * Returns whether this transaction has already completed.
         */
        boolean isCompleted();
    
    }

    3、源码分析

    /**
     * Demo configuration used by the walkthrough: enables annotation-driven
     * transaction management, scans {@code com.pjf} and wires a Druid data
     * source together with a JDBC transaction manager.
     */
    @EnableTransactionManagement
    @ComponentScan(basePackages = {"com.pjf"})
    public class MainConfig {

        /**
         * Builds the Druid connection pool. Credentials and the schema part of
         * the URL are intentionally left blank in this sample.
         */
        @Bean
        public DataSource dataSource() {
            DruidDataSource druid = new DruidDataSource();
            druid.setUsername("");
            druid.setPassword("");
            druid.setUrl("jdbc:mysql://localhost:3306/");
            druid.setDriverClassName("com.mysql.jdbc.Driver");
            return druid;
        }

        /**
         * Exposes a transaction manager bound to the given data source.
         */
        @Bean
        public PlatformTransactionManager transactionManager(DataSource dataSource) {
            PlatformTransactionManager jdbcTxManager = new DataSourceTransactionManager(dataSource);
            return jdbcTxManager;
        }
    }

    3.1 @EnableTransactionManagement

    /**
     * Type-level annotation that switches on transaction management by
     * importing TransactionManagementConfigurationSelector.
     */
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Import(TransactionManagementConfigurationSelector.class)
    public @interface EnableTransactionManagement {
    
        // When true, AutoProxyRegistrar forces the auto-proxy creator to use class proxying. Defaults to false.
        boolean proxyTargetClass() default false;
    
        // Advice mode that the import selector switches on (PROXY or ASPECTJ). Defaults to PROXY.
        AdviceMode mode() default AdviceMode.PROXY;
    
        // Order value copied onto the transaction advisor. Defaults to LOWEST_PRECEDENCE.
        int order() default Ordered.LOWEST_PRECEDENCE;
    
    }
    public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
    
        @Override
        protected String[] selectImports(AdviceMode adviceMode) {
            switch (adviceMode) {
                case PROXY:
    //注册了两个类
    return new String[] {AutoProxyRegistrar.class.getName(), ProxyTransactionManagementConfiguration.class.getName()}; case ASPECTJ: return new String[] { TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME}; default: return null; } }


    3.1.1、AutoProxyRegistrar

    /**
     * Registers an auto-proxy creator when the importing class carries an
     * annotation that has both a {@code mode} attribute of type AdviceMode and
     * a {@code proxyTargetClass} attribute of type Boolean.
     */
    public class AutoProxyRegistrar implements ImportBeanDefinitionRegistrar {

        /**
         * Scans every annotation on the importing class. For each matching
         * annotation in PROXY mode the auto-proxy creator is registered; if
         * {@code proxyTargetClass} is true, class proxying is forced and the
         * method returns immediately. A warning is logged when no annotation
         * with the expected attributes is found.
         */
        @Override
        public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
            boolean candidateFound = false;
            Set<String> annotationTypes = importingClassMetadata.getAnnotationTypes();
            for (String annotationType : annotationTypes) {
                AnnotationAttributes attributes = AnnotationConfigUtils.attributesFor(importingClassMetadata, annotationType);
                if (attributes == null) {
                    continue;
                }
                Object mode = attributes.get("mode");
                Object proxyTargetClass = attributes.get("proxyTargetClass");
                boolean hasExpectedAttributes = mode != null && proxyTargetClass != null
                        && AdviceMode.class == mode.getClass()
                        && Boolean.class == proxyTargetClass.getClass();
                if (!hasExpectedAttributes) {
                    continue;
                }
                candidateFound = true;
                // Only PROXY mode (the default) needs an auto-proxy creator.
                if (mode != AdviceMode.PROXY) {
                    continue;
                }
                AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
                if ((Boolean) proxyTargetClass) {
                    AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry);
                    return;
                }
            }
            if (candidateFound || !logger.isWarnEnabled()) {
                return;
            }
            String name = getClass().getSimpleName();
            logger.warn(String.format("%s was imported but no annotations were found " +
                    "having both 'mode' and 'proxyTargetClass' attributes of type " +
                    "AdviceMode and boolean respectively. This means that auto proxy " +
                    "creator registration and configuration may not have occurred as " +
                    "intended, and components may not be proxied as expected. Check to " +
                    "ensure that %s has been @Import'ed on the same class where these " +
                    "annotations are declared; otherwise remove the import of %s " +
                    "altogether.", name, name, name));
        }
    }
    /**
     * Convenience overload: registers the auto-proxy creator without a source
     * object by delegating to the two-argument variant.
     */
    public static BeanDefinition registerAutoProxyCreatorIfNecessary(BeanDefinitionRegistry registry) {
        final Object noSource = null;
        return registerAutoProxyCreatorIfNecessary(registry, noSource);
    }
    
    /**
     * Registers (or escalates to) InfrastructureAdvisorAutoProxyCreator as the
     * auto-proxy creator bean.
     */
    public static BeanDefinition registerAutoProxyCreatorIfNecessary(BeanDefinitionRegistry registry, Object source) {
        Class<?> creatorType = InfrastructureAdvisorAutoProxyCreator.class;
        return registerOrEscalateApcAsRequired(creatorType, registry, source);
    }
    
    /**
     * Ensures a bean definition named AUTO_PROXY_CREATOR_BEAN_NAME exists.
     *
     * If none is registered yet, a new infrastructure-role definition for
     * {@code cls} is registered and returned. If one already exists with a
     * different class, its class is replaced only when {@code cls} has the
     * higher priority according to findPriorityForClass; {@code null} is
     * returned in that case.
     *
     * Per the article's note: the AOP setup registers
     * AnnotationAwareAspectJAutoProxyCreator under the same bean name, so when
     * both AOP and transactions are enabled only one creator survives.
     */
    private static BeanDefinition registerOrEscalateApcAsRequired(Class<?> cls, BeanDefinitionRegistry registry, Object source) {
        Assert.notNull(registry, "BeanDefinitionRegistry must not be null");

        if (!registry.containsBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME)) {
            // Nothing registered yet: create the definition from scratch.
            RootBeanDefinition created = new RootBeanDefinition(cls);
            created.setSource(source);
            created.getPropertyValues().add("order", Ordered.HIGHEST_PRECEDENCE);
            created.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
            registry.registerBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME, created);
            return created;
        }

        // A creator is already registered: escalate its class if ours ranks higher.
        BeanDefinition existing = registry.getBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME);
        boolean sameClass = cls.getName().equals(existing.getBeanClassName());
        if (!sameClass) {
            int currentPriority = findPriorityForClass(existing.getBeanClassName());
            int requiredPriority = findPriorityForClass(cls);
            if (currentPriority < requiredPriority) {
                existing.setBeanClassName(cls.getName());
            }
        }
        return null;
    }
    
    

    InfrastructureAdvisorAutoProxyCreator同样也实现了BeanPostProcessor和InstantiationAwareBeanPostProcessor 

    它的处理逻辑和之前aop的AnnotationAwareAspectJAutoProxyCreator基本一样,就不重复分析了,唯一的区别在postProcessBeforeInstantiation()方法:

    如果这里用的是InfrastructureAdvisorAutoProxyCreator,postProcessBeforeInstantiation()方法其实没做什么事。aop之所以要在这一步把切面缓存起来,是因为切面需要一个类一个类地查找,缓存只是为了提高性能;而事务的切面是直接实例化的,也就是下面的BeanFactoryTransactionAttributeSourceAdvisor

    其余的postProcessAfterInitialization()逻辑是一样的,不过内部调用到的ifelse分支不一样

    3.1.2、ProxyTransactionManagementConfiguration

    /**
     * Configuration that registers the beans needed for proxy-based
     * transaction management: the advisor, the attribute source and the
     * interceptor.
     */
    @Configuration
    public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {

        /**
         * Registers the transaction advisor, wiring in the attribute source,
         * the interceptor (as advice) and the "order" value read from the
         * enabling annotation's attributes.
         */
        @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
        public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() {
            BeanFactoryTransactionAttributeSourceAdvisor txAdvisor = new BeanFactoryTransactionAttributeSourceAdvisor();
            txAdvisor.setTransactionAttributeSource(transactionAttributeSource());
            txAdvisor.setAdvice(transactionInterceptor());
            Integer configuredOrder = this.enableTx.<Integer>getNumber("order");
            txAdvisor.setOrder(configuredOrder);
            return txAdvisor;
        }

        /**
         * Defines the transaction attribute source; this is where the
         * {@code @Transactional} pointcut metadata is resolved.
         */
        @Bean
        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
        public TransactionAttributeSource transactionAttributeSource() {
            return new AnnotationTransactionAttributeSource();
        }

        /**
         * Defines the transaction interceptor. The transaction manager is only
         * set when one has been configured on this configuration class.
         */
        @Bean
        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
        public TransactionInterceptor transactionInterceptor() {
            TransactionInterceptor txInterceptor = new TransactionInterceptor();
            txInterceptor.setTransactionAttributeSource(transactionAttributeSource());
            if (this.txManager != null) {
                txInterceptor.setTransactionManager(this.txManager);
            }
            return txInterceptor;
        }

    }

    3.2、事务的调用流程

    这里看下jdk动态代理的调用

    3.2.1 、JdkDynamicAopProxy#invoke

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            MethodInvocation invocation;
            Object oldProxy = null;
            boolean setProxyContext = false;
    
            TargetSource targetSource = this.advised.targetSource;
            Class<?> targetClass = null;
            Object target = null;
    
            try {
                //equal()方法
                if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) {
                    // The target does not implement the equals(Object) method itself.
                    return equals(args[0]);
                }
                //hashcode()方法
                else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) {
                    // The target does not implement the hashCode() method itself.
                    return hashCode();
                }
                //方法的类是不是DecoratingProxy
                else if (method.getDeclaringClass() == DecoratingProxy.class) {
                    // There is only getDecoratedClass() declared -> dispatch to proxy config.
                    return AopProxyUtils.ultimateTargetClass(this.advised);
                }
               //有没有实现Advised
                else if (!this.advised.opaque && method.getDeclaringClass().isInterface() &&
                        method.getDeclaringClass().isAssignableFrom(Advised.class)) {
                    // Service invocations on ProxyConfig with the proxy config...
                    return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args);
                }
    
                Object retVal;
                //是不是有配置exposeProxy,為true的放入threadLocal中,然后就可以通过AopContext.currentProxy()拿到代理对象
                if (this.advised.exposeProxy) {
                    // Make invocation available if necessary.
                    oldProxy = AopContext.setCurrentProxy(proxy);
                    setProxyContext = true;
                }
    
                // May be null. Get as late as possible to minimize the time we "own" the target,
                // in case it comes from a pool.
                //获取被代理对象
                target = targetSource.getTarget();
                if (target != null) {
                    targetClass = target.getClass();
                }
    
                // Get the interception chain for this method.
                //把增强器转为方法拦截器链
    //这里返回上面定义的 TransactionInterceptor  List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass); // Check whether we have any advice. If we don't, we can fallback on direct // reflective invocation of the target, and avoid creating a MethodInvocation. //若方法拦截器为空,则直接反射调用目标方法 if (chain.isEmpty()) { // We can skip creating a MethodInvocation: just invoke the target directly // Note that the final invoker must be an InvokerInterceptor so we know it does // nothing but a reflective operation on the target, and no hot swapping or fancy proxying. Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args); retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse); } else { // We need to create a method invocation... //创建方法拦截器调用链 invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain); // Proceed to the joinpoint through the interceptor chain. //执行方法拦截器调用链 retVal = invocation.proceed(); } // Massage return value if necessary. //获取方法返回值 Class<?> returnType = method.getReturnType(); if (retVal != null && retVal == target && returnType != Object.class && returnType.isInstance(proxy) && !RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) { // Special case: it returned "this" and the return type of the method // is type-compatible. Note that we can't help if the target sets // a reference to itself in another returned object. //如果方法返回值是this,则将代理对象赋值给retVal retVal = proxy; } //如果方法返回值为基础类型并且不是void,但是返回了null,则报错 else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) { throw new AopInvocationException( "Null return value from advice does not match primitive return type for: " + method); } return retVal; } finally { if (target != null && !targetSource.isStatic()) { // Must have come from TargetSource. targetSource.releaseTarget(target); } if (setProxyContext) { // Restore old proxy. AopContext.setCurrentProxy(oldProxy); } } }

     3.2.2、proceed()

    /**
     * Advances the interceptor chain by one step (chain-of-responsibility).
     * Once every interceptor has run, the join point itself is invoked.
     *
     * @return the result of the next interceptor or of the join point
     * @throws Throwable whatever the interceptor or join point throws
     */
    public Object proceed() throws Throwable {
        // The index starts at -1 and is incremented before each lookup.
        int lastIndex = this.interceptorsAndDynamicMethodMatchers.size() - 1;
        if (this.currentInterceptorIndex == lastIndex) {
            return invokeJoinpoint();
        }

        Object next = this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);

        if (!(next instanceof InterceptorAndDynamicMethodMatcher)) {
            // Plain interceptor: its pointcut was already evaluated statically
            // before this object was constructed, so just invoke it.
            return ((MethodInterceptor) next).invoke(this);
        }

        // Dynamic matcher: the static part already matched, evaluate the
        // runtime part against the actual arguments.
        InterceptorAndDynamicMethodMatcher dynamic = (InterceptorAndDynamicMethodMatcher) next;
        if (dynamic.methodMatcher.matches(this.method, this.targetClass, this.arguments)) {
            return dynamic.interceptor.invoke(this);
        }
        // Dynamic matching failed: skip this interceptor and move on.
        return proceed();
    }

    3.2.3、TransactionInterceptor#invoke

    /**
     * Interceptor entry point: resolves the target class and runs the rest of
     * the invocation chain inside invokeWithinTransaction.
     *
     * @param invocation the method invocation being intercepted
     * @return the result of the intercepted call
     * @throws Throwable whatever the transactional invocation throws
     */
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        // The target class may be null. The TransactionAttributeSource should
        // be passed the target class as well as the method, which may be from
        // an interface.
        Class<?> targetClass = null;
        if (invocation.getThis() != null) {
            targetClass = AopUtils.getTargetClass(invocation.getThis());
        }

        // Callback that continues the chain, eventually reaching the target method.
        InvocationCallback continueChain = new InvocationCallback() {
            @Override
            public Object proceedWithInvocation() throws Throwable {
                return invocation.proceed();
            }
        };

        // Adapt to TransactionAspectSupport's invokeWithinTransaction.
        return invokeWithinTransaction(invocation.getMethod(), targetClass, continueChain);
    }
    /**
     * Runs the given callback inside a transaction as described by the
     * method's transaction attribute.
     *
     * Two paths exist: standard demarcation (create transaction, invoke,
     * then commit or handle the exception) and a callback-based path used when
     * the manager is a CallbackPreferringPlatformTransactionManager.
     *
     * @param method      the method being invoked
     * @param targetClass the target class (may be null)
     * @param invocation  callback that proceeds with the actual invocation
     * @return the value returned by the callback
     * @throws Throwable whatever the callback or the transaction infrastructure throws
     */
    protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
                throws Throwable {
    
            // If the transaction attribute is null, the method is non-transactional.
            // Look up the transaction attribute (isolation, propagation, ...) from the attribute source.
            final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
            // Determine the transaction manager to use.
            final PlatformTransactionManager tm = determineTransactionManager(txAttr);
            // Identification of the join point (the advised method); used as the transaction name later on.
            final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    
            if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
                // Standard transaction demarcation with getTransaction and commit/rollback calls.
                TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
                Object retVal = null;
                try {
                    // This is an around advice: Invoke the next interceptor in the chain.
                    // This will normally result in a target object being invoked.
                    retVal = invocation.proceedWithInvocation();
                }
                catch (Throwable ex) {
                    // target invocation exception
                    // Complete the transaction for the thrown exception, then rethrow.
                    // NOTE(review): per the article, only RuntimeException and Error lead to a
                    // rollback by default -- completeTransactionAfterThrowing is not shown here.
                    completeTransactionAfterThrowing(txInfo, ex);
                    throw ex;
                }
                finally {
                    // Clean up the transaction info.
                    cleanupTransactionInfo(txInfo);
                }
                // Normal return: commit the transaction.
                commitTransactionAfterReturning(txInfo);
                return retVal;
            }
    
            else {
                final ThrowableHolder throwableHolder = new ThrowableHolder();
    
                // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
                try {
                    Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr,
                            new TransactionCallback<Object>() {
                                @Override
                                public Object doInTransaction(TransactionStatus status) {
                                    TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
                                    try {
                                        return invocation.proceedWithInvocation();
                                    }
                                    catch (Throwable ex) {
                                        if (txAttr.rollbackOn(ex)) {
                                            // A RuntimeException: will lead to a rollback.
                                            if (ex instanceof RuntimeException) {
                                                throw (RuntimeException) ex;
                                            }
                                            else {
                                                throw new ThrowableHolderException(ex);
                                            }
                                        }
                                        else {
                                            // A normal return value: will lead to a commit.
                                            throwableHolder.throwable = ex;
                                            return null;
                                        }
                                    }
                                    finally {
                                        cleanupTransactionInfo(txInfo);
                                    }
                                }
                            });
    
                    // Check result state: It might indicate a Throwable to rethrow.
                    if (throwableHolder.throwable != null) {
                        throw throwableHolder.throwable;
                    }
                    return result;
                }
                catch (ThrowableHolderException ex) {
                    throw ex.getCause();
                }
                catch (TransactionSystemException ex2) {
                    if (throwableHolder.throwable != null) {
                        logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                        ex2.initApplicationException(throwableHolder.throwable);
                    }
                    throw ex2;
                }
                catch (Throwable ex2) {
                    if (throwableHolder.throwable != null) {
                        logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                    }
                    throw ex2;
                }
            }
        }

    3.2.3.1、TransactionAspectSupport#createTransactionIfNecessary

    /**
     * Creates a transaction for the given attribute if one is needed and wraps
     * the outcome in a TransactionInfo.
     *
     * @param tm                      the transaction manager (may be null)
     * @param txAttr                  the transaction attribute (null means non-transactional)
     * @param joinpointIdentification identification of the advised method, used
     *                                as the transaction name when none is set
     * @return the prepared TransactionInfo
     */
    protected TransactionInfo createTransactionIfNecessary(
            PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {

        // If no name specified, apply method identification as transaction name:
        // wrap the attribute so that getName() returns the join point identification.
        if (txAttr != null && txAttr.getName() == null) {
            txAttr = new DelegatingTransactionAttribute(txAttr) {
                @Override
                public String getName() {
                    return joinpointIdentification;
                }
            };
        }

        // Obtain the transaction status from the manager, if there is one.
        TransactionStatus status = null;
        if (txAttr != null) {
            if (tm != null) {
                status = tm.getTransaction(txAttr);
            }
            else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                            "] because no transaction manager has been configured");
                }
            }
        }
        // Prepare the transaction info.
        return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    }

    AbstractPlatformTransactionManager#getTransaction

    /**
     * Returns a transaction status for the given definition, either by
     * handling an already existing transaction or by starting a new (or
     * "empty") one according to the propagation behavior.
     *
     * @param definition the transaction definition; defaults are used when null
     * @return the transaction status
     * @throws TransactionException on an invalid timeout, on propagation
     *         'mandatory' without an existing transaction, or on failures
     *         raised by the called template methods
     */
    public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
           // Obtain the transaction object (see doGetTransaction); no transaction is begun here.
            Object transaction = doGetTransaction();
    
            // Cache debug flag to avoid repeated checks.
            boolean debugEnabled = logger.isDebugEnabled();
            // Fall back to the default definition when none was passed in.
            if (definition == null) {
                // Use defaults if no transaction definition given.
                definition = new DefaultTransactionDefinition();
            }
            // Is there already an existing transaction?
            if (isExistingTransaction(transaction)) {
                // Existing transaction found -> check propagation behavior to find out how to behave.
                return handleExistingTransaction(definition, transaction, debugEnabled);
            }
            // Everything below handles the case where no transaction exists.
            // Check definition settings for new transaction.
            // Validate the timeout value: anything below TIMEOUT_DEFAULT is rejected.
            if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
                throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
            }
            
            // No existing transaction found -> check propagation behavior to find out how to proceed.
            // Propagation 'mandatory' requires an existing transaction, so fail here.
            if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
                throw new IllegalTransactionStateException(
                        "No existing transaction found for transaction marked with propagation 'mandatory'");
            }
            else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                    definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                    definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
                // suspend(null): there is no current transaction to suspend at this point.
                SuspendedResourcesHolder suspendedResources = suspend(null);
                if (debugEnabled) {
                    logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
                }
                try {
                    // The first transactional call ends up here: create a new transaction.
                    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                    DefaultTransactionStatus status = newTransactionStatus(
                            definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                    // Begin the transaction.
                    doBegin(transaction, definition);
                    // Prepare transaction synchronization.
                    prepareSynchronization(status, definition);
                    return status;
                }
                catch (RuntimeException ex) {
                    resume(null, suspendedResources);
                    throw ex;
                }
                catch (Error err) {
                    resume(null, suspendedResources);
                    throw err;
                }
            }
            else {
                // Create "empty" transaction: no actual transaction, but potentially synchronization.
                if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                    logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                            "isolation level will effectively be ignored: " + definition);
                }
                boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
                return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
            }
        }
    /**
     * Builds the transaction object for the current call: records whether
     * savepoints are allowed and attaches the ConnectionHolder registered for
     * this data source in TransactionSynchronizationManager, if any (marked as
     * not newly created).
     *
     * @return the transaction object
     */
    protected Object doGetTransaction() {
        DataSourceTransactionManager.DataSourceTransactionObject transaction =
                new DataSourceTransactionManager.DataSourceTransactionObject();

        // Savepoints are allowed exactly when nested transactions are allowed.
        boolean savepointAllowed = this.isNestedTransactionAllowed();
        transaction.setSavepointAllowed(savepointAllowed);

        // Reuse the connection holder registered for this data source (may be null).
        Object boundResource = TransactionSynchronizationManager.getResource(this.dataSource);
        transaction.setConnectionHolder((ConnectionHolder) boundResource, false);

        return transaction;
    }
    
    /**
     * Tells whether the given transaction object represents a running
     * transaction: it must have a connection holder whose transaction-active
     * flag is set.
     */
    protected boolean isExistingTransaction(Object transaction) {
        DataSourceTransactionManager.DataSourceTransactionObject candidate =
                (DataSourceTransactionManager.DataSourceTransactionObject) transaction;
        if (!candidate.hasConnectionHolder()) {
            return false;
        }
        return candidate.getConnectionHolder().isTransactionActive();
    }
    
    /**
     * Begins a JDBC transaction on the given transaction object: obtains a
     * connection if needed, prepares it for the definition, switches off
     * auto-commit, marks the holder as transaction-active and binds a newly
     * created holder to TransactionSynchronizationManager.
     *
     * @param transaction the transaction object (a DataSourceTransactionObject)
     * @param definition  the transaction definition to apply
     * @throws CannotCreateTransactionException wrapping any Throwable raised during setup
     */
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
        Connection con = null;
    
        try {
            
            // Fetch a new connection when there is no holder yet, or the holder is already synchronized with a transaction.
            if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                Connection newCon = this.dataSource.getConnection();
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }
                // Wrap the connection in a holder and store it on the transaction object as a new holder.
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }
            // Flag the holder as synchronized with the transaction.
            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();
            // Prepare the connection for the definition (per the article: applies the isolation level); keep the previous isolation level.
            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);
            if (con.getAutoCommit()) {
    
                // Remember that auto-commit has to be restored later.
                txObject.setMustRestoreAutoCommit(true);
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }
                // Switch off auto-commit.
                con.setAutoCommit(false);
            }
            
            this.prepareTransactionalConnection(con, definition);
            // Mark the transaction active; isExistingTransaction checks this flag.
            txObject.getConnectionHolder().setTransactionActive(true);
            int timeout = this.determineTimeout(definition);
            // -1 means no specific timeout.
            if (timeout != -1) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }
            // Bind a newly created holder to TransactionSynchronizationManager, keyed by the data source.
            if (txObject.isNewConnectionHolder()) {
                TransactionSynchronizationManager.bindResource(this.getDataSource(), txObject.getConnectionHolder());
            }
    
        } catch (Throwable var7) {
            if (txObject.isNewConnectionHolder()) {
                // Setup failed: release the connection and clear the holder.
                DataSourceUtils.releaseConnection(con, this.dataSource);
                txObject.setConnectionHolder((ConnectionHolder)null, false);
            }
    
            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);
        }
    }
    /**
     * Publishes the current transaction's characteristics to
     * TransactionSynchronizationManager and initializes synchronization.
     * Does nothing unless the status represents a new synchronization.
     */
    protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
        if (!status.isNewSynchronization()) {
            return;
        }

        TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());

        // The isolation level is only exposed when it differs from the default; otherwise null.
        Integer exposedIsolationLevel =
                definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
                        definition.getIsolationLevel() : null;
        TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(exposedIsolationLevel);

        TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
        TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
        TransactionSynchronizationManager.initSynchronization();
    }
    /**
     * Creates the {@link TransactionStatus} for a request that arrives while a
     * transaction already exists, dispatching on the propagation behavior of
     * the given definition.
     *
     * <ul>
     * <li>{@code PROPAGATION_NEVER}: rejected with an exception.</li>
     * <li>{@code PROPAGATION_NOT_SUPPORTED}: the existing transaction is suspended and
     * a status without a transaction object is returned.</li>
     * <li>{@code PROPAGATION_REQUIRES_NEW}: the existing transaction is suspended and a
     * new one is begun; if beginning fails, {@code resumeAfterBeginException} is
     * invoked before the failure is rethrown.</li>
     * <li>{@code PROPAGATION_NESTED}: a savepoint is created, or a nested begin is
     * performed, depending on {@code useSavepointForNestedTransaction()}.</li>
     * <li>Anything else: the call participates in the existing transaction, after
     * optional validation of isolation level and read-only flag.</li>
     * </ul>
     *
     * @param definition   the requested transaction definition
     * @param transaction  the existing transaction object
     * @param debugEnabled whether debug messages should be logged
     * @return the status object for the resulting transaction scope
     * @throws TransactionException if the propagation behavior is incompatible with
     *         the existing transaction, or if nesting is requested but not allowed
     */
    private TransactionStatus handleExistingTransaction(
            TransactionDefinition definition, Object transaction, boolean debugEnabled)
            throws TransactionException {

        switch (definition.getPropagationBehavior()) {
            case TransactionDefinition.PROPAGATION_NEVER:
                throw new IllegalTransactionStateException(
                        "Existing transaction found for transaction marked with propagation 'never'");

            case TransactionDefinition.PROPAGATION_NOT_SUPPORTED: {
                if (debugEnabled) {
                    logger.debug("Suspending current transaction");
                }
                // Suspend the current transaction and continue without one.
                Object suspended = suspend(transaction);
                boolean activateSync = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
                return prepareTransactionStatus(
                        definition, null, false, activateSync, debugEnabled, suspended);
            }

            case TransactionDefinition.PROPAGATION_REQUIRES_NEW: {
                if (debugEnabled) {
                    logger.debug("Suspending current transaction, creating new transaction with name [" +
                            definition.getName() + "]");
                }
                SuspendedResourcesHolder suspended = suspend(transaction);
                try {
                    boolean activateSync = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                    DefaultTransactionStatus freshStatus = newTransactionStatus(
                            definition, transaction, true, activateSync, debugEnabled, suspended);
                    doBegin(transaction, definition);
                    prepareSynchronization(freshStatus, definition);
                    return freshStatus;
                }
                catch (RuntimeException | Error beginFailure) {
                    // Hand the suspended resources back before propagating the failure.
                    resumeAfterBeginException(transaction, suspended, beginFailure);
                    throw beginFailure;
                }
            }

            case TransactionDefinition.PROPAGATION_NESTED: {
                if (!isNestedTransactionAllowed()) {
                    throw new NestedTransactionNotSupportedException(
                            "Transaction manager does not allow nested transactions by default - " +
                            "specify 'nestedTransactionAllowed' property with value 'true'");
                }
                if (debugEnabled) {
                    logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
                }
                if (!useSavepointForNestedTransaction()) {
                    // Nested transaction through nested begin and commit/rollback calls.
                    // Usually only for JTA: Spring synchronization might get activated here
                    // in case of a pre-existing JTA transaction.
                    boolean activateSync = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                    DefaultTransactionStatus nestedStatus = newTransactionStatus(
                            definition, transaction, true, activateSync, debugEnabled, null);
                    doBegin(transaction, definition);
                    prepareSynchronization(nestedStatus, definition);
                    return nestedStatus;
                }
                // Create savepoint within existing Spring-managed transaction,
                // through the SavepointManager API implemented by TransactionStatus.
                // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
                DefaultTransactionStatus savepointStatus =
                        prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                savepointStatus.createAndHoldSavepoint();
                return savepointStatus;
            }

            default:
                // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED: handled below.
                break;
        }

        if (debugEnabled) {
            logger.debug("Participating in existing transaction");
        }

        if (isValidateExistingTransaction()) {
            int requestedIsolation = definition.getIsolationLevel();
            if (requestedIsolation != TransactionDefinition.ISOLATION_DEFAULT) {
                Integer activeIsolation = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                boolean sameIsolation =
                        (activeIsolation != null && activeIsolation.intValue() == requestedIsolation);
                if (!sameIsolation) {
                    Constants isoConstants = DefaultTransactionDefinition.constants;
                    String activeDescription;
                    if (activeIsolation != null) {
                        activeDescription =
                                isoConstants.toCode(activeIsolation, DefaultTransactionDefinition.PREFIX_ISOLATION);
                    }
                    else {
                        activeDescription = "(unknown)";
                    }
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] specifies isolation level which is incompatible with existing transaction: " +
                            activeDescription);
                }
            }
            // A read-write participant is rejected when the existing transaction is read-only.
            if (!definition.isReadOnly() && TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] is not marked as read-only but existing transaction is");
            }
        }

        boolean activateSync = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        return prepareTransactionStatus(definition, transaction, false, activateSync, debugEnabled, null);
    }

     



  • 相关阅读:
    如何快速取得股票交易历史数据
    ArcSDE性能优化系列之ArcSDE参数篇
    2020年8月29日
    2020年8月31日
    9.2
    2020年8月25日
    2020年8月30日
    2020年8月27日
    2020年8月26日
    2020年8月28日
  • 原文地址:https://www.cnblogs.com/pjfmeng/p/14115597.html
Copyright © 2011-2022 走看看