zoukankan      html  css  js  c++  java
  • Mybatis源码学习之DataSource(七)_2

    接上节数据源,本节我们将继续学习未完成的部分,包括无连接池情况下的分析、为什么使用连接池、及mybatis连接池的具体管理原理


    不使用连接池的UnpooledDataSource

    当 的type属性为【UNPOOLED】时,MyBatis首先会实例化一个UnpooledDataSourceFactory工厂实例,然后通过getDataSource()方法返回一个UnpooledDataSource实例对象引用。

    使用UnpooledDataSource的getConnection(),每调用一次就会产生一个新的Connection实例对象。

    首先看一下 UnpooledDataSourceFactory 的实现:

    /**
     * @author kaifeng
     * @author Clinton Begin
     */
    public class UnpooledDataSourceFactory implements DataSourceFactory {
    
        private static final String DRIVER_PROPERTY_PREFIX = "driver.";
        private static final int DRIVER_PROPERTY_PREFIX_LENGTH = DRIVER_PROPERTY_PREFIX.length();
    
        protected DataSource dataSource;
    
        public UnpooledDataSourceFactory() {
            this.dataSource = new UnpooledDataSource();
        }
    
        @Override
        public void setProperties(Properties properties) {
            Properties driverProperties = new Properties();
            //创建DataSource相应的MetaObject对象
            MetaObject metaDataSource = SystemMetaObject.forObject(dataSource);
            //遍历properties集合,该集合中有数据源需要的属性
            for (Object key : properties.keySet()) {
                String propertyName = (String) key;
                //以driver.开头的属性放入driverProperties中
                if (propertyName.startsWith(DRIVER_PROPERTY_PREFIX)) {
                    String value = properties.getProperty(propertyName);
                    driverProperties.setProperty(propertyName.substring(DRIVER_PROPERTY_PREFIX_LENGTH), value);
                } else if (metaDataSource.hasSetter(propertyName)) {
                    String value = (String) properties.get(propertyName);
                    //类型转换
                    Object convertedValue = convertValue(metaDataSource, propertyName, value);
                    metaDataSource.setValue(propertyName, convertedValue);
                } else {
                    throw new DataSourceException("Unknown DataSource property: " + propertyName);
                }
            }
            if (driverProperties.size() > 0) {
                metaDataSource.setValue("driverProperties", driverProperties);
            }
        }
    
        @Override
        public DataSource getDataSource() {
            return dataSource;
        }
    
        /**
         * 类型转换,只支持三种类型转换,分别是Integer、Long、Boolean
         */
        private Object convertValue(MetaObject metaDataSource, String propertyName, String value) {
            Object convertedValue = value;
            Class<?> targetType = metaDataSource.getSetterType(propertyName);
            if (targetType == Integer.class || targetType == int.class) {
                convertedValue = Integer.valueOf(value);
            } else if (targetType == Long.class || targetType == long.class) {
                convertedValue = Long.valueOf(value);
            } else if (targetType == Boolean.class || targetType == boolean.class) {
                convertedValue = Boolean.valueOf(value);
            }
            return convertedValue;
        }
    
    }

    UnpooledDataSourceFactory实现了DataSourceFactory的方法,这里我们需要注意看它的构造函数,当实例化UnpooledDataSourceFactory对象时,会先执行其构造函数,将dataSource的引用指向了UnpooledDataSource的实例对象。

    接着我们看一下UnPooledDataSource的getConnection()方法的实现:

     /**
         * 获取数据源连接对象
         */
        @Override
        public Connection getConnection() throws SQLException {
            return doGetConnection(username, password);
        }
    
        /**
         * 获取数据源连接对象
         *
         * @param username 用户名
         * @param password 密码
         */
        @Override
        public Connection getConnection(String username, String password) throws SQLException {
            return doGetConnection(username, password);
        }
        /**
         * 根据指定用户名和密码获取数据源连接对象
         *
         * @param username 用户名
         * @param password 密码
         */
        private Connection doGetConnection(String username, String password) throws SQLException {
            Properties props = new Properties();
            if (driverProperties != null) {
                props.putAll(driverProperties);
            }
            if (username != null) {
                props.setProperty("user", username);
            }
            if (password != null) {
                props.setProperty("password", password);
            }
            return doGetConnection(props);
        }
    
        /**
         * 根据指定属性获取数据源连接对象
         *
         * @param properties 配置属性
         */
        private Connection doGetConnection(Properties properties) throws SQLException {
            //初始化数据源连接驱动
            initializeDriver();
            //从DriverManager中获取数据库连接
            Connection connection = DriverManager.getConnection(url, properties);
            //设置连接对象
            configureConnection(connection);
            return connection;
        }
    
        /**
         * 初始化数据源连接驱动
         */
        private synchronized void initializeDriver() throws SQLException {
            //没有注册的驱动,需要加载到registeredDrivers集合中
            if (!registeredDrivers.containsKey(driver)) {
                Class<?> driverType;
                try {
                    // 加载数据库连接驱动
                    if (driverClassLoader != null) {
                        driverType = Class.forName(driver, true, driverClassLoader);
                    } else {
                        driverType = Resources.classForName(driver);
                    }
                    // DriverManager requires the driver to be loaded via the system ClassLoader.
                    // http://www.kfu.com/~nsayer/Java/dyn-jdbc.html
                    //创建驱动实例
                    Driver driverInstance = (Driver) driverType.newInstance();
                    //注册到DriverManager中,用于创建数据库连接,代理模式实例化driver对象
                    DriverManager.registerDriver(new DriverProxy(driverInstance));
                    registeredDrivers.put(driver, driverInstance);
                } catch (Exception e) {
                    throw new SQLException("Error setting driver on UnpooledDataSource. Cause: " + e);
                }
            }
        }
    

    如代码所示,UnpooledDataSource会做以下事情:

    1. 初始化驱动: 判断driver驱动是否已经加载到内存中,如果还没有加载,则会动态地加载driver类,并实例化一个Driver对象,使用DriverManager.registerDriver()方法将其注册到内存中,以供后续使用。

    2. 创建Connection对象: 使用DriverManager.getConnection()方法创建连接。

    3. 配置Connection对象: 设置是否自动提交autoCommit和隔离级别isolationLevel。

    4. 返回Connection对象。
      image

    为什么要使用连接池

    由UnpooledDataSource代码可知我们每调用一次getConnection()方法,都会通过DriverManager.getConnection()返回新的java.sql.Connection实例,创建一个Connection对象的过程,在底层就相当于和数据库建立的通信连接,在建立通信连接的过程,每次都会消耗一部分时间,而往往我们建立连接后,就执行一个简单的SQL语句,然后就要抛弃掉,这是一个非常大的资源浪费。

    对于需要频繁地跟数据库交互的应用程序,可以在创建了Connection对象,并操作完数据库后,可以不释放掉资源,而是将它放到内存中,当下次需要操作数据库时,可以直接从内存中取出Connection对象,不需要再创建了,这样就极大地节省了创建Connection对象的资源消耗。由于内存也是有限和宝贵的,这就要求我们对内存中的Connection对象怎么有效地管理提出了很高的要求。我们把在内存中存放Connection对象的容器称之为 连接池(Connection Pool)。下面让我们来看一下MyBatis的线程池是怎样实现的。

    使用连接池的PooledDataSource

    同UnpooledDataSource一样,我们也先看一下PooledDataSource的getConnection()方法的基本原理:

    PooledDataSource将java.sql.Connection对象包裹成PooledConnection对象放到了PoolState类型的容器中维护。

    MyBatis将连接池中的PooledConnection分为两种状态: 空闲状态(idle)和活动状态(active),这两种状态的PooledConnection对象分别被存储到PoolState容器内的idleConnections和activeConnections两个List集合中:

    idleConnections:空闲(idle)状态PooledConnection对象被放置到此集合中,表示当前闲置的没有被使用的PooledConnection集合,调用PooledDataSource的getConnection()方法时,会优先从此集合中取PooledConnection对象。当用完一个java.sql.Connection对象时,MyBatis会将其包裹成PooledConnection对象放到此集合中。

    activeConnections:活动(active)状态的PooledConnection对象被放置到名为activeConnections的ArrayList中,表示当前正在被使用的PooledConnection集合,调用PooledDataSource的getConnection()方法时,会优先从idleConnections集合中取PooledConnection对象,如果没有,则看此集合是否已满,如果未满,PooledDataSource会创建出一个PooledConnection,添加到此集合中,并返回。

    以下是PoolState的源码,它用来维护连接池中,空闲连接对象和活动连接对象:

    /**
     * 维护连接状态
     *
     * @author Clinton Begin
     */
    public class PoolState {
    
        protected PooledDataSource dataSource;
    
        /**
         * 空闲连接
         */
        protected final List<PooledConnection> idleConnections = new ArrayList<PooledConnection>();
    
        /**
         * 活动连接
         */
        protected final List<PooledConnection> activeConnections = new ArrayList<PooledConnection>();
    
        /**
         * 请求数量
         */
        protected long requestCount = 0;
    
        /**
         * 请求获得连接所需时间
         */
        protected long accumulatedRequestTime = 0;
    
        /**
         * 统计连接使用时间
         */
        protected long accumulatedCheckoutTime = 0;
    
        /**
         * 统计过期回收连接数
         */
        protected long claimedOverdueConnectionCount = 0;
    
        /**
         * 统计过期连接使用时间
         */
        protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
    
        /**
         * 统计获取连接需要等待的时间
         */
        protected long accumulatedWaitTime = 0;
    
        /**
         * 统计获取连接需要等待的次数
         */
        protected long hadToWaitCount = 0;
    
        /**
         * 统计无效连接个数
         */
        protected long badConnectionCount = 0;
    
        public PoolState( PooledDataSource dataSource ) {
            this.dataSource = dataSource;
        }
    
        public synchronized long getRequestCount() {
            return requestCount;
        }
    
        public synchronized long getAverageRequestTime() {
            return requestCount == 0 ? 0 : accumulatedRequestTime / requestCount;
        }
    
        public synchronized long getAverageWaitTime() {
            return hadToWaitCount == 0 ? 0 : accumulatedWaitTime / hadToWaitCount;
    
        }
    
        public synchronized long getHadToWaitCount() {
            return hadToWaitCount;
        }
    
        public synchronized long getBadConnectionCount() {
            return badConnectionCount;
        }
    
        public synchronized long getClaimedOverdueConnectionCount() {
            return claimedOverdueConnectionCount;
        }
    
        public synchronized long getAverageOverdueCheckoutTime() {
            return claimedOverdueConnectionCount == 0 ? 0 : accumulatedCheckoutTimeOfOverdueConnections / claimedOverdueConnectionCount;
        }
    
        public synchronized long getAverageCheckoutTime() {
            return requestCount == 0 ? 0 : accumulatedCheckoutTime / requestCount;
        }
    
    
        public synchronized int getIdleConnectionCount() {
            return idleConnections.size();
        }
    
        public synchronized int getActiveConnectionCount() {
            return activeConnections.size();
        }
    
        @Override
        public synchronized String toString() {
            StringBuilder builder = new StringBuilder();
            builder.append("
    ===CONFINGURATION==============================================");
            builder.append("
     jdbcDriver                     ").append(dataSource.getDriver());
            builder.append("
     jdbcUrl                        ").append(dataSource.getUrl());
            builder.append("
     jdbcUsername                   ").append(dataSource.getUsername());
            builder.append("
     jdbcPassword                   ").append((dataSource.getPassword() == null ? "NULL" : "************"));
            builder.append("
     poolMaxActiveConnections       ").append(dataSource.poolMaximumActiveConnections);
            builder.append("
     poolMaxIdleConnections         ").append(dataSource.poolMaximumIdleConnections);
            builder.append("
     poolMaxCheckoutTime            ").append(dataSource.poolMaximumCheckoutTime);
            builder.append("
     poolTimeToWait                 ").append(dataSource.poolTimeToWait);
            builder.append("
     poolPingEnabled                ").append(dataSource.poolPingEnabled);
            builder.append("
     poolPingQuery                  ").append(dataSource.poolPingQuery);
            builder.append("
     poolPingConnectionsNotUsedFor  ").append(dataSource.poolPingConnectionsNotUsedFor);
            builder.append("
     ---STATUS-----------------------------------------------------");
            builder.append("
     activeConnections              ").append(getActiveConnectionCount());
            builder.append("
     idleConnections                ").append(getIdleConnectionCount());
            builder.append("
     requestCount                   ").append(getRequestCount());
            builder.append("
     averageRequestTime             ").append(getAverageRequestTime());
            builder.append("
     averageCheckoutTime            ").append(getAverageCheckoutTime());
            builder.append("
     claimedOverdue                 ").append(getClaimedOverdueConnectionCount());
            builder.append("
     averageOverdueCheckoutTime     ").append(getAverageOverdueCheckoutTime());
            builder.append("
     hadToWait                      ").append(getHadToWaitCount());
            builder.append("
     averageWaitTime                ").append(getAverageWaitTime());
            builder.append("
     badConnectionCount             ").append(getBadConnectionCount());
            builder.append("
    ===============================================================");
            return builder.toString();
        }
    
    }

    PooledDataSource中获取java.sql.Connection对象的过程

    连接池对象PooledDataSource中的getConnection()方法,该方法最终会调用popConnection方法获取一个Conection对象,下面我们一起看一下popConnection方法具体产生Connection对象的过程。

     @Override
        public Connection getConnection() throws SQLException {
            return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection();
        }
    
        @Override
        public Connection getConnection(String username, String password) throws SQLException {
            return popConnection(username, password).getProxyConnection();
        }
    
     /**
         * 获取连接
         *
         * @param username 用户名
         * @param password 密码
         */
        private PooledConnection popConnection(String username, String password) throws SQLException {
            //是否需要等待连接标记
            boolean countedWait = false;
            //返回对象
            PooledConnection conn = null;
            //方法开始的时间,用来计算请求连接耗费时间的
            long t = System.currentTimeMillis();
            //坏连接计数
            int localBadConnectionCount = 0;
    
    
            //如果没获取到连接就一直重试,有3种情况会退出循环:
            //1、成功获取到连接
            //2、调用wait(timeout)的时候抛出InterruptedException异常
            //3、获取失效连接的次数已经大于最大空闲连接数+3次,localBadConnectionCount > (poolMaximumIdleConnections + 3)的时候抛出异常
            while (conn == null) {
                //对PoolState这个对象加锁,这个对象在连接池是唯一的。
                synchronized (state) {
                    //如果空闲连接列表不为空,取出第一个连接
                    if (!state.idleConnections.isEmpty()) {
                        // Pool has available connection
                        conn = state.idleConnections.remove(0);
                        if (log.isDebugEnabled()) {
                            log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
                        }
                    } else {
                        // 活动连接数 < 最大活动连接限制的时候直接new一个新连接
                        if (state.activeConnections.size() < poolMaximumActiveConnections) {
                            // 创建一个新的连接对象
                            conn = new PooledConnection(dataSource.getConnection(), this);
                            if (log.isDebugEnabled()) {
                                log.debug("Created connection " + conn.getRealHashCode() + ".");
                            }
                        } else {
                            // 活动连接数 > 最大活动连接限制
                            PooledConnection oldestActiveConnection = state.activeConnections.get(0);
                            //获取连接被线程持有的时间
                            long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
                            //判断是否已经执行超时,默认是20秒
                            if (longestCheckoutTime > poolMaximumCheckoutTime) {
                                // Can claim overdue connection
                                state.claimedOverdueConnectionCount++;
                                state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
                                state.accumulatedCheckoutTime += longestCheckoutTime;
                                //从活动连接列表移除
                                state.activeConnections.remove(oldestActiveConnection);
                                //如果连接不是自动提交的,调用它的回滚
                                if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                                    try {
                                        oldestActiveConnection.getRealConnection().rollback();
                                    } catch (SQLException e) {
                                        log.debug("Bad connection. Could not roll back");
                                    }
                                }
                                //创建新的连接
                                conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
                                conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
                                conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
                                //移除旧的连接
                                oldestActiveConnection.invalidate();
                                if (log.isDebugEnabled()) {
                                    log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
                                }
                            } else {
                                // Must wait 需要等待的处理
                                try {
                                    if (!countedWait) {
                                        //累加一次等待计数,判断条件是避免循环等待的时候多次累计
                                        state.hadToWaitCount++;
                                        countedWait = true;
                                    }
                                    if (log.isDebugEnabled()) {
                                        log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
                                    }
                                    long wt = System.currentTimeMillis();
                                    //线程挂起等待被唤醒
                                    state.wait(poolTimeToWait);
                                    state.accumulatedWaitTime += System.currentTimeMillis() - wt;
                                } catch (InterruptedException e) {
                                    break;
                                }
                            }
                        }
                    }
                    //如果连接对象不为null且有效
                    if (conn != null) {
                        // ping to server and check the connection is valid or not
                        if (conn.isValid()) {
                            if (!conn.getRealConnection().getAutoCommit()) {
                                conn.getRealConnection().rollback();
                            }
                            conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
                            //设置连接被获取的时间
                            conn.setCheckoutTimestamp(System.currentTimeMillis());
                            //最后更新时间
                            conn.setLastUsedTimestamp(System.currentTimeMillis());
                            state.activeConnections.add(conn);
                            state.requestCount++;
                            state.accumulatedRequestTime += System.currentTimeMillis() - t;
                        } else {
                            if (log.isDebugEnabled()) {
                                log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
                            }
                            state.badConnectionCount++;
                            localBadConnectionCount++;
                            conn = null;
                            //获取失效连接超过一定次数,抛出异常
                            if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
                                if (log.isDebugEnabled()) {
                                    log.debug("PooledDataSource: Could not get a good connection to the database.");
                                }
                                throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
                            }
                        }
                    }
                }
    
            }
    
            if (conn == null) {
                if (log.isDebugEnabled()) {
                    log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
                }
                throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
            }
    
            return conn;
        }
    

    popConnection()方法的大致流程如下:

    1. 先看是否有空闲(idle)状态下的PooledConnection对象,如果有,就直接返回一个可用的PooledConnection对象;否则进行第2步。

    2. 查看活动状态的PooledConnection池activeConnections是否已满;如果没有满,则创建一个新的PooledConnection对象,然后放到activeConnections池中,然后返回此PooledConnection对象;否则进行第三步;

    3. 看最先进入activeConnections池中的PooledConnection对象是否已经过期:如果已经过期,从activeConnections池中移除此对象,然后创建一个新的PooledConnection对象,添加到activeConnections中,然后将此对象返回;否则进行第4步。

    4. 线程等待,循环2步

    image

    PooledDataSource中归还java.sql.Connection对象的过程

    mybatis连接池对象的归还是通过PooledConnection.pushConnection()方法完成的,这里用到了代理模式,PooledConnection对象内持有一个真正的数据库连接java.sql.Connection实例对象和一个java.sql.Connection的代理:

    class PooledConnection implements InvocationHandler {
    
      //......
      //所创建它的datasource引用
      private PooledDataSource dataSource;
      //真正的Connection对象
      private Connection realConnection;
      //代理自己的代理Connection
      private Connection proxyConnection;
    
      //......
    }
    

    PooledConenction实现了InvocationHandler接口,proxyConnection对象也是根据它来生成的代理对象:

    public PooledConnection(Connection connection, PooledDataSource dataSource) {
        this.hashCode = connection.hashCode();
        this.realConnection = connection;
        this.dataSource = dataSource;
        this.createdTimestamp = System.currentTimeMillis();
        this.lastUsedTimestamp = System.currentTimeMillis();
        this.valid = true;
        this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
      }
    

    事实上,我们调用PooledDataSource的getConnection()方法返回的就是这个proxyConnection对象。

    当我们调用此proxyConnection对象上的任何方法时,都会调用PooledConnection对象内invoke()方法。

    我们看一下PooledConnection类中的invoke()方法:

     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String methodName = method.getName();
        //当调用关闭的时候,回收此Connection到PooledDataSource中
        if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
          dataSource.pushConnection(this);
          return null;
        } else {
          try {
            if (!Object.class.equals(method.getDeclaringClass())) {
              checkConnection();
            }
            return method.invoke(realConnection, args);
          } catch (Throwable t) {
            throw ExceptionUtil.unwrapThrowable(t);
          }
        }
      }
    

    下面是真正的连接对象归还操作,我们一起看一下,它是怎么归还连接的:

     /**
         * 归还连接
         */
        protected void pushConnection(PooledConnection conn) throws SQLException {
    
            synchronized (state) {
                //从活动连接中移除连接对象
                state.activeConnections.remove(conn);
                //判断连接对象是否可用
                if (conn.isValid()) {
                    //空闲连接没有达到上限并且该连接对象属于当前连接池
                    if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
                        //累加checkoutTime
                        state.accumulatedCheckoutTime += conn.getCheckoutTime();
                        //回滚未提交的事务
                        if (!conn.getRealConnection().getAutoCommit()) {
                            conn.getRealConnection().rollback();
                        }
                        //创建新的连接对象
                        PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
                        //添加到空闲连接集合中
                        state.idleConnections.add(newConn);
                        //创建时间戳
                        newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
                        //最后活动时间戳
                        newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
                        //将原有连接对象置为无效
                        conn.invalidate();
                        if (log.isDebugEnabled()) {
                            log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
                        }
                        //唤醒阻塞的线程
                        state.notifyAll();
                    } else {
                        state.accumulatedCheckoutTime += conn.getCheckoutTime();
                        if (!conn.getRealConnection().getAutoCommit()) {
                            conn.getRealConnection().rollback();
                        }
                        //关闭真正的数据库连接对象
                        conn.getRealConnection().close();
                        if (log.isDebugEnabled()) {
                            log.debug("Closed connection " + conn.getRealHashCode() + ".");
                        }
                        //将原有连接对象置为无效
                        conn.invalidate();
                    }
                } else {
                    if (log.isDebugEnabled()) {
                        log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
                    }
                    //统计无效连接对象的个数
                    state.badConnectionCount++;
                }
            }
        }
    

    根据源码可知数据库连接对象归还连接池的流程如下:
    image

    至此关于mybatis的连接池的创建,数据库连接对象的创建和归还已经结束了,关于数据源部分下面我们将学习mybatis的事务管理的实现。

  • 相关阅读:
    网格走法数目
    字典序(数据字典)
    异或(数据字典)
    头条校招
    Python中的zip()
    什么是“背书”
    求十进制整数的任意进制转换
    安装并使用pyecharts库(0.5.10)
    pycharm使用Ctrl+滚轮调整字体大小
    Anaconda中安装pyecharts
  • 原文地址:https://www.cnblogs.com/liukaifeng/p/10052608.html
Copyright © 2011-2022 走看看