zoukankan      html  css  js  c++  java
  • Mybatis数据源结构解析之连接池

    对于 ORM 框架而言,数据源的组织是一个非常重要的一部分,这直接影响到框架的性能问题。本文将通过对 MyBatis 框架的数据源结构进行详尽的分析,找出什么时候创建 Connection ,并且深入解析 MyBatis 的连接池。


    本章的组织结构:

    • 零、什么是连接池和线程池
    • 一、MyBatis 数据源 DataSource 分类
    • 二、数据源 DataSource 的创建过程
    • 三、 DataSource 什么时候创建 Connection 对象
    • 四、不使用连接池的 UnpooledDataSource
    • 五、使用了连接池的 PooledDataSource

    连接池和线程池

    连接池:(降低物理连接损耗)

    • 1、连接池是面向数据库连接的
    • 2、连接池是为了优化数据库连接资源
    • 3、连接池有点类似在客户端做优化

    数据库连接是一项有限的昂贵资源,一个数据库连接对象均对应一个物理数据库连接,每次操作都打开一个物理连接,使用完都关闭连接,这样造成系统的性能低下。 数据库连接池的解决方案是在应用程序启动时建立足够的数据库连接,并将这些连接组成一个连接池,由应用程序动态地对池中的连接进行申请、使用和释放。对于多于连接池中连接数的并发请求,应该在请求队列中排队等待。并且应用程序可以根据池中连接的使用率,动态增加或减少池中的连接数。


    线程池:(降低线程创建销毁损耗)

    • 1、线程池是面向后台程序的
    • 2、线程池是是为了提高内存和CPU效率
    • 3、线程池有点类似于在服务端做优化

    线程池是一次性创建一定数量的线程(应该可以配置初始线程数量的),当用请求过来不用去创建新的线程,直接使用已创建的线程,使用后又放回到线程池中。
    避免了频繁创建线程,及销毁线程的系统开销,提高是内存和CPU效率。

    相同点:

    都是事先准备好资源,避免频繁创建和销毁的代价。

    数据源的分类

    在Mybatis体系中,分为3DataSource

    打开Mybatis源码找到datasource包,可以看到3个子package

    • UNPOOLED 不使用连接池的数据源

    • POOLED 使用连接池的数据源

    • JNDI 使用JNDI实现的数据源

    MyBatis内部分别定义了实现了java.sql.DataSource接口的UnpooledDataSource,PooledDataSource类来表示UNPOOLED、POOLED类型的数据源。 如下图所示:

    • PooledDataSource和UnpooledDataSrouce都实现了java.sql.DataSource接口.
    • PooledDataSource持有一个UnPooledDataSource的引用,当PooledDataSource要创建Connection实例时,实际还是通过UnPooledDataSource来创建的.(PooledDataSource)只是提供一种缓存连接池机制.

    JNDI类型的数据源DataSource,则是通过JNDI上下文中取值。

    数据源 DataSource 的创建过程

    在mybatis的XML配置文件中,使用元素来配置数据源:

    <!-- 配置数据源(连接池) -->
    <dataSource type="POOLED"> //这里 type 属性的取值就是为POOLED、UNPOOLED、JNDI
      <property name="driver" value="${jdbc.driver}"/>
      <property name="url" value="${jdbc.url}"/>
      <property name="username" value="${jdbc.username}"/>
      <property name="password" value="${jdbc.password}"/>
    </dataSource>
    

    MyBatis在初始化时,解析此文件,根据<dataSource>的type属性来创建相应类型的的数据源DataSource,即:

    • type=”POOLED” :创建PooledDataSource实例

    • type=”UNPOOLED” :创建UnpooledDataSource实例

    • type=”JNDI” :从JNDI服务上查找DataSource实例


      Mybatis是通过工厂模式来创建数据源对象的 我们来看看源码:

      public interface DataSourceFactory {
      
      void setProperties(Properties props);
      
      DataSource getDataSource();//生产DataSource
      
      }
      

      上述3种类型的数据源,对应有自己的工厂模式,都实现了这个DataSourceFactory

    MyBatis创建了DataSource实例后,会将其放到Configuration对象内的Environment对象中, 供以后使用。

    注意dataSource 此时只会保存好配置信息.连接池此时并没有创建好连接.只有当程序在调用操作数据库的方法时,才会初始化连接.

    DataSource什么时候创建Connection对象

    我们需要创建SqlSession对象并需要执行SQL语句时,这时候MyBatis才会去调用dataSource对象来创建java.sql.Connection对象。也就是说,java.sql.Connection对象的创建一直延迟到执行SQL语句的时候。

    例子:

      @Test
      public void testMyBatisBuild() throws IOException {
          Reader reader = Resources.getResourceAsReader("mybatis-config.xml");
          SqlSessionFactory factory = new SqlSessionFactoryBuilder().build(reader);
          SqlSession sqlSession = factory.openSession();
          TestMapper mapper = sqlSession.getMapper(TestMapper.class);
          Ttest one = mapper.getOne(1L);//直到这一行,才会去创建一个数据库连接!
          System.out.println(one);
          sqlSession.close();
      }
    

    口说无凭,跟进源码看看他们是在什么时候创建的...

    跟进源码,验证Datasource 和Connection对象创建时机

    验证Datasource创建时机
    • 上面我们已经知道,pooled数据源实际上也是使用的unpooled的实例,那么我们在UnpooledDataSourceFactory的
      getDataSource方法的源码中做一些修改 并运行测试用例:
    @Override
    public DataSource getDataSource() {//此方法是UnpooledDataSourceFactory实现DataSourceFactory复写
      System.out.println("创建了数据源");
      System.out.println(dataSource.toString());
      return dataSource;
    }
    

    结论:在创建完SqlsessionFactory时,DataSource实例就创建了.


    验证Connection创建时机

    首先我们先查出现在数据库的所有连接数,在数据库中执行

    SELECT * FROM performance_schema.hosts;
    

    返回数据: 显示当前连接数为1,总连接数70

    show full processlist; //显示所有的任务列表
    

    返回:当前只有一个查询的连接在运行

    重新启动项目,在运行到需要执行实际的sql操作时,可以看到他已经被代理增强了

    直到此时,连接数还是没有变,说明连接还没有创建,我们接着往下看.

    我们按F7进入方法,可以看到,他被代理,,这时候会执行到之前的代理方法中调用invoke方法.这里有一个判断,但是并不成立,于是进入cachedInvoker(method).invoke()方法代理执行一下操作

    cachedInvoker(method).invoke()方法

      
        @Override
        public Object invoke(Object proxy, Method method, Object[] args, SqlSession sqlSession) throws Throwable {
          return mapperMethod.execute(sqlSession, args);
        }
    

    继续F7进入方法,由于我们是单条查询select 所以会case进入select块中的selectOne

    继续F7

    继续F7

    通过configuration.getMappedStatement获取MappedStatement

    单步步过,F8后,进入executor.query方法

    @Override
    public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
      BoundSql boundSql = ms.getBoundSql(parameterObject);
      CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
      return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
    }
    

    继续走到底,F7进入query方法

    此时,会去缓存中查询,这里的缓存是二级缓存对象 ,生命周期是mapper级别的(一级缓存是一个session级别的),因为我们此时是第一次运行程序,所以肯定为Null,这时候会直接去查询,调用delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql)方法,F7进入这个方法

    二级缓存没有获取到,又去查询了一级缓存,发现一级缓存也没有,这个时候,才去查数据库

    queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);//没有缓存则去db查

    F7进入queryFromDatabase方法.看到是一些对一级缓存的操作,我们主要看doQuery方法F7进入它.

    可以看到它准备了一个空的Statement

    我们F7跟进看一下prepareStatement方法 ,发现他调用了getConnection,哎!有点眼熟了,继续F7进入getConnection()方法,

    又是一个getConnection()....继续F7进入transaction.getConnection()方法

    又是一个getConnection()方法.判断connection是否为空.为空openConnection()否则直接返回connection;我们F7继续跟进openConnection()方法

      protected void openConnection() throws SQLException {
      if (log.isDebugEnabled()) {
        log.debug("Opening JDBC Connection");
      }
      connection = dataSource.getConnection();//最终获取连接的地方在这句.
      if (level != null) {
        connection.setTransactionIsolation(level.getLevel());//设置隔离等级
      }
      setDesiredAutoCommit(autoCommit);//是否自动提交,默认false,update不会提交到数据库,需要手动commit
    }
    

    dataSource.getConnection()执行完,至此一个connection才创建完成.
    我们验证一下 在dataSource.getConnection()时打一下断点.

    此时数据库中的连接数依然没变 还是1

    我们按F8 执行一步

    在控制台可以看到connection = com.mysql.jdbc.JDBC4Connection@1500b2f3 实例创建完毕 我们再去数据库中看看连接数

    两个连接分别是:

    不使用连接池的 UnpooledDataSource

    的type属性被配置成了”UNPOOLED”,MyBatis首先会实例化一个UnpooledDataSourceFactory工厂实例,然后通过.getDataSource()方法返回一个UnpooledDataSource实例对象引用,我们假定为dataSource。
    使用UnpooledDataSource的getConnection(),每调用一次就会产生一个新的Connection实例对象。

    UnPooledDataSource的getConnection()方法实现如下:

    /*
    UnpooledDataSource的getConnection()实现
    */
    public Connection getConnection() throws SQLException
    {
      return doGetConnection(username, password);
    }
    
    private Connection doGetConnection(String username, String password) throws SQLException
    {
      //封装username和password成properties
      Properties props = new Properties();
      if (driverProperties != null)
      {
          props.putAll(driverProperties);
      }
      if (username != null)
      {
          props.setProperty("user", username);
      }
      if (password != null)
      {
          props.setProperty("password", password);
      }
      return doGetConnection(props);
    }
    
    /*
    *  获取数据连接
    */
    private Connection doGetConnection(Properties properties) throws SQLException
    {
      //1.初始化驱动
      initializeDriver();
      //2.从DriverManager中获取连接,获取新的Connection对象
      Connection connection = DriverManager.getConnection(url, properties);
      //3.配置connection属性
      configureConnection(connection);
      return connection;
    }
    

    UnpooledDataSource会做以下几件事情:

      1. 初始化驱动: 判断driver驱动是否已经加载到内存中,如果还没有加载,则会动态地加载driver类,并实例化一个Driver对象,使用DriverManager.registerDriver()方法将其注册到内存中,以供后续使用。
      1. 创建Connection对象: 使用DriverManager.getConnection()方法创建连接。
      1. 配置Connection对象: 设置是否自动提交autoCommit和隔离级别isolationLevel。
      1. 返回Connection对象。

    我们每调用一次getConnection()方法,都会通过DriverManager.getConnection()返回新的java.sql.Connection实例,这样当然对于资源是一种浪费,为了防止重复的去创建和销毁连接,于是引入了连接池的概念.

    使用了连接池的 PooledDataSource

    要理解连接池,首先要了解它对于connection的容器,它使用PoolState容器来管理所有的conncetion


    在PoolState中,它将connection分为两种状态,空闲状态(idle)活动状态(active),他们分别被存储到PoolState容器内的idleConnectionsactiveConnections两个ArrayList中

    • idleConnections:空闲(idle)状态PooledConnection对象被放置到此集合中,表示当前闲置的没有被使用的PooledConnection集合,调用PooledDataSource的getConnection()方法时,会优先从此集合中取PooledConnection对象。当用完一个java.sql.Connection对象时,MyBatis会将其包裹成PooledConnection对象放到此集合中。

    • activeConnections:活动(active)状态的PooledConnection对象被放置到名为activeConnections的ArrayList中,表示当前正在被使用的PooledConnection集合,调用PooledDataSource的getConnection()方法时,会优先从idleConnections集合中取PooledConnection对象,如果没有,则看此集合是否已满,如果未满,PooledDataSource会创建出一个PooledConnection,添加到此集合中,并返回。

    从连接池中获取一个连接对象的过程

    下面让我们看一下PooledDataSource 的popConnection方法获取Connection对象的实现:

    private PooledConnection popConnection(String username, String password) throws SQLException {
        boolean countedWait = false;
        PooledConnection conn = null;
        long t = System.currentTimeMillis();
        int localBadConnectionCount = 0;
    
        while (conn == null) {
          synchronized (state) {//给state对象加锁
            if (!state.idleConnections.isEmpty()) {//如果空闲列表不空,就从空闲列表中拿connection
              // Pool has available connection
              conn = state.idleConnections.remove(0);//拿出空闲列表中的第一个,去验证连接是否还有效
              if (log.isDebugEnabled()) {
                log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
              }
            } else {
              // 空闲连接池中没有可用的连接,就来看看活跃连接列表中是否有..先判断活动连接总数 是否小于 最大可用的活动连接数
              if (state.activeConnections.size() < poolMaximumActiveConnections) {
                // 如果连接数小于list.size 直接创建新的连接.
                conn = new PooledConnection(dataSource.getConnection(), this);
                if (log.isDebugEnabled()) {
                  log.debug("Created connection " + conn.getRealHashCode() + ".");
                }
              } else {
                // 此时连接数也满了,不能创建新的连接. 找到最老的那个,检查它是否过期
                //计算它的校验时间,如果校验时间大于连接池规定的最大校验时间,则认为它已经过期了
                // 利用这个PoolConnection内部的realConnection重新生成一个PooledConnection
                PooledConnection oldestActiveConnection = state.activeConnections.get(0);
                long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
                if (longestCheckoutTime > poolMaximumCheckoutTime) {
                  // 可以要求过期这个连接.
                  state.claimedOverdueConnectionCount++;
                  state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
                  state.accumulatedCheckoutTime += longestCheckoutTime;
                  state.activeConnections.remove(oldestActiveConnection);
                  if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                    try {
                      oldestActiveConnection.getRealConnection().rollback();
                    } catch (SQLException e) {
                      /*
                         Just log a message for debug and continue to execute the following
                         statement like nothing happened.
                         Wrap the bad connection with a new PooledConnection, this will help
                         to not interrupt current executing thread and give current thread a
                         chance to join the next competition for another valid/good database
                         connection. At the end of this loop, bad {@link @conn} will be set as null.
                       */
                      log.debug("Bad connection. Could not roll back");
                    }
                  }
                  conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
                  conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
                  conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
                  oldestActiveConnection.invalidate();
                  if (log.isDebugEnabled()) {
                    log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
                  }
                } else {
                  //如果不能释放,则必须等待
                  // Must wait
                  try {
                    if (!countedWait) {
                      state.hadToWaitCount++;
                      countedWait = true;
                    }
                    if (log.isDebugEnabled()) {
                      log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
                    }
                    long wt = System.currentTimeMillis();
                    state.wait(poolTimeToWait);
                    state.accumulatedWaitTime += System.currentTimeMillis() - wt;
                  } catch (InterruptedException e) {
                    break;
                  }
                }
              }
            }
            if (conn != null) {
              // ping to server and check the connection is valid or not
              if (conn.isValid()) {//去验证连接是否还有效.
                if (!conn.getRealConnection().getAutoCommit()) {
                  conn.getRealConnection().rollback();
                }
                conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
                conn.setCheckoutTimestamp(System.currentTimeMillis());
                conn.setLastUsedTimestamp(System.currentTimeMillis());
                state.activeConnections.add(conn);
                state.requestCount++;
                state.accumulatedRequestTime += System.currentTimeMillis() - t;
              } else {
                if (log.isDebugEnabled()) {
                  log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
                }
                state.badConnectionCount++;
                localBadConnectionCount++;
                conn = null;
                if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
                  if (log.isDebugEnabled()) {
                    log.debug("PooledDataSource: Could not get a good connection to the database.");
                  }
                  throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
                }
              }
            }
          }
    
        }
    
        if (conn == null) {
          if (log.isDebugEnabled()) {
            log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
          }
          throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
        }
    
        return conn;
      }
    

    如上所示,对于PooledDataSource的getConnection()方法内,先是调用类PooledDataSource的popConnection()方法返回了一个PooledConnection对象,然后调用了PooledConnection的getProxyConnection()来返回Connection对象。

    复用连接的过程

    如果我们使用了连接池,我们在用完了Connection对象时,需要将它放在连接池中,该怎样做呢? 如果让我们来想的话,应该是通过代理Connection对象,在调用close时,并不真正关闭,而是丢到管理连接的容器中去. 要验证这个想法 那么 来看看Mybatis帮我们怎么实现复用连接的.

    class PooledConnection implements InvocationHandler {
    
      //......
      //所创建它的datasource引用
      private PooledDataSource dataSource;
      //真正的Connection对象
      private Connection realConnection;
      //代理自己的代理Connection
      private Connection proxyConnection;
    
      //......
    }
    

    PooledConenction实现了InvocationHandler接口,并且,proxyConnection对象也是根据这个它来生成的代理对象:

    public PooledConnection(Connection connection, PooledDataSource dataSource) {
        this.hashCode = connection.hashCode();
        this.realConnection = connection;//真实连接
        this.dataSource = dataSource;
        this.createdTimestamp = System.currentTimeMillis();
        this.lastUsedTimestamp = System.currentTimeMillis();
        this.valid = true;
        this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
      }
    

    实际上,我们调用PooledDataSource的getConnection()方法返回的就是这个proxyConnection对象。
    当我们调用此proxyConnection对象上的任何方法时,都会调用PooledConnection对象内invoke()方法。
    让我们看一下PooledConnection类中的invoke()方法定义:

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        //当调用关闭的时候,回收此Connection到PooledDataSource中
        if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
          dataSource.pushConnection(this);
          return null;
        } else {
          try {
            if (!Object.class.equals(method.getDeclaringClass())) {
              checkConnection();
            }
            return method.invoke(realConnection, args);
          } catch (Throwable t) {
            throw ExceptionUtil.unwrapThrowable(t);
          }
        }
      }
    

    结论:当我们使用了pooledDataSource.getConnection()返回的Connection对象的close()方法时,不会调用真正Connection的close()方法,而是将此Connection对象放到连接池中。调用dataSource.pushConnection(this)实现

    protected void pushConnection(PooledConnection conn) throws SQLException {
    
        synchronized (state) {
          state.activeConnections.remove(conn);
          if (conn.isValid()) {
            if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
              state.accumulatedCheckoutTime += conn.getCheckoutTime();
              if (!conn.getRealConnection().getAutoCommit()) {
                conn.getRealConnection().rollback();
              }
              PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
              state.idleConnections.add(newConn);
              newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
              newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
              conn.invalidate();
              if (log.isDebugEnabled()) {
                log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
              }
              state.notifyAll();
            } else {
              state.accumulatedCheckoutTime += conn.getCheckoutTime();
              if (!conn.getRealConnection().getAutoCommit()) {
                conn.getRealConnection().rollback();
              }
              conn.getRealConnection().close();
              if (log.isDebugEnabled()) {
                log.debug("Closed connection " + conn.getRealHashCode() + ".");
              }
              conn.invalidate();
            }
          } else {
            if (log.isDebugEnabled()) {
              log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
            }
            state.badConnectionCount++;
          }
        }
      }
    

    关注公众号:java宝典
    a

  • 相关阅读:
    Docker-CentOS系统安装Docker
    Docker-准备Docker环境
    Docker系列-文章汇总
    商品订单库存一致性问题的思考
    java模板、工厂设计模式在项目中的重构
    2018Java年底总结
    java的AQS中enp没有同步代码块为啥是原子操作
    java使用awt包在生产环境docker部署时出现中文乱码的处理
    初探装饰器模式
    开灯问题
  • 原文地址:https://www.cnblogs.com/java-bible/p/14066834.html
Copyright © 2011-2022 走看看