zoukankan      html  css  js  c++  java
  • mybatis——datasource

    本来准备看下DruidDatasource的,发现太复杂了,还是研究一下mybatis的datasource吧,目的:

    • 弄清datasource的基本原理
    • 简单的调优

    mybatis的datasource实现:无连接池实现的UnpooledDataSource,连接池实现的PooledDataSource

     一、DataSource

    DataSource是JDK中提供数据库库连接的顶级接口。DataSource类图

     主要实现:

    getConnection():获取一个数据库连接。

    getLogWriter():日志

    setLoginTimeout:数据源在尝试连接数据库时的最大等待时间

    wrapper:包装器,例如BeanWrapper包装Bean,允许实例包装datasource。

    二、UnpooledDataSource

     UnpooledDataSource是一个无连接池实现的DataSource,JDBC的简单封装,开启一个连接,执行完关闭连接。

    1、变量与构造方法

      private ClassLoader driverClassLoader;//Driver的类加载器
      private Properties driverProperties;//启动属性
      private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<String, Driver>();//驱动程序容器
    
      private String driver;//数据库连接驱动程序
      private String url;//数据库连接url
      private String username;//用户名
      private String password;//密码
    
      private Boolean autoCommit;//自动提交
      private Integer defaultTransactionIsolationLevel;//默认事务隔离级别
    
      static {
        //类加载时,扫描DriverMananger中所有的数据库连接驱动程序
        Enumeration<Driver> drivers = DriverManager.getDrivers();
        while (drivers.hasMoreElements()) {
          Driver driver = drivers.nextElement();
          registeredDrivers.put(driver.getClass().getName(), driver);
        }
      }
    
      public UnpooledDataSource() {
      }
    
      public UnpooledDataSource(String driver, String url, String username, String password) {
        this.driver = driver;
        this.url = url;
        this.username = username;
        this.password = password;
      }
    
      public UnpooledDataSource(String driver, String url, Properties driverProperties) {
        this.driver = driver;
        this.url = url;
        this.driverProperties = driverProperties;
      }
    
      public UnpooledDataSource(ClassLoader driverClassLoader, String driver, String url, String username, String password) {
        this.driverClassLoader = driverClassLoader;
        this.driver = driver;
        this.url = url;
        this.username = username;
        this.password = password;
      }
    
      public UnpooledDataSource(ClassLoader driverClassLoader, String driver, String url, Properties driverProperties) {
        this.driverClassLoader = driverClassLoader;
        this.driver = driver;
        this.url = url;
        this.driverProperties = driverProperties;
      }

    2、主要实现方法

    getConnection():获取数据库连接,JDBC的封装使用。

    /* org.apache.ibatis.datasource.unpooled.UnpooledDataSource*/
      public Connection getConnection() throws SQLException {
        return doGetConnection(username, password);
      }
    
      public Connection getConnection(String username, String password) throws SQLException {
        return doGetConnection(username, password);
      }
    
      private Connection doGetConnection(String username, String password) throws SQLException {
        Properties props = new Properties();
        if (driverProperties != null) {
          //设置driverProperties文件到属性对象中
          props.putAll(driverProperties);
        }
        if (username != null) {
          //设置username到属性对象中,注意显式配置username会覆盖driverPropertie文件中的
          props.setProperty("user", username);
        }
        if (password != null) {
          //设置password到属性对象中中,注意显式配置password会覆盖driverPropertie文件中的
          props.setProperty("password", password);
        }
        return doGetConnection(props);
      }
    
      private Connection doGetConnection(Properties properties) throws SQLException {
        //启动驱动程序
        //若在registerDrivers中(已启动),直接返回
        //若不在registerDrivers中,使用driverClassLoader(不为空)加载driver类,启动驱动程序
       //若driverClassLoader为空,所有的类加载器,遍历加载一次,直到能加载为止
        initializeDriver();
        //JDBC获取连接的语句
        Connection connection = DriverManager.getConnection(url, properties);
        //设置连接属性
        //自动提交autocommit
        //默认事务隔离级别:defaultTransactionIsolationLevel
        configureConnection(connection);
        return connection;
      }

    3、总结

    UnpooledDataSource就是JDBC的简单封装,没有其他逻辑。

    ① 类似工厂方法提供一个连接方法,getConnection().

    ② 启动数据库连接驱动程序driver,类加载器不是我以为的线程上下文类加载器,而是遍历所有的类加载器,直到找到合适的类加载器加载成功(cathc异常)。

    三、PooledDataSource

    PooledDataSource是数据库连接的连接池实现。前面研究过线程池ThreadPoolExecutor实现,多个线程+队列。这里猜测应该也是用的实现差不多,使用队列实现,不同在于这里队列里的元素时connection。

    1、变量与构造方法

      private final PoolState state = new PoolState(this);//连接池中连接存放的容器(两个队列)
    
      private final UnpooledDataSource dataSource;//装饰者模式,PooledDataSource是一个增强的UnpooledDataSource
    
      // OPTIONAL CONFIGURATION FIELDS
      protected int poolMaximumActiveConnections = 10;//连接池中最大活跃连接数
      protected int poolMaximumIdleConnections = 5;//连接池中最大空闲连接数
      protected int poolMaximumCheckoutTime = 20000;//连接池中活跃连接的超时时间
      protected int poolTimeToWait = 20000;//连接池满时,获取连接的等待时间
      protected int poolMaximumLocalBadConnectionTolerance = 3;//连接池中坏连接容忍数
      protected String poolPingQuery = "NO PING QUERY SET";//获取连接后,ping数据库连接检测时执行的sql,(开启检测后需要设置,否则检测无效)
      protected boolean poolPingEnabled;//启用ping数据库连接检测的标识(实际使用经常是关闭的),获取连接后,ping数据库校验连接有效
      protected int poolPingConnectionsNotUsedFor;//一个时间参数,获取连接后,连接空闲时间大于此值,才需要进行ping数据库连接检测
    
      private int expectedConnectionTypeCode;//连接池属性集合的hashCode,("" + url + username + password).hashCode();
    
      public PooledDataSource() {
        dataSource = new UnpooledDataSource();
      }
    
      public PooledDataSource(UnpooledDataSource dataSource) {
        this.dataSource = dataSource;
      }
    
      public PooledDataSource(String driver, String url, String username, String password) {
        dataSource = new UnpooledDataSource(driver, url, username, password);
        expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
      }
    
      public PooledDataSource(String driver, String url, Properties driverProperties) {
        dataSource = new UnpooledDataSource(driver, url, driverProperties);
        expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
      }
    
      public PooledDataSource(ClassLoader driverClassLoader, String driver, String url, String username, String password) {
        dataSource = new UnpooledDataSource(driverClassLoader, driver, url, username, password);
        expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
      }
    
      public PooledDataSource(ClassLoader driverClassLoader, String driver, String url, Properties driverProperties) {
        dataSource = new UnpooledDataSource(driverClassLoader, driver, url, driverProperties);
        expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
      }

    2、主要实现

    ① 连接池容器PoolState

    PoolState是实际的连接池容器,里面还记录了连接池的一些属性,主要看看属性变量,部分属性是连接池调优的参考参数。

      protected PooledDataSource dataSource;//记录实际的数据源
      protected final List<PooledConnection> idleConnections = new ArrayList<PooledConnection>();//空闲连接队列
      protected final List<PooledConnection> activeConnections = new ArrayList<PooledConnection>();//活动连接队列
      protected long requestCount = 0;//线程池接受连接请求总数
      protected long accumulatedRequestTime = 0;//线程池接受连接请求的总响应时间
      protected long accumulatedCheckoutTime = 0;//线程池中连接的总连接时间(包括已关闭的连接)
      protected long claimedOverdueConnectionCount = 0;//线程池中逾期连接数
      protected long accumulatedCheckoutTimeOfOverdueConnections = 0;//逾期连接的总连接时间
      protected long accumulatedWaitTime = 0;//线程池中获取连接总等待时间(空闲队列为空,活动队列满)
      protected long hadToWaitCount = 0;//线程池中获取连接总等待次数
      protected long badConnectionCount = 0;//线程池中的坏连接数

    ② 连接对象的代理PooledConnection

    PooledConnection实现了InvokeHandler,JDK动态代理的实现

    主要看看PooledConnection属性与invoke方法,属性

      private static final String CLOSE = "close";
      private static final Class<?>[] IFACES = new Class<?>[] { Connection.class };
      private final int hashCode; //连接的hashCode
      private final PooledDataSource dataSource; //连接的数据源
      private final Connection realConnection;//真实连接,target object
      private final Connection proxyConnection;//代理连接 proxy object
      private long checkoutTimestamp; // 连接连接时长
      private long createdTimestamp; // 连接的创建时间
      private long lastUsedTimestamp; //  连接的最后使用时间
      private int connectionTypeCode; // 连接数据源属性的hashCode (url+username+password).hashCode
      private boolean valid; //连接有效标识
    
      public PooledConnection(Connection connection, PooledDataSource dataSource) {
        this.hashCode = connection.hashCode();
        this.realConnection = connection;
        this.dataSource = dataSource;
        this.createdTimestamp = System.currentTimeMillis();
        this.lastUsedTimestamp = System.currentTimeMillis();
        this.valid = true;
        //创建代理对象
        this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
      }

    invoke():连接复用的实现之一pushConnection(),执行的时机。

      public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
          //调用connection.close()时,不是直接调用真实连接target object的close
          //而是通过proxy object调用PooledDataSource.pushConnection(target object)
          dataSource.pushConnection(this);
          return null;
        } else {
          try {
            if (!Object.class.equals(method.getDeclaringClass())) {
              //非Object方法的其他方法,先检查valid(有效标识),无效抛出异常
              checkConnection();
            }
            //调用target object的方法
            return method.invoke(realConnection, args);
          } catch (Throwable t) {
            throw ExceptionUtil.unwrapThrowable(t);
          }
        }
      }

    ③ 连接池连接复用实现 :队列实现

    出队:popConnection():返回一个连接

      public Connection getConnection(String username, String password) throws SQLException {
        return popConnection(username, password).getProxyConnection();
      }
    
      private PooledConnection popConnection(String username, String password) throws SQLException {
        boolean countedWait = false;
        PooledConnection conn = null;
        long t = System.currentTimeMillis();
        int localBadConnectionCount = 0;
       //重复获取连接直到,获取成功或者获取超时
        while (conn == null) {
          //加锁
          synchronized (state) {
            if (!state.idleConnections.isEmpty()) {
              //空闲连接队列不为空,出队一个连接(连接复用)
              conn = state.idleConnections.remove(0);
              if (log.isDebugEnabled()) {
                log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
              }
            } else {
              // 空闲连接队列为空
              if (state.activeConnections.size() < poolMaximumActiveConnections) {
                // 活动连接队列 < 最大活动连接数,新建一个连接
                conn = new PooledConnection(dataSource.getConnection(), this);
                if (log.isDebugEnabled()) {
                  log.debug("Created connection " + conn.getRealHashCode() + ".");
                }
              } else {
                // 活动连接队列大小已到达最大活动连接数,取活动时间最长的活动连接(FIFO取队列头poll())
                PooledConnection oldestActiveConnection = state.activeConnections.get(0);
                long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
                if (longestCheckoutTime > poolMaximumCheckoutTime) {
                  // 活动连接队列中时长最长的连接 超出了 最大连接时间
                  // 更新连接池总逾期连接数+1
                  // 更新连接池逾期连接的总连接时间
                  // 更新连接池连接的总连接时间
                  // 将逾期连接作废:移出活动连接队列+非自动提交连接回退+更改作废标识+创建新的连接代理
                  state.claimedOverdueConnectionCount++;
                  state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
                  state.accumulatedCheckoutTime += longestCheckoutTime;
                  // 移出活动连接队列
                  state.activeConnections.remove(oldestActiveConnection);
                  if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                    //非自动提交连接回退(事务时)
                    try {
                      oldestActiveConnection.getRealConnection().rollback();
                    } catch (SQLException e) {
                      log.debug("Bad connection. Could not roll back");
                    }  
                  }
                  //创建新的连接代理,旧的连接代理作废(invalid=false),注意:保留旧的pooledConnection(可能还在执行),等realConnection.close()一起被GC。
                  conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
                  conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
                  conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
                  oldestActiveConnection.invalidate();
                  if (log.isDebugEnabled()) {
                    log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
                  }
                } else {
                  // 空闲连接队列为空,活动连接队列为满,并且活动连接中连接时长最长的连接未超时
                  // 必须等待
                  try {
                    if (!countedWait) {
                      //统计等待次数,调优参考参数,过大可能需要设置更大的活动连接数了
                      state.hadToWaitCount++;
                      countedWait = true;
                    }
                    if (log.isDebugEnabled()) {
                      log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
                    }
                    long wt = System.currentTimeMillis();
                    //生产者--消费者模型,等待唤醒,让出锁+CPU
                    state.wait(poolTimeToWait);
                    //统计连接池的等待时长
                    state.accumulatedWaitTime += System.currentTimeMillis() - wt;
                  } catch (InterruptedException e) {
                    break;
                  }
                }
              }
            }
            if (conn != null) {
              // 获取连接成功,ping数据库服务器,检查连接是否还有效,例如连接闲置时间>数据库超时时间,连接失效  
              if (conn.isValid()) {
                if (!conn.getRealConnection().getAutoCommit()) {
                  conn.getRealConnection().rollback();
                }
                //连接有效,
                //更新连接属性hashCode,
                //更新获取连接的时间,最后使用连接的时间,并
                //加入到活动连接队列中,
                //连接池请求数+1,请求响应时间统计
                conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
                conn.setCheckoutTimestamp(System.currentTimeMillis());
                conn.setLastUsedTimestamp(System.currentTimeMillis());
                state.activeConnections.add(conn);
                state.requestCount++;
                state.accumulatedRequestTime += System.currentTimeMillis() - t;
              } else {
                if (log.isDebugEnabled()) {
                  log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
                }
                //连接无效
                //更新连接池,坏连接数+1,本地坏连接数+1,
                state.badConnectionCount++;
                localBadConnectionCount++;
                conn = null;
                if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
                  //当坏连接数 > 最大空闲连接数 + 本地坏连接容忍数时,抛出异常,
                  //此种场景相当于连接池终止服务
                  if (log.isDebugEnabled()) {
                    log.debug("PooledDataSource: Could not get a good connection to the database.");
                  }
                  throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
                }
              }
            }
          }
    
        }
    
        if (conn == null) {
          if (log.isDebugEnabled()) {
            log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
          }
          //最后检测conn是否获取成功
          throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
        }
    
        return conn;
      }

    入队:pushConnection():关闭连接或入空闲连接队列

      protected void pushConnection(PooledConnection conn) throws SQLException {
        //加锁
        synchronized (state) {
          //从活动连接队列中删除
          state.activeConnections.remove(conn);
          if (conn.isValid()) {
            //连接有效
            if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
              //当前空闲连接数小于最大空闲连接数,并且连接属于同一个数据库
              //更新连接池总连接时间
              state.accumulatedCheckoutTime += conn.getCheckoutTime();
              if (!conn.getRealConnection().getAutoCommit()) {
                //连接不是自动提交,回退(加锁后另外保证策略,清除连接的职责)
                conn.getRealConnection().rollback();
              }
              //创建连接的新代理,旧代理作废
              PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
              //放入到空闲连接队列中,更新连接创建时间,最后使用时间
              state.idleConnections.add(newConn);
              newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
              newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
              //设置旧连接无效
              conn.invalidate();
              if (log.isDebugEnabled()) {
                log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
              }
              //生产者--消费者模型,唤醒popConnection中所有等待线程
              state.notifyAll();
            } else {
              //空闲连接数已达到最大,关闭连接
              state.accumulatedCheckoutTime += conn.getCheckoutTime();
              if (!conn.getRealConnection().getAutoCommit()) {
                conn.getRealConnection().rollback();
              }
              conn.getRealConnection().close();
              if (log.isDebugEnabled()) {
                log.debug("Closed connection " + conn.getRealHashCode() + ".");
              }
              conn.invalidate();
            }
          } else {
            //连接无效,更新连接池坏连接数+1
            if (log.isDebugEnabled()) {
              log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
            }
            state.badConnectionCount++;
          }
        }
      }

    3、总结

    ① UnpooledDataSource是对JDBC的封装使用,PooledDataSource利用装饰者模式增强UnpooledDataSource的功能,实现了线程池。

    ② 采用队列实现:空闲队列+活动队列,队列最大容量分别为poolMaxinumIdleConnections(默认5),poolMaxinumActiveConnections(默认10)

    ③ PoolState封装了线程池的运行时数据,这些数据利于监控,调优。例如:等待连接次数过多,很可能是设置的最大活动连接数、最大空闲连接数过小。

    ④ getConnection实际返回的是连接对象的代理对象,利用JDK动态代理技术,connection.close()实际是调用proxyConneciton.close(),实现连接的回收(入空闲队列),便于复用。

    ⑤ 活动连接超出连接时间时,请求连接会抢占真实连接,作废旧的代理连接,创建一个新的代理连接。需要注意的是:旧的代理连接按道理会等连接关闭时,一起被GC。

    ⑥ 利用了生产者-消费者模型实现

    ⑦ 简单的流程图

     

  • 相关阅读:
    <img/>标签onerror事件在IE下的bug和解决方法
    IIS启用Gzip压缩造成OpenFlashChart不能正常显示问题及解决方法
    小心枚举陷阱
    "动软.Net代码生成器"的一次扩展经历
    旁听面试杂想
    .NET Remoting学习点滴(二):基本概念
    十字路口
    表变量和临时表
    动态创建WebService
    拼接SQL造成的意想不到的后果
  • 原文地址:https://www.cnblogs.com/wqff-biubiu/p/12466771.html
Copyright © 2011-2022 走看看