zoukankan      html  css  js  c++  java
  • Android数据库源码分析(3)-连接缓存池SQLiteConnectionPool

    本系列主要关注安卓数据库的线程行为,分为四个部分:
    (1)SQLiteOpenHelper的getReadableDatabase和getWritableDatabase
    (2)SQLiteDatabase的实现以及多线程行为
    (3)连接缓存池SQLiteConnectionPool
    (4)SQLiteDatabase多线程实践

    本篇主要关注SQLiteConnectionPool(连接池)在并发下的行为。

    上文提到,SQLiteDatabase会在每个线程中使用一个SQLiteSession,而SQLiteSession会共用一个SQLiteConnectionPool对象,并通过SQLiteConnectionPoolacquireConnectionreleaseConnection方法来获取和释放数据库连接(一个SQLiteConnection对象)。

    public SQLiteConnection acquireConnection(String sql, int connectionFlags,
            CancellationSignal cancellationSignal) {
        return waitForConnection(sql, connectionFlags, cancellationSignal);//看名字是要等待什么锁了
    }
    

    connectionFlags是用SQLiteDatabase.getThreadDefaultConnectionFlags的返回值一路传下来的,这个方法在前文讨论过这个方法,会记录两件事:1.数据库是只读还是可写;2.当前是否主线程。

    waitForConnection方法比较长,我们一段一段地看。

    1 尝试立即获取连接

        //是否可写连接。可写的连接同一时间只能存在一个。
        final boolean wantPrimaryConnection =
                (connectionFlags & CONNECTION_FLAG_PRIMARY_CONNECTION_AFFINITY) != 0;
    
        final ConnectionWaiter waiter;
        final int nonce;
        synchronized (mLock) {//加锁。留意这一段代码中加锁部分并未结束。
            throwIfClosedLocked();
    
            // Abort if canceled.
            if (cancellationSignal != null) {
                cancellationSignal.throwIfCanceled();
            }
    
            // Try to acquire a connection.
            SQLiteConnection connection = null;
            if (!wantPrimaryConnection) {
                //尝试获取只读连接
                connection = tryAcquireNonPrimaryConnectionLocked(
                        sql, connectionFlags); // might throw
            }
            if (connection == null) {
                //尝试获取可写连接
                connection = tryAcquirePrimaryConnectionLocked(connectionFlags); // might throw
            }
            if (connection != null) {
                return connection;
            }
    

    到这里是尝试直接获取连接。尝试的方法有tryAcquireNonPrimaryConnectionLockedtryAcquirePrimaryConnectionLocked。只读时只需要 non primary connection,而需要写时要primary connection。
    先看tryAcquirePrimaryConnectionLocked

    // Might throw.
    private SQLiteConnection tryAcquirePrimaryConnectionLocked(int connectionFlags) {
        // If the primary connection is available, acquire it now.
        SQLiteConnection connection = mAvailablePrimaryConnection;//同时只能存在一个可写连接,用一个成员变量mAvailablePrimaryConnection缓存空闲连接
        if (connection != null) {//有缓存返回即可。finishAcquirePrimaryConnection会把connection放到mAcquiredConnections中。mAcquiredConnections存储正在使用的连接。
            mAvailablePrimaryConnection = null;//不再空闲
            finishAcquireConnectionLocked(connection, connectionFlags); // might throw
            return connection;
        }
    
        // Make sure that the primary connection actually exists and has just been acquired.
        //如果上一个if造成了不再空闲,则mAcquiredConnections中就会有一个primary connection,这里就会返回null。上一层的waitForConnection接到null会进入等待状态,这个后面讨论。
        for (SQLiteConnection acquiredConnection : mAcquiredConnections.keySet()) {
            if (acquiredConnection.isPrimaryConnection()) {
                return null;
            }
        }
    
        //如果没有在上面返回null,那么这一定是第一次请求primary connnection,或者有一个连接泄露了(未recycle的情况下finalize),这时候就需要用openConnectionLocked去新开一个连接。
        // Uhoh.  No primary connection!  Either this is the first time we asked
        // for it, or maybe it leaked?
        connection = openConnectionLocked(mConfiguration,
                true /*primaryConnection*/); // might throw
        finishAcquireConnectionLocked(connection, connectionFlags); // might throw
        return connection;
    }
    

    然后看tryAcquireNonPrimaryConnectionLocked

    // Might throw.
    private SQLiteConnection tryAcquireNonPrimaryConnectionLocked(
            String sql, int connectionFlags) {
        // Try to acquire the next connection in the queue.
        SQLiteConnection connection;
        //只读连接可以有多个,用一个ArrayList缓存了所有空闲连接
        final int availableCount = mAvailableNonPrimaryConnections.size();
        if (availableCount > 1 && sql != null) {
            // If we have a choice, then prefer a connection that has the
            // prepared statement in its cache.
            // 如上面的英文注释说的,如果有不止一个连接可选,那么挑选缓存了相同sql语句的那个。可能SQLiteConnection对此有优化?
            for (int i = 0; i < availableCount; i++) {
                connection = mAvailableNonPrimaryConnections.get(i);
                if (connection.isPreparedStatementInCache(sql)) {//如果有相同sql,返回
                    mAvailableNonPrimaryConnections.remove(i);
                    finishAcquireConnectionLocked(connection, connectionFlags); // might throw
                    return connection;
                }
            }
        }
        if (availableCount > 0) {
            // Otherwise, just grab the next one.
            //没有挑到,随便给一个
            connection = mAvailableNonPrimaryConnections.remove(availableCount - 1);
            finishAcquireConnectionLocked(connection, connectionFlags); // might throw
            return connection;
        }
    
        // 一个空闲连接都没有。
        // Expand the pool if needed.
        int openConnections = mAcquiredConnections.size();
        if (mAvailablePrimaryConnection != null) {
            openConnections += 1;
        }
        // 上面在计算有多少已打开连接(空闲+使用中)。这里肯定没有空闲non primary连接了,而如果有空闲primary连接,则要 += 1。
        if (openConnections >= mMaxConnectionPoolSize) {
            // 超过数据库连接限制,放弃治疗。连接限制与数据库底层实现有关。
            return null;
        }
        // 没超限,还能再开一个连接。所以开连接并返回。
        connection = openConnectionLocked(mConfiguration,
                false /*primaryConnection*/); // might throw
        finishAcquireConnectionLocked(connection, connectionFlags); // might throw
        return connection;
    }
    

    在这一步中,进行了从缓存中取得连接的尝试;而如果无法取得连接,也进行了打开连接的尝试。如果再无法打开的话,就会拿到一个null了。后续就需要进行等待。

    2 等待获取连接

            // 留意这里还在上一个锁mLock中
            // No connections available.  Enqueue a waiter in priority order.
            final int priority = getPriority(connectionFlags);//主线程中的连接优先级更高,记得吗?
            final long startTime = SystemClock.uptimeMillis();
            // waiter是一个ConnectionWaiter对象。它同时也是一个链表,有一个同类的mNext成员变量。
            // obtainConnectionWaiterLocked会去复用(取链表头)或者新建一个对象。
            waiter = obtainConnectionWaiterLocked(Thread.currentThread(), startTime,
                    priority, wantPrimaryConnection, sql, connectionFlags);
            ConnectionWaiter predecessor = null;
            // 按照优先级向mConnectionWaiterQueue添加waitor对象。mConnectionWaiterQueue不是复用池,而是有效的等待队列(也是链表)。
            ConnectionWaiter successor = mConnectionWaiterQueue;
            while (successor != null) {
                if (priority > successor.mPriority) {
                    waiter.mNext = successor;
                    break;
                }
                predecessor = successor;
                successor = successor.mNext;
            }
            if (predecessor != null) {
                predecessor.mNext = waiter;
            } else {
                mConnectionWaiterQueue = waiter;
            }
    
            nonce = waiter.mNonce;//观察recycleConnectionWaiterLocked方法,mNonce在waiter每次被复用完成回收时自增1
        }//锁mLock结束
    

    到这里就是把需要等待的连接信息封装到ConnectionWaiter中,并将ConnectionWaiter对象放到一个链表里。那么什么时候会结束等待并返回呢?继续看代码:

        // Set up the cancellation listener.
        if (cancellationSignal != null) {
            cancellationSignal.setOnCancelListener(new CancellationSignal.OnCancelListener() {
                @Override
                public void onCancel() {
                    synchronized (mLock) {
                        if (waiter.mNonce == nonce) {//nonce的作用在这里体现。防止waiter对象复用造成误取消。
                            cancelConnectionWaiterLocked(waiter);
                        }
                    }
                }
            });
        }
    

    这一段用于额外处理取消信号的。在等待连接过程中取消,就可以把这一个waiter去除了。

    接下来:

        try {
            // Park the thread until a connection is assigned or the pool is closed.
            // Rethrow an exception from the wait, if we got one.
            long busyTimeoutMillis = CONNECTION_POOL_BUSY_MILLIS;
            long nextBusyTimeoutTime = waiter.mStartTime + busyTimeoutMillis;
            for (;;) {//循环开始
                // Detect and recover from connection leaks.
                // 管理泄露连接的。如果一个SQLiteConnection在finalize时还未关闭,则会置泄露状态。
                // mConnectionLeaked是一个AtomicBoolean。
                if (mConnectionLeaked.compareAndSet(true, false)) {
                    synchronized (mLock) {
                        wakeConnectionWaitersLocked();//有泄露连接被关闭的话,最大连接限制下就可能有位置空出来,这时候就可以尝试分配一个连接
                    }
                }
    
                // Wait to be unparked (may already have happened), a timeout, or interruption.
                // 等待。那么unpark在哪里?在wakeConnectionWaitersLocked中。这个方法在上面泄露测试时调用过。
                // 还有cancelConnectionWaiterLocked中,取消等待自然要唤醒线程处理一下。
                LockSupport.parkNanos(this, busyTimeoutMillis * 1000000L);
    
                // Clear the interrupted flag, just in case.
                Thread.interrupted();
    
                // Check whether we are done waiting yet.
                synchronized (mLock) {
                    throwIfClosedLocked();
    
                    //等到了一个Connection。这个mAssignedConnection是何时赋值的呢?
                    //也是在wakeConnectionWaitersLocked中赋值的。
                    final SQLiteConnection connection = waiter.mAssignedConnection;
                    final RuntimeException ex = waiter.mException;
                    if (connection != null || ex != null) {
                        recycleConnectionWaiterLocked(waiter);//回收waiter,会造成mNonce自增1
                        if (connection != null) {
                            return connection;
                        }
                        throw ex; // rethrow!
                    }
    
                    //没拿到连接,继续等。
                    final long now = SystemClock.uptimeMillis();
                    if (now < nextBusyTimeoutTime) {
                        busyTimeoutMillis = now - nextBusyTimeoutTime;
                    } else {
                        logConnectionPoolBusyLocked(now - waiter.mStartTime, connectionFlags);
                        busyTimeoutMillis = CONNECTION_POOL_BUSY_MILLIS;
                        nextBusyTimeoutTime = now + busyTimeoutMillis;
                    }
                }
            }//循环结束
        } finally {
            // Remove the cancellation listener.
            if (cancellationSignal != null) {
                cancellationSignal.setOnCancelListener(null);
            }
        }
    }
    

    在这一步中,用ConnectionWaiter来封装等待中的连接信息,并按优先级放入一个链表,随后进入等待状态。获取到连接后,等待状态结束,返回连接。

    3 连接的释放

    这里我们可以先预估以下:释放连接时需要把被释放的连接放回到空闲连接集合,并进行unpark操作,通知正在等待连接的线程。

    代码如下:

    public void releaseConnection(SQLiteConnection connection) {
        synchronized (mLock) {
            AcquiredConnectionStatus status = mAcquiredConnections.remove(connection);//从活跃连接池中移除
            if (status == null) {
                throw new IllegalStateException("Cannot perform this operation "
                        + "because the specified connection was not acquired "
                        + "from this pool or has already been released.");
            }
    
            if (!mIsOpen) {
                closeConnectionAndLogExceptionsLocked(connection);
            } else if (connection.isPrimaryConnection()) {
                if (recycleConnectionLocked(connection, status)) {
                    assert mAvailablePrimaryConnection == null;
                    mAvailablePrimaryConnection = connection;//放回可写连接mAvailablePrimaryConnection
                }
                wakeConnectionWaitersLocked();//通知其它线程
            } else if (mAvailableNonPrimaryConnections.size() >= mMaxConnectionPoolSize - 1) {
                closeConnectionAndLogExceptionsLocked(connection);
            } else {
                if (recycleConnectionLocked(connection, status)) {
                    mAvailableNonPrimaryConnections.add(connection);//放回空闲只读连接池
                }
                wakeConnectionWaitersLocked();//通知其它线程
            }
        }
    }
    

    4 总结

    综上所述,SQLiteConnectionPool提供数据库连接的流程如下:
    (1)从缓存中获取一个空闲的连接。若有多个空闲连接,优先挑选执行过相同SQL的那个。注意如果是写操作的话,则会返回一个primary connection,并将其它尝试获得primary connection的线程阻塞,直到当前线程结束使用连接。而只读的操作则可以同时存在多个,并可以和写操作的连接共存。
    (2)如果缓存中没有连接,检查底层数据库是否可以容纳更多连接。如果可以,新建一个连接并返回。
    (3)如果底层数据库不再允许增加连接,则进入等待。到超时或者有其它连接被释放结束等待。如果此时可以获取连接,则返回连接。如果不能,进入新一轮等待。

    5 多线程下的transaction

    了解了以上的特性之后,transaction的多线程行为就比较好理解了。
    以下是SQLiteDatabasebeginTransaction方法:

    private void beginTransaction(SQLiteTransactionListener transactionListener,
            boolean exclusive) {
        acquireReference();
        try {
            getThreadSession().beginTransaction(
                    exclusive ? SQLiteSession.TRANSACTION_MODE_EXCLUSIVE :
                            SQLiteSession.TRANSACTION_MODE_IMMEDIATE,
                    transactionListener,
                    getThreadDefaultConnectionFlags(false /*readOnly*/), null);
        } finally {
            releaseReference();
        }
    }
    

    留意flags参数传入的readOnly为false,所以SQLiteSession会从SQLiteConnectionPool中获取一个独占的连接。并且在SQLiteSession执行其它SQL语句的情况下,执行完成会将连接释放回连接池,而beginTransaction操作则不会,而是持有这一个连接直至同一线程内调用endTransaction。这里再贴一遍SQLiteSession.execute源码:

    public void execute(String sql, Object[] bindArgs, int connectionFlags,
            CancellationSignal cancellationSignal) {
        if (sql == null) {
            throw new IllegalArgumentException("sql must not be null.");
        }
    
        if (executeSpecial(sql, bindArgs, connectionFlags, cancellationSignal)) {
            return;
        }
    
        acquireConnection(sql, connectionFlags, cancellationSignal); // might throw
        try {
            mConnection.execute(sql, bindArgs, cancellationSignal); // might throw
        } finally {
            //这里释放了连接(其实是交还给连接池)
            releaseConnection(); // might throw
        }
    }
    

    所以,当有一个线程在transaction过程中时,其它线程的写操作和beginTransaction操作都会被阻塞住,直至当前线程的transaction完成才会按照优先级挑选一个线程继续。

     
  • 相关阅读:
    start tag, end tag issues in IE7, particularly in xslt transformation
    用SandCastle为注释生成chm文档
    Firebug
    架构的重点
    Linux Shell常用技巧(十) 管道组合
    Linux JDK升级
    Linux Shell常用技巧(十二) Shell编程
    Packet Tracer 5.0实验(一) 交换机的基本配置与管理
    Linux Shell常用技巧(六) sort uniq tar split
    Linux Shell常用技巧(二) grep
  • 原文地址:https://www.cnblogs.com/endv/p/12227749.html
Copyright © 2011-2022 走看看