zoukankan      html  css  js  c++  java
  • Spring Trasnaction管理(1)- 线程间事务隔离

    问题导读

    • Spring中事务是如何实现的
    • Spring中各个线程间是如何进行连接、事务隔离的

    Spring事务配置

    Spring的事务管理应该是日常开发中总会碰到的,但是Spring具体是怎么实现线程间的事务隔离的,下面我们就最基本的DataSourceTransactionMnager来看下。
    一般使用的是以下的方式配置transaction(当然还有其他aop等方式)

        <bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">  
            <property name="dataSource" ref="dataSource" />  
        </bean>  
        
        <tx:annotation-driven proxy-target-class="true" transaction-manager="txManager"/> 
    

    将datasource注入transactionManager之后注册tx:annotation-driven内就可以在代码中使用注解Transactional进行定义事务了
    这里是Spring在启动时将transactionManager的代码织入业务代码来实现事务管理(后续会研究如何织入的)。

    getTransaction

    当调用到相关业务代码前首先调用AbstractPlatformTransactionManager 的getTransaction这个方法会控制事务的传播级别(require,requirenew,support。。。)

    	public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
    		Object transaction = doGetTransaction(); //获得现有transaction
    
    		// Cache debug flag to avoid repeated checks.
    		boolean debugEnabled = logger.isDebugEnabled();
    
    		if (definition == null) {
    			// Use defaults if no transaction definition given.
    			definition = new DefaultTransactionDefinition();
    		}
    
    		if (isExistingTransaction(transaction)) {
    			// 如果当前transaction存在就使用handleExistingTransaction,这里是主要控制事务传播机制的地方
    			return handleExistingTransaction(definition, transaction, debugEnabled);
    		}
    
    		// Check definition settings for new transaction.
    		if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
    			throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
    		}
    
    		// 当前无transaction存在,根据事务传播级别进行控制
    		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
    			throw new IllegalTransactionStateException(
    					"No existing transaction found for transaction marked with propagation 'mandatory'");
    		}
    		else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
    				definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
    				definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    			SuspendedResourcesHolder suspendedResources = suspend(null);
    			if (debugEnabled) {
    				logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
    			}
    			try {
    				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    				DefaultTransactionStatus status = newTransactionStatus(
    						definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    				//真正开始事务
    				doBegin(transaction, definition);
    				//将事务状态存入TransactionSynchronizationManager
    				prepareSynchronization(status, definition);
    				return status;
    			}
    			catch (RuntimeException ex) {
    				resume(null, suspendedResources);
    				throw ex;
    			}
    			catch (Error err) {
    				resume(null, suspendedResources);
    				throw err;
    			}
    		}
    		else {
    			// Create "empty" transaction: no actual transaction, but potentially synchronization.
    			if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
    				logger.warn("Custom isolation level specified but no actual transaction initiated; " +
    						"isolation level will effectively be ignored: " + definition);
    			}
    			boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    			return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
    		}
    	}
    
    
    	//获得当前transaction(如果有)
    	protected Object doGetTransaction() {
    		DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    		txObject.setSavepointAllowed(isNestedTransactionAllowed());
    		//从TransactionSynchronizationManager中获得当前线程中的Connection
    		ConnectionHolder conHolder =
    				(ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
    		txObject.setConnectionHolder(conHolder, false);
    		return txObject;
    	}
    	
    	...
    	//开始事务
    	protected void doBegin(Object transaction, TransactionDefinition definition) {
    		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    		Connection con = null;
    
    		try {
    			if (txObject.getConnectionHolder() == null ||
    					txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
    				//从配置中注入的datasource中获得connection
    				Connection newCon = this.dataSource.getConnection();
    				if (logger.isDebugEnabled()) {
    					logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
    				}
    				txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
    			}
    
    			txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
    			con = txObject.getConnectionHolder().getConnection();
    
    			Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
    			txObject.setPreviousIsolationLevel(previousIsolationLevel);
    
    			// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
    			// so we don't want to do it unnecessarily (for example if we've explicitly
    			// configured the connection pool to set it already).
    			if (con.getAutoCommit()) {
    				txObject.setMustRestoreAutoCommit(true);
    				if (logger.isDebugEnabled()) {
    					logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
    				}
    				//设置非自动提交
    				con.setAutoCommit(false);
    			}
    			txObject.getConnectionHolder().setTransactionActive(true);
    
    			int timeout = determineTimeout(definition);
    			if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
    				txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
    			}
    
    			// Bind the session holder to the thread.
    			if (txObject.isNewConnectionHolder()) {
    				//将新建的Connection放入TransactionSynchronizationManager
    				TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
    			}
    		}
    
    		catch (Throwable ex) {
    			if (txObject.isNewConnectionHolder()) {
    				DataSourceUtils.releaseConnection(con, this.dataSource);
    				txObject.setConnectionHolder(null, false);
    			}
    			throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    		}
    	}
    	
    	
    	protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
    		if (status.isNewSynchronization()) {
    			//将transaction的状态保存到TransactionSynchronizationManager
    			TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
    			TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
    					definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
    							definition.getIsolationLevel() : null);
    			TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
    			TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
    			TransactionSynchronizationManager.initSynchronization();
    		}
    	}
    
    public abstract class TransactionSynchronizationManager {
    	//ThreadLocal的resources用来保存TransactionStatus Connection等
    	private static final ThreadLocal<Map<Object, Object>> resources =
    			new NamedThreadLocal<>("Transactional resources");
    	...
    	
    	public static Object getResource(Object key) {
    		Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    		Object value = doGetResource(actualKey);
    		if (value != null && logger.isTraceEnabled()) {
    			logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
    					Thread.currentThread().getName() + "]");
    		}
    		return value;
    	}
    

    可以看到其实Spring只是简单的将获得的连接和事务信息存放到TransactionSynchronizationManager中的ThreadLoacl变量中进行保存,这样就实现了数据库连接及事务的线程安全。

    Commit/Rollback

    在Commit/rollback阶段是使用从一开始getTransaction方法返回的TransactionStatus(其中存放了connection和transaction的信息)作为参数传入commit/rollback进行commit/rollback并在finally中清理TransactionSynchronizationManager

    //commit部分代码
    	private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    		try {
    			boolean beforeCompletionInvoked = false;
    			try {
    				prepareForCommit(status);
    				triggerBeforeCommit(status);
    				triggerBeforeCompletion(status);
    				beforeCompletionInvoked = true;
    				boolean globalRollbackOnly = false;
    				if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
    					globalRollbackOnly = status.isGlobalRollbackOnly();
    				}
    				if (status.hasSavepoint()) {
    					if (status.isDebug()) {
    						logger.debug("Releasing transaction savepoint");
    					}
    					status.releaseHeldSavepoint();
    				}
    				//如果是自己创建的事物就进行提交,如果不是(比如是嵌套的)就由上层提交
    				else if (status.isNewTransaction()) {
    					if (status.isDebug()) {
    						logger.debug("Initiating transaction commit");
    					}
    					doCommit(status);
    				}
    				// Throw UnexpectedRollbackException if we have a global rollback-only
    				// marker but still didn't get a corresponding exception from commit.
    				if (globalRollbackOnly) {
    					throw new UnexpectedRollbackException(
    							"Transaction silently rolled back because it has been marked as rollback-only");
    				}
    		...
    		}
    		finally {
    			//清除当前TransactionSynchronizationManager中transaction状态和释放申请的连接
    			cleanupAfterCompletion(status);
    		}
    	}
    

    小结

    Spring内部维护了TransactionSynchronizationManager一个单例,并使用ThreadLocal变量记录所有连接事务的信息,这样就防止了线程之间事务、连接的共享,从而实现事务的隔离

  • 相关阅读:
    Android开发 使用 adb logcat 显示 Android 日志
    【嵌入式开发】向开发板中烧写Linux系统-型号S3C6410
    C语言 结构体相关 函数 指针 数组
    C语言 命令行参数 函数指针 gdb调试
    C语言 指针数组 多维数组
    Ubuntu 基础操作 基础命令 热键 man手册使用 关机 重启等命令使用
    C语言 内存分配 地址 指针 数组 参数 实例解析
    CRT 环境变量注意事项
    hadoop 输出文件 key val 分隔符
    com.mysql.jdbc.exceptions.MySQLNonTransientConnectionException: Too many connections
  • 原文地址:https://www.cnblogs.com/resentment/p/5744708.html
Copyright © 2011-2022 走看看