  • Spring Transaction Management (1): Transaction Isolation Between Threads

    Guiding questions

    • How are transactions implemented in Spring?
    • How does Spring keep connections and transactions isolated between threads?

    Spring transaction configuration

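    Spring's transaction management is something you run into all the time in day-to-day development, but how exactly does Spring isolate transactions between threads? Let's look at the most basic implementation, DataSourceTransactionManager.
    Transactions are typically configured as follows (there are other approaches as well, such as AOP-based configuration):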

        <bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">  
            <property name="dataSource" ref="dataSource" />  
        </bean>  
        
        <tx:annotation-driven proxy-target-class="true" transaction-manager="txManager"/> 
    

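    Once the dataSource has been injected into the transactionManager and tx:annotation-driven has been registered, transactions can be declared in code with the @Transactional annotation.
    At startup, Spring weaves the transactionManager logic into the business code to manage transactions (how the weaving works will be examined in a later post).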

    getTransaction

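    Before the business code runs, the getTransaction method of AbstractPlatformTransactionManager is called first. This method controls the transaction propagation behavior (REQUIRED, REQUIRES_NEW, SUPPORTS, ...).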
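
    As a reading aid for getTransaction, the branch taken when no transaction exists yet can be condensed into a small decision function. This is only a sketch: the names PropagationSketch, Propagation and Action are hypothetical and exist purely for illustration, while the real method works on TransactionDefinition constants and also handles timeouts, suspension and synchronization.

```java
// Hypothetical, simplified model of the "no existing transaction" branch of getTransaction.
final class PropagationSketch {

    // Mirrors the propagation constants on TransactionDefinition (names shortened).
    enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

    // What the transaction manager ends up doing (illustrative names).
    enum Action { BEGIN_NEW_TRANSACTION, RUN_WITHOUT_TRANSACTION }

    private PropagationSketch() {
    }

    static Action whenNoTransactionExists(Propagation propagation) {
        switch (propagation) {
            case MANDATORY:
                // MANDATORY requires a surrounding transaction, so this is an error.
                throw new IllegalStateException(
                        "No existing transaction found for transaction marked with propagation 'mandatory'");
            case REQUIRED:
            case REQUIRES_NEW:
            case NESTED:
                // All three start a fresh transaction when none is active.
                return Action.BEGIN_NEW_TRANSACTION;
            default:
                // SUPPORTS, NOT_SUPPORTED, NEVER: an "empty" transaction, no real one is started.
                return Action.RUN_WITHOUT_TRANSACTION;
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            if (p != Propagation.MANDATORY) {
                System.out.println(p + " -> " + whenNoTransactionExists(p));
            }
        }
    }
}
```

    The case where a transaction already exists is handled separately by handleExistingTransaction and is not modeled here.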

    	public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
    		Object transaction = doGetTransaction(); // obtain the existing transaction, if any
    
    		// Cache debug flag to avoid repeated checks.
    		boolean debugEnabled = logger.isDebugEnabled();
    
    		if (definition == null) {
    			// Use defaults if no transaction definition given.
    			definition = new DefaultTransactionDefinition();
    		}
    
    		if (isExistingTransaction(transaction)) {
    			// A transaction already exists: delegate to handleExistingTransaction, which is where most of the propagation handling lives
    			return handleExistingTransaction(definition, transaction, debugEnabled);
    		}
    
    		// Check definition settings for new transaction.
    		if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
    			throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
    		}
    
    		// No transaction exists: decide what to do based on the propagation behavior
    		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
    			throw new IllegalTransactionStateException(
    					"No existing transaction found for transaction marked with propagation 'mandatory'");
    		}
    		else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
    				definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
    				definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    			SuspendedResourcesHolder suspendedResources = suspend(null);
    			if (debugEnabled) {
    				logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
    			}
    			try {
    				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    				DefaultTransactionStatus status = newTransactionStatus(
    						definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    				// Actually begin the transaction
    				doBegin(transaction, definition);
    				// Store the transaction state in TransactionSynchronizationManager
    				prepareSynchronization(status, definition);
    				return status;
    			}
    			catch (RuntimeException ex) {
    				resume(null, suspendedResources);
    				throw ex;
    			}
    			catch (Error err) {
    				resume(null, suspendedResources);
    				throw err;
    			}
    		}
    		else {
    			// Create "empty" transaction: no actual transaction, but potentially synchronization.
    			if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
    				logger.warn("Custom isolation level specified but no actual transaction initiated; " +
    						"isolation level will effectively be ignored: " + definition);
    			}
    			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    			return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
    		}
    	}
    
    
    	// Obtain the current transaction, if there is one
    	protected Object doGetTransaction() {
    		DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    		txObject.setSavepointAllowed(isNestedTransactionAllowed());
    		// Look up the Connection bound to the current thread in TransactionSynchronizationManager
    		ConnectionHolder conHolder =
    				(ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
    		txObject.setConnectionHolder(conHolder, false);
    		return txObject;
    	}
    	
    	...
    	// Begin the transaction
    	protected void doBegin(Object transaction, TransactionDefinition definition) {
    		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    		Connection con = null;
    
    		try {
    			if (txObject.getConnectionHolder() == null ||
    					txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
    				// Obtain a connection from the DataSource injected in the configuration
    				Connection newCon = this.dataSource.getConnection();
    				if (logger.isDebugEnabled()) {
    					logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
    				}
    				txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
    			}
    
    			txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
    			con = txObject.getConnectionHolder().getConnection();
    
    			Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
    			txObject.setPreviousIsolationLevel(previousIsolationLevel);
    
    			// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
    			// so we don't want to do it unnecessarily (for example if we've explicitly
    			// configured the connection pool to set it already).
    			if (con.getAutoCommit()) {
    				txObject.setMustRestoreAutoCommit(true);
    				if (logger.isDebugEnabled()) {
    					logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
    				}
    				// Turn off auto-commit
    				con.setAutoCommit(false);
    			}
    			txObject.getConnectionHolder().setTransactionActive(true);
    
    			int timeout = determineTimeout(definition);
    			if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
    				txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
    			}
    
    			// Bind the session holder to the thread.
    			if (txObject.isNewConnectionHolder()) {
    				// Bind the newly created connection holder to the current thread via TransactionSynchronizationManager
    				TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
    			}
    		}
    
    		catch (Throwable ex) {
    			if (txObject.isNewConnectionHolder()) {
    				DataSourceUtils.releaseConnection(con, this.dataSource);
    				txObject.setConnectionHolder(null, false);
    			}
    			throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    		}
    	}
    	
    	
    	protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
    		if (status.isNewSynchronization()) {
    			// Record the transaction state in TransactionSynchronizationManager
    			TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
    			TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
    					definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
    							definition.getIsolationLevel() : null);
    			TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
    			TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
    			TransactionSynchronizationManager.initSynchronization();
    		}
    	}
    
    public abstract class TransactionSynchronizationManager {
    	// Per-thread resource map; holds the ConnectionHolder (keyed by its DataSource) and similar resources
    	private static final ThreadLocal<Map<Object, Object>> resources =
    			new NamedThreadLocal<>("Transactional resources");
    	...
    	
    	public static Object getResource(Object key) {
    		Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    		Object value = doGetResource(actualKey);
    		if (value != null && logger.isTraceEnabled()) {
    			logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
    					Thread.currentThread().getName() + "]");
    		}
    		return value;
    	}
    

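    As the code shows, Spring simply stores the acquired connection and the transaction information in ThreadLocal variables inside TransactionSynchronizationManager. Because each thread only ever sees its own connection and transaction state, database connections and transactions are never shared between threads.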
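
    The effect of the ThreadLocal map can be reproduced with plain JDK classes. The following is a minimal sketch, not Spring's implementation: ThreadBoundResources and ThreadIsolationDemo are hypothetical names, the "connections" are plain strings, and the shared key object merely stands in for the DataSource.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the resource map in TransactionSynchronizationManager.
final class ThreadBoundResources {
    // One map per thread: key = resource factory (e.g. a DataSource), value = the bound holder.
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    private ThreadBoundResources() {
    }

    static void bind(Object key, Object value) {
        if (RESOURCES.get().putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already a value bound for key [" + key + "]");
        }
    }

    static Object get(Object key) {
        return RESOURCES.get().get(key);
    }

    static Object unbind(Object key) {
        Map<Object, Object> map = RESOURCES.get();
        Object value = map.remove(key);
        if (map.isEmpty()) {
            RESOURCES.remove(); // do not leave an empty map attached to the thread
        }
        return value;
    }
}

final class ThreadIsolationDemo {
    private ThreadIsolationDemo() {
    }

    // Two workers bind different "connections" under the SAME key; each reads back only its own.
    static Map<String, Object> run() {
        Object dataSource = new Object(); // shared key, stands in for the DataSource
        Map<String, Object> seen = new ConcurrentHashMap<>();
        Runnable worker = () -> {
            String name = Thread.currentThread().getName();
            ThreadBoundResources.bind(dataSource, "connection-of-" + name);
            seen.put(name, ThreadBoundResources.get(dataSource));
            ThreadBoundResources.unbind(dataSource);
        };
        Thread first = new Thread(worker, "worker-1");
        Thread second = new Thread(worker, "worker-2");
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

    Although both workers use the same key, neither bind call fails and neither worker can observe the other's value, which is the same property that keeps one thread's JDBC connection out of another thread's transaction.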

    Commit/Rollback

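    In the commit/rollback phase, the TransactionStatus returned by the initial getTransaction call (which carries the connection and transaction information) is passed to commit/rollback as the argument. Once the commit or rollback has run, the state held in TransactionSynchronizationManager is cleaned up in a finally block.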
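
    The begin / commit-or-rollback / cleanup sequence can be sketched with a single ThreadLocal slot. This is an illustration under stated assumptions rather than Spring code: MiniTxTemplate, its execute method and the LOG list are hypothetical, and a string stands in for the JDBC connection.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical miniature of the transaction lifecycle around a thread-bound resource.
final class MiniTxTemplate {
    // Per-thread slot standing in for the ConnectionHolder that doBegin binds.
    private static final ThreadLocal<String> CURRENT_CONNECTION = new ThreadLocal<>();

    // Records commit/rollback decisions, for illustration only.
    static final List<String> LOG = Collections.synchronizedList(new ArrayList<>());

    private MiniTxTemplate() {
    }

    static <T> T execute(String connection, Supplier<T> work) {
        CURRENT_CONNECTION.set(connection); // "begin": bind the resource to the current thread
        try {
            T result = work.get();
            LOG.add("commit " + connection);
            return result;
        } catch (RuntimeException ex) {
            LOG.add("rollback " + connection);
            throw ex;
        } finally {
            // "cleanup": always unbind, whether the work committed or rolled back
            CURRENT_CONNECTION.remove();
        }
    }

    static String currentConnection() {
        return CURRENT_CONNECTION.get();
    }

    public static void main(String[] args) {
        System.out.println(execute("conn-1", MiniTxTemplate::currentConnection));
        System.out.println(currentConnection());
        System.out.println(LOG);
    }
}
```

    Doing the cleanup in finally matters because application threads are usually pooled: a binding left behind after one request would otherwise still be visible to the next task that happens to run on the same thread.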

    // Excerpt of the commit code
    	private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    		try {
    			boolean beforeCompletionInvoked = false;
    			try {
    				prepareForCommit(status);
    				triggerBeforeCommit(status);
    				triggerBeforeCompletion(status);
    				beforeCompletionInvoked = true;
    				boolean globalRollbackOnly = false;
    				if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
    					globalRollbackOnly = status.isGlobalRollbackOnly();
    				}
    				if (status.hasSavepoint()) {
    					if (status.isDebug()) {
    						logger.debug("Releasing transaction savepoint");
    					}
    					status.releaseHeldSavepoint();
    				}
    				// Commit only if this status started the transaction; otherwise (e.g. when it is nested) the commit is left to the outer level
    				else if (status.isNewTransaction()) {
    					if (status.isDebug()) {
    						logger.debug("Initiating transaction commit");
    					}
    					doCommit(status);
    				}
    				// Throw UnexpectedRollbackException if we have a global rollback-only
    				// marker but still didn't get a corresponding exception from commit.
    				if (globalRollbackOnly) {
    					throw new UnexpectedRollbackException(
    							"Transaction silently rolled back because it has been marked as rollback-only");
    				}
    		...
    		}
    		finally {
    			// Clear the transaction state in TransactionSynchronizationManager and release the acquired connection
    			cleanupAfterCompletion(status);
    		}
    	}
    

    Summary

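    Spring keeps all of this state in one place, TransactionSynchronizationManager, whose static ThreadLocal fields record the connection and transaction information of each thread. This prevents threads from sharing transactions and connections, which is how transaction isolation between threads is achieved.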

  • Original article: https://www.cnblogs.com/resentment/p/5744708.html