  • Reading the Spring Source Code: Declarative Transactions

    Transaction control flow

    As an example, suppose transaction control is applied to the following code:

    class Service1 {
        Service2 service2; // injected by the container

        void method1() {
            service2.method2();
        }
    }

    class Service2 {
        void method2() {
        }
    }

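    Principle: a method interceptor is set up to intercept the service methods. It begins a transaction before the method starts and commits the transaction after the method finishes.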

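    For the example above, the flow is:

    1) method1 begin transaction: create a new transaction and store it in the current thread; create a transInfo object that holds the method id and a reference to the transaction, and mark its status as new

    2) method1 proceed (roll back the transaction if an exception occurs)

    3) method2 begin transaction: check whether the current thread already has a transaction; if it does, use that transaction, create a transInfo object, and mark its status as exist

    4) method2 proceed (roll back the transaction if an exception occurs)

    5) method2 commit: check whether the status of transInfo is new; if it is not, skip the commit

    6) method1 commit: the status is new, so the transaction is committed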
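    The six steps above can be sketched with nothing but the JDK. This is a minimal illustration, not Spring's code: the names TxFlowSketch, Tx, TransInfo, begin, commit, rollback and intercept are all made up here, and the rollback handling is simplified (the transaction is simply marked as rolled back in a log).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of steps 1) to 6); none of these types are Spring classes.
class TxFlowSketch {
    /** Stand-in for a real transaction; it only records what happens to it. */
    static final class Tx {
        final List<String> log = new ArrayList<>();
    }

    /** Per-method record: method id, transaction reference, and the new/exist status. */
    static final class TransInfo {
        final String methodId;
        final Tx tx;
        final boolean isNew;

        TransInfo(String methodId, Tx tx, boolean isNew) {
            this.methodId = methodId;
            this.tx = tx;
            this.isNew = isNew;
        }
    }

    // The transaction bound to the current thread, if any.
    private static final ThreadLocal<Tx> CURRENT = new ThreadLocal<>();

    static TransInfo begin(String methodId) {
        Tx tx = CURRENT.get();
        boolean isNew = (tx == null);
        if (isNew) {
            tx = new Tx();
            CURRENT.set(tx);
            tx.log.add("begin by " + methodId);
        } else {
            tx.log.add("join by " + methodId);
        }
        return new TransInfo(methodId, tx, isNew);
    }

    static void commit(TransInfo info) {
        if (info.isNew) {
            info.tx.log.add("commit by " + info.methodId);
            CURRENT.remove();
        } else {
            // Only the method that created the transaction may commit it.
            info.tx.log.add("skip commit by " + info.methodId);
        }
    }

    static void rollback(TransInfo info) {
        info.tx.log.add("rollback by " + info.methodId);
        if (info.isNew) {
            CURRENT.remove();
        }
    }

    /** The interceptor: begin, proceed, roll back on exception, otherwise commit. */
    static void intercept(String methodId, Runnable body) {
        TransInfo info = begin(methodId);
        try {
            body.run();
        } catch (RuntimeException ex) {
            rollback(info);
            throw ex;
        }
        commit(info);
    }

    /** method1 calls method2; returns the log of the single shared transaction. */
    static List<String> run() {
        Tx[] seen = new Tx[1];
        intercept("method1", () -> {
            seen[0] = CURRENT.get();
            intercept("method2", () -> { });
        });
        return seen[0].log;
    }
}
```

    Calling TxFlowSketch.run() records four entries in order: begin by method1, join by method2, skip commit by method2, commit by method1.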

    1. TransactionProxyFactoryBean: initializing the transaction configuration

    package org.springframework.transaction.interceptor;
    
    import java.util.Properties;
    
    import org.springframework.aop.Pointcut;
    import org.springframework.aop.framework.AbstractSingletonProxyFactoryBean;
    import org.springframework.aop.support.DefaultPointcutAdvisor;
    import org.springframework.beans.factory.BeanFactory;
    import org.springframework.beans.factory.BeanFactoryAware;
    import org.springframework.beans.factory.FactoryBean;
    import org.springframework.beans.factory.ListableBeanFactory;
    import org.springframework.transaction.PlatformTransactionManager;
    
    public class TransactionProxyFactoryBean extends AbstractSingletonProxyFactoryBean
            implements BeanFactoryAware {
    
        private final TransactionInterceptor transactionInterceptor = new TransactionInterceptor();
    
        private Pointcut pointcut;
    
    
        
        public void setTransactionManager(PlatformTransactionManager transactionManager) {
            this.transactionInterceptor.setTransactionManager(transactionManager);
        }
    
        
        public void setTransactionAttributes(Properties transactionAttributes) {
            this.transactionInterceptor.setTransactionAttributes(transactionAttributes);
        }
    
        
        public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) {
            this.transactionInterceptor.setTransactionAttributeSource(transactionAttributeSource);
        }
    
        
        public void setPointcut(Pointcut pointcut) {
            this.pointcut = pointcut;
        }
    
        
        public void setBeanFactory(BeanFactory beanFactory) {
            this.transactionInterceptor.setBeanFactory(beanFactory);
        }
    
    
        
        @Override
        protected Object createMainInterceptor() {
            this.transactionInterceptor.afterPropertiesSet();
            if (this.pointcut != null) {
                return new DefaultPointcutAdvisor(this.pointcut, this.transactionInterceptor);
            }
            else {
                // Rely on default pointcut.
                return new TransactionAttributeSourceAdvisor(this.transactionInterceptor);
            }
        }
    
    }
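    The factory bean above hands back a proxy whose calls pass through the interceptor before reaching the target. Spring AOP builds such proxies with JDK dynamic proxies or CGLIB; the sketch below only shows the JDK dynamic proxy idea in isolation. ProxySketch, AccountService, AccountServiceImpl and wrap are hypothetical names, and the "begin/commit/rollback" steps are just log entries.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: wrapping a target in a proxy that runs advice around each call.
class ProxySketch {
    interface AccountService {
        String transfer(int amount);
    }

    static final List<String> EVENTS = new ArrayList<>();

    static final class AccountServiceImpl implements AccountService {
        public String transfer(int amount) {
            EVENTS.add("transfer " + amount);
            return "ok";
        }
    }

    /** Returns a proxy for target; every interface method is surrounded by begin/commit. */
    @SuppressWarnings("unchecked")
    static <T> T wrap(Class<T> iface, T target) {
        InvocationHandler handler = (proxy, method, args) -> {
            EVENTS.add("begin " + method.getName());
            Object result;
            try {
                result = method.invoke(target, args);
            } catch (InvocationTargetException ex) {
                EVENTS.add("rollback " + method.getName());
                throw ex.getCause(); // rethrow what the target actually threw
            }
            EVENTS.add("commit " + method.getName());
            return result;
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }

    static List<String> demo() {
        EVENTS.clear();
        AccountService proxy = wrap(AccountService.class, new AccountServiceImpl());
        proxy.transfer(100);
        return new ArrayList<>(EVENTS);
    }
}
```

    ProxySketch.demo() records begin transfer, transfer 100, commit transfer, in that order: the caller only ever talks to the proxy, never to the target directly.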

    2. TransactionInterceptor: the transaction interceptor

    package org.springframework.transaction.interceptor;
    
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import java.util.Properties;
    import org.aopalliance.intercept.MethodInterceptor;
    import org.aopalliance.intercept.MethodInvocation;
    import org.springframework.aop.support.AopUtils;
    import org.springframework.beans.factory.BeanFactory;
    import org.springframework.transaction.PlatformTransactionManager;
    import org.springframework.transaction.TransactionStatus;
    import org.springframework.transaction.support.CallbackPreferringPlatformTransactionManager;
    import org.springframework.transaction.support.TransactionCallback;
    
    @SuppressWarnings("serial")
    public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
        // Intercept method calls for transaction handling; every advisor intercepts once
        public Object invoke(final MethodInvocation invocation) throws Throwable {
            // Work out the target class: may be <code>null</code>.
            // The TransactionAttributeSource should be passed the target class
            // as well as the method, which may be from an interface.
            Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    
            // If the transaction attribute is null, the method is non-transactional.
            final TransactionAttribute txAttr =
                    getTransactionAttributeSource().getTransactionAttribute(invocation.getMethod(), targetClass);
            // Obtain the TransactionManager; normally this is the one set up in the configuration file
            final PlatformTransactionManager tm = determineTransactionManager(txAttr);
            final String joinpointIdentification = methodIdentification(invocation.getMethod(), targetClass);
    
            if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
                // Standard transaction demarcation with getTransaction and commit/rollback calls.
                // Begin a transaction, wrap the transaction information in a TransactionInfo, and bind it to the current thread
                TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
                Object retVal = null;
                try {
                    // This is an around advice: Invoke the next interceptor in the chain.
                    // This will normally result in a target object being invoked.
                    retVal = invocation.proceed();
                }
                catch (Throwable ex) {
                    // target invocation exception
                    // Handle the transaction appropriately when an exception occurs
                    completeTransactionAfterThrowing(txInfo, ex);
                    throw ex;
                }
                finally {
                    cleanupTransactionInfo(txInfo);
                }
                // Commit the transaction
                commitTransactionAfterReturning(txInfo);
                return retVal;
            }
    
            else {
                // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
                try {
                    Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr,
                            new TransactionCallback<Object>() {
                                public Object doInTransaction(TransactionStatus status) {
                                    TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
                                    try {
                                        return invocation.proceed();
                                    }
                                    catch (Throwable ex) {
                                        if (txAttr.rollbackOn(ex)) {
                                            // A RuntimeException: will lead to a rollback.
                                            if (ex instanceof RuntimeException) {
                                                throw (RuntimeException) ex;
                                            }
                                            else {
                                                throw new ThrowableHolderException(ex);
                                            }
                                        }
                                        else {
                                            // A normal return value: will lead to a commit.
                                            return new ThrowableHolder(ex);
                                        }
                                    }
                                    finally {
                                        cleanupTransactionInfo(txInfo);
                                    }
                                }
                            });
    
                    // Check result: It might indicate a Throwable to rethrow.
                    if (result instanceof ThrowableHolder) {
                        throw ((ThrowableHolder) result).getThrowable();
                    }
                    else {
                        return result;
                    }
                }
                catch (ThrowableHolderException ex) {
                    throw ex.getCause();
                }
            }
        }
        
    }
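    The else branch of invoke has to get a checked exception out of a callback whose signature cannot throw one, while still telling the manager whether to commit or roll back. The sketch below isolates that trick with plain JDK types. The execute method is a hypothetical stand-in for a callback-preferring manager (commit on normal return, rollback on RuntimeException), and rollbackOn is a Predicate standing in for txAttr.rollbackOn.

```java
import java.util.concurrent.Callable;
import java.util.function.Predicate;

// Hypothetical sketch of the ThrowableHolder / ThrowableHolderException tunnelling.
class CallbackTunnelSketch {
    /** A callback that may not throw checked exceptions. */
    interface Callback<T> {
        T run();
    }

    /** Returned as a normal value, so the manager commits. */
    static final class ThrowableHolder {
        final Throwable throwable;

        ThrowableHolder(Throwable throwable) {
            this.throwable = throwable;
        }
    }

    /** Thrown as an unchecked exception, so the manager rolls back. */
    static final class ThrowableHolderException extends RuntimeException {
        ThrowableHolderException(Throwable cause) {
            super(cause);
        }
    }

    static String lastOutcome;

    /** Stand-in manager: commit on normal return, rollback on RuntimeException. */
    static Object execute(Callback<Object> callback) {
        try {
            Object result = callback.run();
            lastOutcome = "commit";
            return result;
        } catch (RuntimeException ex) {
            lastOutcome = "rollback";
            throw ex;
        }
    }

    static Object invoke(Callable<Object> target, Predicate<Throwable> rollbackOn) throws Throwable {
        try {
            Object result = execute(() -> {
                try {
                    return target.call();
                } catch (Exception ex) {
                    if (rollbackOn.test(ex)) {
                        if (ex instanceof RuntimeException) {
                            throw (RuntimeException) ex;
                        }
                        throw new ThrowableHolderException(ex);
                    }
                    return new ThrowableHolder(ex);
                }
            });
            // Unwrap on the way out so the caller sees the original exception.
            if (result instanceof ThrowableHolder) {
                throw ((ThrowableHolder) result).throwable;
            }
            return result;
        } catch (ThrowableHolderException ex) {
            throw ex.getCause();
        }
    }

    /** Target always throws IOException; reports what the caller saw and what the manager did. */
    static String demo(boolean rollbackOnChecked) {
        try {
            invoke(() -> { throw new java.io.IOException("disk full"); }, ex -> rollbackOnChecked);
            return "no exception";
        } catch (Throwable t) {
            return t.getClass().getSimpleName() + " after " + lastOutcome;
        }
    }
}
```

    In both cases the caller receives the original IOException; the only difference is whether the stand-in manager committed (demo(false)) or rolled back (demo(true)) before it surfaced.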

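    2.1 Source code for starting a transaction

    In the invoke method of TransactionInterceptor:

    // Begin a transaction and wrap the transaction information in a TransactionInfo
    TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

    Stepping into the source of that method:

    Class TransactionAspectSupport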

    protected TransactionInfo createTransactionIfNecessary(
                PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {
    
            // If no name specified, apply method identification as transaction name.
            if (txAttr != null && txAttr.getName() == null) {
                txAttr = new DelegatingTransactionAttribute(txAttr) {
                    @Override
                    public String getName() {
                        return joinpointIdentification;
                    }
                };
            }
    
            TransactionStatus status = null;
            if (txAttr != null) {
                if (tm != null) {
                    // If the current thread has no transaction, start one; if one already exists, use it (subject to the propagation behavior)
                    status = tm.getTransaction(txAttr);
                }
                else {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                                "] because no transaction manager has been configured");
                    }
                }
            }
            return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
        }

    In the createTransactionIfNecessary method:

    // If the current thread has no transaction, start one; if one already exists, use it
    status = tm.getTransaction(txAttr);

    Stepping into the source of that method:

    Class AbstractPlatformTransactionManager

    public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {    
            Object transaction = doGetTransaction(); // ask the concrete TransactionManager implementation for the transaction object
    
            // Cache debug flag to avoid repeated checks.
            boolean debugEnabled = logger.isDebugEnabled();
    
            if (definition == null) {
                // Use defaults if no transaction definition given.
                definition = new DefaultTransactionDefinition();
            }
    
            if (isExistingTransaction(transaction)) {
                // Existing transaction found -> check propagation behavior to find out how to behave.
                return handleExistingTransaction(definition, transaction, debugEnabled);
            }
    
            // Check definition settings for new transaction.
            if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
                throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
            }
    
            // No existing transaction found -> check propagation behavior to find out how to proceed.
            if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
                throw new IllegalTransactionStateException(
                        "No existing transaction found for transaction marked with propagation 'mandatory'");
            }
            else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                    definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
                SuspendedResourcesHolder suspendedResources = suspend(null);
                if (debugEnabled) {
                    logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
                }
                try {
                    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                    DefaultTransactionStatus status = newTransactionStatus(
                            definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                    // Call the concrete TransactionManager's method that begins the transaction
                    doBegin(transaction, definition);
                    prepareSynchronization(status, definition);
                    return status;
                }
                catch (RuntimeException ex) {
                    resume(null, suspendedResources);
                    throw ex;
                }
                catch (Error err) {
                    resume(null, suspendedResources);
                    throw err;
                }
            }
            else {
                // Create "empty" transaction: no actual transaction, but potentially synchronization.
                boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
                return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
            }
        }
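    When no transaction exists yet, getTransaction above branches purely on the propagation behavior. The decision can be condensed into a small function. This is a sketch only: the Propagation enum mirrors the PROPAGATION_* constants of TransactionDefinition, while PropagationSketch, whenNoTransactionExists and the returned strings are invented for illustration.

```java
// Hypothetical condensation of the "no existing transaction" branch of getTransaction.
class PropagationSketch {
    enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

    static String whenNoTransactionExists(Propagation propagation) {
        switch (propagation) {
            case MANDATORY:
                // The caller demanded an existing transaction and there is none.
                throw new IllegalStateException(
                        "No existing transaction found for transaction marked with propagation 'mandatory'");
            case REQUIRED:
            case REQUIRES_NEW:
            case NESTED:
                // These three all start a real transaction (doBegin in the source above).
                return "begin new transaction";
            default:
                // SUPPORTS, NOT_SUPPORTED, NEVER: an "empty" transaction, no doBegin call.
                return "run without a transaction";
        }
    }
}
```

    Note that REQUIRED, REQUIRES_NEW and NESTED only differ once a transaction already exists, which is handled separately by handleExistingTransaction.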

    In the getTransaction method:

    // Call the concrete TransactionManager's method that begins the transaction
    doBegin(transaction, definition);

    Stepping into its source:

    Class HibernateTransactionManager

    @Override
        protected void doBegin(Object transaction, TransactionDefinition definition) {
            HibernateTransactionObject txObject = (HibernateTransactionObject) transaction;
    
            if (txObject.hasConnectionHolder() && !txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                throw new IllegalTransactionStateException(
                        "Pre-bound JDBC Connection found! HibernateTransactionManager does not support " +
                        "running within DataSourceTransactionManager if told to manage the DataSource itself. " +
                        "It is recommended to use a single HibernateTransactionManager for all transactions " +
                        "on a single DataSource, no matter whether Hibernate or JDBC access.");
            }
    
            Session session = null;
    
            try {
                if (txObject.getSessionHolder() == null || txObject.getSessionHolder().isSynchronizedWithTransaction()) {
                    Interceptor entityInterceptor = getEntityInterceptor();
                    Session newSession = (entityInterceptor != null ?
                            getSessionFactory().openSession(entityInterceptor) : getSessionFactory().openSession());
                    if (logger.isDebugEnabled()) {
                        logger.debug("Opened new Session [" + SessionFactoryUtils.toString(newSession) +
                                "] for Hibernate transaction");
                    }
                    txObject.setSession(newSession);
                }
    
                session = txObject.getSessionHolder().getSession();
    
                if (this.prepareConnection && isSameConnectionForEntireSession(session)) {
                    // We're allowed to change the transaction settings of the JDBC Connection.
                    if (logger.isDebugEnabled()) {
                        logger.debug(
                                "Preparing JDBC Connection of Hibernate Session [" + SessionFactoryUtils.toString(session) + "]");
                    }
                    Connection con = session.connection();
                    Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
                    txObject.setPreviousIsolationLevel(previousIsolationLevel);
                }
                else {
                    // Not allowed to change the transaction settings of the JDBC Connection.
                    if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                        // We should set a specific isolation level but are not allowed to...
                        throw new InvalidIsolationLevelException(
                                "HibernateTransactionManager is not allowed to support custom isolation levels: " +
                                "make sure that its 'prepareConnection' flag is on (the default) and that the " +
                                "Hibernate connection release mode is set to 'on_close' (SpringTransactionFactory's default). " +
                                "Make sure that your LocalSessionFactoryBean actually uses SpringTransactionFactory: Your " +
                                "Hibernate properties should *not* include a 'hibernate.transaction.factory_class' property!");
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug(
                                "Not preparing JDBC Connection of Hibernate Session [" + SessionFactoryUtils.toString(session) + "]");
                    }
                }
    
                if (definition.isReadOnly() && txObject.isNewSession()) {
                    // Just set to NEVER in case of a new Session for this transaction.
                    session.setFlushMode(FlushMode.MANUAL);
                }
    
                if (!definition.isReadOnly() && !txObject.isNewSession()) {
                    // We need AUTO or COMMIT for a non-read-only transaction.
                    FlushMode flushMode = session.getFlushMode();
                    if (flushMode.lessThan(FlushMode.COMMIT)) {
                        session.setFlushMode(FlushMode.AUTO);
                        txObject.getSessionHolder().setPreviousFlushMode(flushMode);
                    }
                }
    
                Transaction hibTx;
    
                // Register transaction timeout.
                int timeout = determineTimeout(definition);
                if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                    // Use Hibernate's own transaction timeout mechanism on Hibernate 3.1+
                    // Applies to all statements, also to inserts, updates and deletes!
                    hibTx = session.getTransaction();
                    hibTx.setTimeout(timeout);
                    hibTx.begin();
                }
                else {
                    // Open a plain Hibernate transaction without specified timeout.
                    hibTx = session.beginTransaction();
                }
    
                // Add the Hibernate transaction to the session holder.
                txObject.getSessionHolder().setTransaction(hibTx);
    
                // Register the Hibernate Session's JDBC Connection for the DataSource, if set.
                if (getDataSource() != null) {
                    Connection con = session.connection();
                    ConnectionHolder conHolder = new ConnectionHolder(con);
                    if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                        conHolder.setTimeoutInSeconds(timeout);
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug("Exposing Hibernate transaction as JDBC transaction [" + con + "]");
                    }
                    TransactionSynchronizationManager.bindResource(getDataSource(), conHolder);
                    txObject.setConnectionHolder(conHolder);
                }
    
                // Bind the session holder to the thread.
                if (txObject.isNewSessionHolder()) {
                    TransactionSynchronizationManager.bindResource(getSessionFactory(), txObject.getSessionHolder());
                }
                txObject.getSessionHolder().setSynchronizedWithTransaction(true);
            }
    
            catch (Exception ex) {
                if (txObject.isNewSession()) {
                    try {
                        if (session.getTransaction().isActive()) {
                            session.getTransaction().rollback();
                        }
                    }
                    catch (Throwable ex2) {
                        logger.debug("Could not rollback Session after failed transaction begin", ex);
                    }
                    finally {
                        SessionFactoryUtils.closeSession(session);
                    }
                }
                throw new CannotCreateTransactionException("Could not open Hibernate Session for transaction", ex);
            }
        }
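    doBegin finishes by binding the connection holder and the session holder to the thread through TransactionSynchronizationManager.bindResource, which is how a later call on the same thread finds the existing transaction. The following is a rough sketch of what thread-bound resource storage can look like, assuming a ThreadLocal map keyed by the factory object. ResourceBindingSketch and its methods are hypothetical and are not Spring's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-thread resource binding (not Spring's implementation).
class ResourceBindingSketch {
    // Each thread gets its own key -> resource map.
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    static void bindResource(Object key, Object value) {
        Object old = RESOURCES.get().putIfAbsent(key, value);
        if (old != null) {
            throw new IllegalStateException("A value is already bound to this thread for key [" + key + "]");
        }
    }

    static Object getResource(Object key) {
        return RESOURCES.get().get(key);
    }

    static Object unbindResource(Object key) {
        Object value = RESOURCES.get().remove(key);
        if (value == null) {
            throw new IllegalStateException("No value is bound to this thread for key [" + key + "]");
        }
        return value;
    }

    /** Looks the key up from a fresh thread to show that bindings are per thread. */
    static boolean visibleFromOtherThread(Object key) {
        Object[] seen = new Object[1];
        Thread other = new Thread(() -> seen[0] = getResource(key));
        other.start();
        try {
            other.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return seen[0] != null;
    }
}
```

    A resource bound on one thread is returned by getResource on that thread only; visibleFromOtherThread returns false for it, which is why a transaction started in method1 is found by method2 only when both run on the same thread.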
  • Original article: https://www.cnblogs.com/tangyanbo/p/4283503.html