  • Spring Transaction Management (1): Transaction Isolation Between Threads

    Guiding questions

    • How does Spring implement transactions?
    • How does Spring keep connections and transactions isolated between threads?

    Spring transaction configuration

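    Spring's transaction management comes up constantly in day-to-day development, but how exactly does Spring keep transactions isolated between threads? Let's look at the most basic case, DataSourceTransactionManager.
    Transactions are typically configured as follows (other approaches, such as AOP-based configuration, also exist):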

        <bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">  
            <property name="dataSource" ref="dataSource" />  
        </bean>  
        
        <tx:annotation-driven proxy-target-class="true" transaction-manager="txManager"/> 
    

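    Once the dataSource is injected into the transaction manager and tx:annotation-driven is registered, the @Transactional annotation can be used in code to declare transactions.
    At startup, Spring weaves the transaction manager's logic around the business code to manage transactions (how the weaving works will be examined in a later post).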
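
    To make the idea of "weaving" concrete, here is a minimal sketch that wraps a business call with begin, commit, and rollback steps. It uses a JDK dynamic proxy purely for illustration; the configuration above sets proxy-target-class="true", which asks for class-based proxies instead, so this is a swapped-in technique and not Spring's actual mechanism. AccountService, withTransaction, and LOG are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: none of these types are Spring classes.
final class TxProxySketch {

    /** Hypothetical business interface. */
    interface AccountService {
        void transfer(boolean fail);
    }

    /** Records each step so the order of calls can be inspected. */
    static final List<String> LOG = new ArrayList<>();

    /** Returns a proxy that surrounds every call on target with transaction steps. */
    static AccountService withTransaction(AccountService target) {
        InvocationHandler handler = (proxy, method, args) -> {
            LOG.add("begin");
            try {
                Object result = method.invoke(target, args);
                LOG.add("commit");
                return result;
            } catch (InvocationTargetException ex) {
                // The business method threw: roll back and rethrow the original exception.
                LOG.add("rollback");
                throw ex.getCause();
            }
        };
        return (AccountService) Proxy.newProxyInstance(
                AccountService.class.getClassLoader(),
                new Class<?>[] {AccountService.class},
                handler);
    }

    /** Runs one successful and one failing call and returns the recorded steps. */
    static List<String> demo() {
        LOG.clear();
        AccountService service = withTransaction(fail -> {
            LOG.add("business");
            if (fail) {
                throw new IllegalStateException("simulated failure");
            }
        });
        service.transfer(false);
        try {
            service.transfer(true);
        } catch (IllegalStateException expected) {
            LOG.add("caught");
        }
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

    The business code itself contains no transaction handling; the proxy decides when to begin, commit, or roll back, which is the role the transaction manager plays around @Transactional methods.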

    getTransaction

    Before the business code runs, getTransaction of AbstractPlatformTransactionManager is called first. This method applies the transaction propagation behavior (REQUIRED, REQUIRES_NEW, SUPPORTS, ...).
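
    As a rough orientation, the branch of getTransaction that runs when no transaction exists yet can be condensed into a small decision function. This is only a sketch of that one branch: the Propagation and Action enums and the method name whenNoTransactionExists are hypothetical and are not Spring API.

```java
// Hypothetical sketch; the enum and method names are illustrative, not Spring API.
final class PropagationSketch {

    enum Propagation { REQUIRED, REQUIRES_NEW, NESTED, SUPPORTS, NOT_SUPPORTED, NEVER, MANDATORY }

    enum Action { BEGIN_NEW, RUN_WITHOUT_TRANSACTION }

    /** Mirrors the decision made when the current thread has no transaction yet. */
    static Action whenNoTransactionExists(Propagation propagation) {
        switch (propagation) {
            case MANDATORY:
                // MANDATORY requires a caller-supplied transaction, so this is an error.
                throw new IllegalStateException(
                        "No existing transaction found for propagation 'mandatory'");
            case REQUIRED:
            case REQUIRES_NEW:
            case NESTED:
                // All three start a fresh transaction when there is nothing to join.
                return Action.BEGIN_NEW;
            default:
                // SUPPORTS, NOT_SUPPORTED, NEVER: run with an "empty" transaction.
                return Action.RUN_WITHOUT_TRANSACTION;
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            try {
                System.out.println(p + " -> " + whenNoTransactionExists(p));
            } catch (IllegalStateException ex) {
                System.out.println(p + " -> error: " + ex.getMessage());
            }
        }
    }
}
```

    The real method in the source additionally handles the case where a transaction already exists, as well as suspension, synchronization, and error recovery.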

    	public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
    		Object transaction = doGetTransaction(); // Get the existing transaction, if any
    
    		// Cache debug flag to avoid repeated checks.
    		boolean debugEnabled = logger.isDebugEnabled();
    
    		if (definition == null) {
    			// Use defaults if no transaction definition given.
    			definition = new DefaultTransactionDefinition();
    		}
    
    		if (isExistingTransaction(transaction)) {
    			// A transaction already exists: delegate to handleExistingTransaction, which is where propagation is mainly handled
    			return handleExistingTransaction(definition, transaction, debugEnabled);
    		}
    
    		// Check definition settings for new transaction.
    		if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
    			throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
    		}
    
    		// No transaction exists: decide what to do based on the propagation behavior
    		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
    			throw new IllegalTransactionStateException(
    					"No existing transaction found for transaction marked with propagation 'mandatory'");
    		}
    		else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
    				definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
    				definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    			SuspendedResourcesHolder suspendedResources = suspend(null);
    			if (debugEnabled) {
    				logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
    			}
    			try {
    				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    				DefaultTransactionStatus status = newTransactionStatus(
    						definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    				// Actually begin the transaction
    				doBegin(transaction, definition);
    				// Store the transaction state in TransactionSynchronizationManager
    				prepareSynchronization(status, definition);
    				return status;
    			}
    			catch (RuntimeException ex) {
    				resume(null, suspendedResources);
    				throw ex;
    			}
    			catch (Error err) {
    				resume(null, suspendedResources);
    				throw err;
    			}
    		}
    		else {
    			// Create "empty" transaction: no actual transaction, but potentially synchronization.
    			if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
    				logger.warn("Custom isolation level specified but no actual transaction initiated; " +
    						"isolation level will effectively be ignored: " + definition);
    			}
    			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    			return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
    		}
    	}
    
    
    	// Get the current transaction (if any)
    	protected Object doGetTransaction() {
    		DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    		txObject.setSavepointAllowed(isNestedTransactionAllowed());
    		// Get the Connection bound to the current thread from TransactionSynchronizationManager
    		ConnectionHolder conHolder =
    				(ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
    		txObject.setConnectionHolder(conHolder, false);
    		return txObject;
    	}
    	
    	...
    	// Begin the transaction
    	protected void doBegin(Object transaction, TransactionDefinition definition) {
    		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    		Connection con = null;
    
    		try {
    			if (txObject.getConnectionHolder() == null ||
    					txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
    				// Obtain a connection from the DataSource injected in the configuration
    				Connection newCon = this.dataSource.getConnection();
    				if (logger.isDebugEnabled()) {
    					logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
    				}
    				txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
    			}
    
    			txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
    			con = txObject.getConnectionHolder().getConnection();
    
    			Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
    			txObject.setPreviousIsolationLevel(previousIsolationLevel);
    
    			// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
    			// so we don't want to do it unnecessarily (for example if we've explicitly
    			// configured the connection pool to set it already).
    			if (con.getAutoCommit()) {
    				txObject.setMustRestoreAutoCommit(true);
    				if (logger.isDebugEnabled()) {
    					logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
    				}
    				// Turn off auto-commit
    				con.setAutoCommit(false);
    			}
    			txObject.getConnectionHolder().setTransactionActive(true);
    
    			int timeout = determineTimeout(definition);
    			if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
    				txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
    			}
    
    			// Bind the session holder to the thread.
    			if (txObject.isNewConnectionHolder()) {
    				// Bind the newly created connection holder to the current thread via TransactionSynchronizationManager
    				TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
    			}
    		}
    
    		catch (Throwable ex) {
    			if (txObject.isNewConnectionHolder()) {
    				DataSourceUtils.releaseConnection(con, this.dataSource);
    				txObject.setConnectionHolder(null, false);
    			}
    			throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    		}
    	}
    	
    	
    	protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
    		if (status.isNewSynchronization()) {
    			// Save the transaction state in TransactionSynchronizationManager
    			TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
    			TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
    					definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
    							definition.getIsolationLevel() : null);
    			TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
    			TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
    			TransactionSynchronizationManager.initSynchronization();
    		}
    	}
    
    public abstract class TransactionSynchronizationManager {
    	// ThreadLocal map of thread-bound resources, such as the ConnectionHolder keyed by its DataSource
    	private static final ThreadLocal<Map<Object, Object>> resources =
    			new NamedThreadLocal<>("Transactional resources");
    	...
    	
    	public static Object getResource(Object key) {
    		Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    		Object value = doGetResource(actualKey);
    		if (value != null && logger.isTraceEnabled()) {
    			logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
    					Thread.currentThread().getName() + "]");
    		}
    		return value;
    	}
    

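    As the code shows, Spring simply stores the acquired connection and the transaction information in ThreadLocal variables inside TransactionSynchronizationManager. Each thread therefore sees only its own connection and transaction state, which is what isolates database connections and transactions between threads.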
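
    The effect of the ThreadLocal map can be reproduced with a few lines of plain Java. The sketch below is a simplified stand-in for the resource map, not Spring's implementation: the class name ThreadBoundResources and the helper seenFromOtherThread are hypothetical, and strings stand in for the DataSource key and the connection holder.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a tiny stand-in for a thread-bound resource registry.
final class ThreadBoundResources {

    // One map per thread; a thread can only ever see its own entries.
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    static void bindResource(Object key, Object value) {
        if (RESOURCES.get().putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already a value bound for key [" + key + "]");
        }
    }

    static Object getResource(Object key) {
        return RESOURCES.get().get(key);
    }

    static Object unbindResource(Object key) {
        return RESOURCES.get().remove(key);
    }

    /** Binds value on the calling thread and returns what a second thread sees for key. */
    static Object seenFromOtherThread(Object key, Object value) {
        bindResource(key, value);
        Object[] seen = new Object[1];
        Thread other = new Thread(() -> seen[0] = getResource(key));
        other.start();
        try {
            other.join(); // join() makes the write to seen[0] visible here
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", ex);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        Object dataSourceKey = "dataSource"; // stands in for the DataSource used as the key
        Object seen = seenFromOtherThread(dataSourceKey, "connection-1");
        System.out.println("current thread: " + getResource(dataSourceKey)); // connection-1
        System.out.println("other thread:   " + seen);                       // null
        unbindResource(dataSourceKey);
    }
}
```

    The second thread looks up the same key and gets nothing back, so it would have to acquire its own connection and start its own transaction.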

    Commit/Rollback

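    In the commit/rollback phase, the TransactionStatus returned by getTransaction at the start (which carries the connection and transaction information) is passed to commit or rollback. After the commit or rollback is performed, the state in TransactionSynchronizationManager is cleaned up in a finally block.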

    // Part of the commit code
    	private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    		try {
    			boolean beforeCompletionInvoked = false;
    			try {
    				prepareForCommit(status);
    				triggerBeforeCommit(status);
    				triggerBeforeCompletion(status);
    				beforeCompletionInvoked = true;
    				boolean globalRollbackOnly = false;
    				if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
    					globalRollbackOnly = status.isGlobalRollbackOnly();
    				}
    				if (status.hasSavepoint()) {
    					if (status.isDebug()) {
    						logger.debug("Releasing transaction savepoint");
    					}
    					status.releaseHeldSavepoint();
    				}
    				// Commit only if this status created the transaction; otherwise (for example, a nested call) the outer level commits
    				else if (status.isNewTransaction()) {
    					if (status.isDebug()) {
    						logger.debug("Initiating transaction commit");
    					}
    					doCommit(status);
    				}
    				// Throw UnexpectedRollbackException if we have a global rollback-only
    				// marker but still didn't get a corresponding exception from commit.
    				if (globalRollbackOnly) {
    					throw new UnexpectedRollbackException(
    							"Transaction silently rolled back because it has been marked as rollback-only");
    				}
    		...
    		}
    		finally {
    			// Clear the transaction state in TransactionSynchronizationManager and release the acquired connection
    			cleanupAfterCompletion(status);
    		}
    	}
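
    The two points in this excerpt, that only the status that created the transaction commits it and that cleanup runs in a finally block, can be sketched in isolation as follows. Status is a simplified, hypothetical stand-in for DefaultTransactionStatus, a String stands in for the JDBC connection, and the log messages are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; Status is a simplified stand-in for the real status object.
final class CommitSketch {

    static final class Status {
        final boolean newTransaction;

        Status(boolean newTransaction) {
            this.newTransaction = newTransaction;
        }
    }

    // The connection bound to the current thread, if any.
    static final ThreadLocal<String> BOUND_CONNECTION = new ThreadLocal<>();
    static final List<String> LOG = new ArrayList<>();

    static void commit(Status status) {
        try {
            if (status.newTransaction) {
                // This status started the transaction, so it performs the commit.
                LOG.add("commit on " + BOUND_CONNECTION.get());
            } else {
                // A participant leaves the commit to whoever started the transaction.
                LOG.add("participant: commit left to the outer transaction");
            }
        } finally {
            if (status.newTransaction) {
                // Unbind the thread-bound connection once the owner has finished.
                BOUND_CONNECTION.remove();
                LOG.add("cleanup");
            }
        }
    }

    /** Commits an inner participant, then the outer owner, and returns the steps. */
    static List<String> demo() {
        LOG.clear();
        BOUND_CONNECTION.set("connection-1");
        commit(new Status(false));
        commit(new Status(true));
        LOG.add("bound after commit: " + BOUND_CONNECTION.get());
        return new ArrayList<>(LOG);
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

    After the owning status commits, nothing remains bound to the thread, so the next transaction on that thread starts from a clean state.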
    

    Summary

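    Spring keeps connection and transaction information in the static ThreadLocal fields of TransactionSynchronizationManager (an abstract class with static methods rather than a singleton instance). Because each thread has its own copy, transactions and connections are never shared between threads, which is how transaction isolation between threads is achieved.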

  • Original article: https://www.cnblogs.com/resentment/p/5744708.html