zoukankan      html  css  js  c++  java
  • spring中的mybatis的sqlSession是如何做到线程隔离的?

      项目中常常使用mybatis配合spring进行数据库操作,但是我们知道,数据的操作是要求做到线程安全的,而且按照原来的jdbc的使用方式,每次操作完成之后都要将连接关闭,但是实际使用中我们并没有这么干。

      更让人疑惑的点是,spring中默认使用单例形式来加载bean,而往往我们也不会改变这种默认,所以,是所有线程共享数据连接?

      让我们来看看真相!

    自然是要个栗子的:

    我们来看下spring中配置mybatis数据库操作bean(使用 druid 连接池):

        <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource">
            <property name="url" value="${jdbc.url}" />
            <property name="driverClassName" value="${jdbc.driver}" />
            <property name="username" value="${jdbc.username}" />
            <property name="password" value="${jdbc.password}" />
        </bean>
        
        <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
            <property name="dataSource" ref="dataSource" />
            <property name="configLocation" value="classpath:mybatis-config.xml" />
        </bean>
        
        <!-- scope="prototype" 另说,另讨论,我们先以mapper形式看一下 -->
        <bean id="sqlSession" class="org.mybatis.spring.SqlSessionTemplate">
            <constructor-arg index="0" ref="sqlSessionFactory" />
        </bean>
        
        <!-- 事务 -->
        <bean name="transactionManager"
              class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
            <property name="dataSource" ref="dataSource"></property>
        </bean>

      而在java代码中使用则是使用依赖注入直接使用 @resource sqlSession, 如下:

        @Resource
        private SqlSessionTemplate sqlSession;
        
        @Override
        public User getUser(Map<String, String> cond) {
            // 此句执行db查询
            User result = sqlSession.selectOne(NAME_SPACE
                    + ".getUser", cond);
            return result;
        }

      这个sqlSession就是直接去操作数据库了看起来是这样,是在bean初始化的时候依赖注入的!

      所以,难道每次进入该操作的时候,sqlSession 的实例都会变化吗?答案是否定的。

      那么,肯定就是往下使用的时候才发生的变化呗!

    再往下走,可以看到,调用了一个代理来进行具体的查询!

      // org/mybatis/spring/SqlSessionTemplate.selectOne()
      public <T> T selectOne(String statement, Object parameter) {
        return this.sqlSessionProxy.<T> selectOne(statement, parameter);
      }

      为啥要用代理呢?自己直接查不就行了吗?其实,用代理是有好处的,那就可以可以进行另外的包装!

      代理是怎么生成的呢?其实只要看一下 SqlSessionTemplate 的构造方法就知道了!

      /**
       * Constructs a Spring managed {@code SqlSession} with the given
       * {@code SqlSessionFactory} and {@code ExecutorType}.
       * A custom {@code SQLExceptionTranslator} can be provided as an
       * argument so any {@code PersistenceException} thrown by MyBatis
       * can be custom translated to a {@code RuntimeException}
       * The {@code SQLExceptionTranslator} can also be null and thus no
       * exception translation will be done and MyBatis exceptions will be
       * thrown
       *
       * @param sqlSessionFactory
       * @param executorType
       * @param exceptionTranslator
       */
      public SqlSessionTemplate(SqlSessionFactory sqlSessionFactory, ExecutorType executorType,
          PersistenceExceptionTranslator exceptionTranslator) {
    
        notNull(sqlSessionFactory, "Property 'sqlSessionFactory' is required");
        notNull(executorType, "Property 'executorType' is required");
    
        this.sqlSessionFactory = sqlSessionFactory;
        this.executorType = executorType;
        this.exceptionTranslator = exceptionTranslator;
        // 生成代理 SqlSessionInterceptor 为 InvocationHandler
        this.sqlSessionProxy = (SqlSession) newProxyInstance(
            SqlSessionFactory.class.getClassLoader(),
            new Class[] { SqlSession.class },
            new SqlSessionInterceptor());
      }

      从上面的代码,看不到细节,但是,大致还是知道代理的具体实现了!即使用 SqlSessionInterceptor 去处理具体查询逻辑!

    我们来看下 SqlSessionInterceptor  的实现! 

      /**
       * Proxy needed to route MyBatis method calls to the proper SqlSession got
       * from Spring's Transaction Manager
       * It also unwraps exceptions thrown by {@code Method#invoke(Object, Object...)} to
       * pass a {@code PersistenceException} to the {@code PersistenceExceptionTranslator}.
       */
      private class SqlSessionInterceptor implements InvocationHandler {
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
          SqlSession sqlSession = getSqlSession(
              SqlSessionTemplate.this.sqlSessionFactory,
              SqlSessionTemplate.this.executorType,
              SqlSessionTemplate.this.exceptionTranslator);
          try {
            Object result = method.invoke(sqlSession, args);
            if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
              // force commit even on non-dirty sessions because some databases require
              // a commit/rollback before calling close()
              sqlSession.commit(true);
            }
            return result;
          } catch (Throwable t) {
            Throwable unwrapped = unwrapThrowable(t);
            if (SqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) {
              // release the connection to avoid a deadlock if the translator is no loaded. See issue #22
              closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
              sqlSession = null;
              Throwable translated = SqlSessionTemplate.this.exceptionTranslator.translateExceptionIfPossible((PersistenceException) unwrapped);
              if (translated != null) {
                unwrapped = translated;
              }
            }
            throw unwrapped;
          } finally {
            if (sqlSession != null) {
              closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
            }
          }
        }
      }

      SqlSessionInterceptor 是 SqlSessionTemplate 的内部类,目的只有一个,就是处理多个 session 的db操作!

      所有请求都被 invoke() 拦截,从而做相应处理:

        1. 进入请求,先生成一个新的sqlSession,为本次db操作做准备;

        2. 通过反射调用请求进来的方法,将 sqlSession 回调,进行复杂查询及结果映射;

        3. 如果需要立即提交事务,do it;

        4. 如果出现异常,包装异常信息,重新抛出;

        5. 操作完成后,关闭本次session;

    到这里,其实我们好像已经明白了,其实外面的 sqlSession 单例,并不会影响具体的db操作控制,所以不用担心session的线程安全问题!

      不过,还有个点值得考虑下,如果我一次请求里有多次数据库操作,难道我真的要创建多个sqlSession或者说数据库连接?不会吧!

      如果这个问题得不到解决,可能你并不真正了解session的定义了!

    所以我们需要继续看一下 session 到底是怎么获取的?

      getSqlSession() 方法是在 SqlSessionUtils 中实现的!如下:

      /**
       * Gets an SqlSession from Spring Transaction Manager or creates a new one if needed.
       * Tries to get a SqlSession out of current transaction. If there is not any, it creates a new one.
       * Then, it synchronizes the SqlSession with the transaction if Spring TX is active and
       * <code>SpringManagedTransactionFactory</code> is configured as a transaction manager.
       *
       * @param sessionFactory a MyBatis {@code SqlSessionFactory} to create new sessions
       * @param executorType The executor type of the SqlSession to create
       * @param exceptionTranslator Optional. Translates SqlSession.commit() exceptions to Spring exceptions.
       * @throws TransientDataAccessResourceException if a transaction is active and the
       *             {@code SqlSessionFactory} is not using a {@code SpringManagedTransactionFactory}
       * @see SpringManagedTransactionFactory
       */
      public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) {
    
        notNull(sessionFactory, "No SqlSessionFactory specified");
        notNull(executorType, "No ExecutorType specified");
    
        SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);
    
        // 如果已经有holder,则直接返回,复用连接
        if (holder != null && holder.isSynchronizedWithTransaction()) {
          if (holder.getExecutorType() != executorType) {
            throw new TransientDataAccessResourceException("Cannot change the ExecutorType when there is an existing transaction");
          }
    
          holder.requested();
    
          if (logger.isDebugEnabled()) {
            logger.debug("Fetched SqlSession [" + holder.getSqlSession() + "] from current transaction");
          }
    
          return holder.getSqlSession();
        }
    
        if (logger.isDebugEnabled()) {
          logger.debug("Creating a new SqlSession");
        }
    
        SqlSession session = sessionFactory.openSession(executorType);
    
        // Register session holder if synchronization is active (i.e. a Spring TX is active)
        //
        // Note: The DataSource used by the Environment should be synchronized with the
        // transaction either through DataSourceTxMgr or another tx synchronization.
        // Further assume that if an exception is thrown, whatever started the transaction will
        // handle closing / rolling back the Connection associated with the SqlSession.
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
          Environment environment = sessionFactory.getConfiguration().getEnvironment();
    
          if (environment.getTransactionFactory() instanceof SpringManagedTransactionFactory) {
            if (logger.isDebugEnabled()) {
              logger.debug("Registering transaction synchronization for SqlSession [" + session + "]");
            }
    
            holder = new SqlSessionHolder(session, executorType, exceptionTranslator);
            TransactionSynchronizationManager.bindResource(sessionFactory, holder);
            TransactionSynchronizationManager.registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory));
            holder.setSynchronizedWithTransaction(true);
            holder.requested();
          } else {
            if (TransactionSynchronizationManager.getResource(environment.getDataSource()) == null) {
              if (logger.isDebugEnabled()) {
                logger.debug("SqlSession [" + session + "] was not registered for synchronization because DataSource is not transactional");
              }
            } else {
              throw new TransientDataAccessResourceException(
                  "SqlSessionFactory must be using a SpringManagedTransactionFactory in order to use Spring transaction synchronization");
            }
          }
        } else {
          if (logger.isDebugEnabled()) {
            logger.debug("SqlSession [" + session + "] was not registered for synchronization because synchronization is not active");
          }
        }
    
        return session;
      }

    如上获取 sqlSession 逻辑,主要分两种情况!

      1. 如果存在holder,则返回原有的sqlSession,到于这个holder我们稍后再说;
      2. 如果没有,则创建一个新连接!

      所以,看起来情况还不是太糟,至少有复用的概念了!

    那么问题来了,复用?如何做到线程安全?所以我们要看下 SqlSessionHolder 的实现了!

      获取holder是通过 TransactionSynchronizationManager.getResource(sessionFactory); 获取的:

        public static Object getResource(Object key) {
            Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
            // 实际获取
            Object value = doGetResource(actualKey);
            if (value != null && logger.isTraceEnabled()) {
                logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
                        Thread.currentThread().getName() + "]");
            }
            return value;
        }
        
        private static Object doGetResource(Object actualKey) {
            Map<Object, Object> map = resources.get();
            if (map == null) {
                return null;
            }
            Object value = map.get(actualKey);
            // Transparently remove ResourceHolder that was marked as void...
            if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
                map.remove(actualKey);
                // Remove entire ThreadLocal if empty...
                if (map.isEmpty()) {
                    resources.remove();
                }
                value = null;
            }
            return value;
        }

      咱们忽略对 key 的处理,实际是直接调用 doGetResource() 获取holder.
      而 doGetResource() 中,则使用了 resources 来保存具体的 kv。 resources 明显是个共享变量,但是看起来这里没有任何的加锁操作!这是为何?
      只要看一下 resources 的定义就知道了,其实现为 ThreadLocal, 所以是线程安全了!

        private static final ThreadLocal<Map<Object, Object>> resources =
                new NamedThreadLocal<Map<Object, Object>>("Transactional resources");

      在新的请求进来时,自然是没有值的,所以直接返回null.而后续进入,则获取缓存返回!

    而对于没有获取到 holder 的情况,则需要重新创建一个 session 了!

      这里获取session由DefaultSqlSessionFactory 进行创建!如下:

      // org.apache.ibatis.session.defaults.DefaultSqlSessionFactory.openSession()
      public SqlSession openSession(ExecutorType execType) {
        return openSessionFromDataSource(execType, null, false);
      }
      
      private SqlSession openSessionFromDataSource(ExecutorType execType, TransactionIsolationLevel level, boolean autoCommit) {
        Transaction tx = null;
        try {
          final Environment environment = configuration.getEnvironment();
          // SpringManagedTransactionFactory
          final TransactionFactory transactionFactory = getTransactionFactoryFromEnvironment(environment);
          tx = transactionFactory.newTransaction(environment.getDataSource(), level, autoCommit);
          final Executor executor = configuration.newExecutor(tx, execType);
          return new DefaultSqlSession(configuration, executor, autoCommit);
        } catch (Exception e) {
          closeTransaction(tx); // may have fetched a connection so lets call close()
          throw ExceptionFactory.wrapException("Error opening session.  Cause: " + e, e);
        } finally {
          ErrorContext.instance().reset();
        }
      }

    创建 session 几件事:
      1. 根据环境配置,开启一个新事务,该事务管理器会负责后续jdbc连接管理工作;
      2. 根据事务创建一个 Executor,备用;
      3. 用DefaultSqlSession 将 executor 包装后返回,用于后续真正的db操作;

    至此,真正的 sqlSession 已经创建成功!返回后,就可以真正使用了!

    等等,创建的session好像并没有保存,那么还是那个问题,每个sql都会创建一个 sqlSession ? 好吧,是这样的!前面的holder,只是用于存在事务操作的连接!(holder的理解出了偏差哦)

      但是有一点,这里虽然创建了多个 sqlSession 实例,但是并不意味着有多个db连接,具体使用db连接时,则一般会会使用连接池来进行优化!如前面提到的 druid 就是个不错的选择!

    真实的jdbc连接获取,是在进行真正的 query 时,才进行调用 getConnection() 进行接入!

    具体则是在 doQuery() 时,进行st的组装时调用的 ,如下:

        // SimpleExecutor.prepareStatement()
      private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
        Statement stmt;
        // 获取 jdbc 连接,返回 java.sql.Connection
        Connection connection = getConnection(statementLog);
        stmt = handler.prepare(connection);
        handler.parameterize(stmt);
        return stmt;
      }
      
      // 调用 BaseExecutor.getConnection()
      protected Connection getConnection(Log statementLog) throws SQLException {
        // SpringManagedTransaction 管理 connection
        Connection connection = transaction.getConnection();
        if (statementLog.isDebugEnabled()) {
          return ConnectionLogger.newInstance(connection, statementLog, queryStack);
        } else {
          return connection;
        }
      }

      通过前面通过事务管理工厂创建的 SpringManagedTransaction 进行 connection 获取!一个事务管理器只会存在一次获取数据库连接的操作!

      public Connection getConnection() throws SQLException {
        if (this.connection == null) {
          openConnection();
        }
        return this.connection;
      }
      
      // 而 SpringManagedTransaction 又将connection交由 DataSourceUtils 进行管理!
      // org/springframework/jdbc/datasource/DataSourceUtils
        public static Connection getConnection(DataSource dataSource) throws CannotGetJdbcConnectionException {
            try {
                // 真正的连接获取
                return doGetConnection(dataSource);
            }
            catch (SQLException ex) {
                throw new CannotGetJdbcConnectionException("Could not get JDBC Connection", ex);
            }
        }
    
        /**
         * Actually obtain a JDBC Connection from the given DataSource.
         * Same as {@link #getConnection}, but throwing the original SQLException.
         * <p>Is aware of a corresponding Connection bound to the current thread, for example
         * when using {@link DataSourceTransactionManager}. Will bind a Connection to the thread
         * if transaction synchronization is active (e.g. if in a JTA transaction).
         * <p>Directly accessed by {@link TransactionAwareDataSourceProxy}.
         * @param dataSource the DataSource to obtain Connections from
         * @return a JDBC Connection from the given DataSource
         * @throws SQLException if thrown by JDBC methods
         * @see #doReleaseConnection
         */
        public static Connection doGetConnection(DataSource dataSource) throws SQLException {
            Assert.notNull(dataSource, "No DataSource specified");
    
            ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
            if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
                conHolder.requested();
                if (!conHolder.hasConnection()) {
                    logger.debug("Fetching resumed JDBC Connection from DataSource");
                    conHolder.setConnection(dataSource.getConnection());
                }
                return conHolder.getConnection();
            }
            // Else we either got no holder or an empty thread-bound holder here.
    
            logger.debug("Fetching JDBC Connection from DataSource");
            // 通过接入的dataSource进行连接获取,这里将会是最终的jdbc连接
            Connection con = dataSource.getConnection();
    
            if (TransactionSynchronizationManager.isSynchronizationActive()) {
                logger.debug("Registering transaction synchronization for JDBC Connection");
                // Use same Connection for further JDBC actions within the transaction.
                // Thread-bound object will get removed by synchronization at transaction completion.
                ConnectionHolder holderToUse = conHolder;
                if (holderToUse == null) {
                    holderToUse = new ConnectionHolder(con);
                }
                else {
                    holderToUse.setConnection(con);
                }
                holderToUse.requested();
                TransactionSynchronizationManager.registerSynchronization(
                        new ConnectionSynchronization(holderToUse, dataSource));
                holderToUse.setSynchronizedWithTransaction(true);
                if (holderToUse != conHolder) {
                    TransactionSynchronizationManager.bindResource(dataSource, holderToUse);
                }
            }
    
            return con;
        }

    上面的实现主要做三件事:
      1. 再次确认,是否存在事务处理,holder是否存在,如果有则复用;
      2. 如果没有,那再从数据源处获取连接;
      3. 获取新连接成功后,检查如果存在事务,则将新获取的连接放入holder中保存起来,以备下次使用;

      获取jdbc连接后,就可以真正发起execute()查询了。

      数据库连接的疑问算是解答了!我们发现,外部的框架并没有多少为我们节省db连接的动作!而是把最终 getConnection() 交给 datasource 数据源!

      而真正解决我们连接复用的问题的,是像 Druid 这样的连接池组件!所以,咱们可以单独来看这些中间件了!

  • 相关阅读:
    【git hub使用】
    【struct2 第一天】
    【JSP基础 第一天】
    【Java基础学习 day01】
    网站建设 【Django】 【MTV】
    Python-Json字符串和XML解析
    Python-冒泡和快排
    Python-面向对象编程
    练习-字符串编码
    练习-统计文件中单词数量
  • 原文地址:https://www.cnblogs.com/yougewe/p/10072740.html
Copyright © 2011-2022 走看看