zoukankan      html  css  js  c++  java
  • Android数据库源码分析(2)-SQLiteDatabase的实现以及多线程行为

    Android数据库源码分析(2)-SQLiteDatabase的实现以及多线程行为

     

    本系列主要关注安卓数据库的线程行为,分为四个部分:
    (1)SQLiteOpenHelper的getReadableDatabase和getWritableDatabase
    (2)SQLiteDatabase的实现以及多线程行为
    (3)连接缓存池SQLiteConnectionPool
    (4)SQLiteDatabase多线程实践

    本篇主要关注SQLiteDatabase的线程同步实现与架构实现。

    1 SQLiteClosable的acquireReference与releaseReference方法

    SQLiteClosableSQLiteDatabase的父类,也同时是数据库下其他几个类的父类。其中实现了引用计数逻辑来控制资源释放的时机。

    private int mReferenceCount = 1;
    
    public void acquireReference() {
        synchronized(this) {
            if (mReferenceCount <= 0) {
                throw new IllegalStateException(
                        "attempt to re-open an already-closed object: " + this);
            }
            mReferenceCount++;
        }
    }
    
    public void releaseReference() {
        boolean refCountIsZero = false;
        synchronized(this) {
            refCountIsZero = --mReferenceCount == 0;
        }
        if (refCountIsZero) {
            onAllReferencesReleased();
        }
    }
    

    可以看到这里用mReferenceCount简单地实现了一个引用计数。而引用计数的初始值是1。SQLiteDatabase会在每次操作前调用一次acquireReference,而在结束后调用一次releaseReference。为了方便,下文中把这样的被acquireReferencereleaseReference包裹的过程称为一次“操作”。
    那么如果这两个方法保持成对调用的话,是不是就不可能触发onAllReferenceReleased方法?事实上,SQLiteClosable还有一个方法close调用了releaseReference。由于锁的存在,只要不在其它“操作”中调用close,调用close之后mReferenceCount的值可以断定是0。
    到这里为止,感觉上是可以用一个boolean值来标记引用状态的。因为由于锁的存在,只要各个“操作”是序列进行的(没有一个“操作”调用了另一个“操作”的情况),mReferenceCount只可能是0和1。推测引用计数就是为了应付“操作”之间存在调用这种情况。这就像同一个线程里的嵌套锁需要进行计数一样。

    2 SQLiteDatabase的打开与关闭

    2.1 关闭

    上文中提到的onAllReferenceReleased是一个抽象方法。其在SQLiteDatabase中的实现为

    @Override
    protected void onAllReferencesReleased() {
        dispose(false);
    }
    

    在finalize中同样调用了dispose方法

    protected void finalize() throws Throwable {
        try {
            dispose(true);
        } finally {
            super.finalize();
        }
    }
    

    dispose的实现为

    private void dispose(boolean finalized) {
        final SQLiteConnectionPool pool;
        synchronized (mLock) {
            if (mCloseGuardLocked != null) {
                if (finalized) {
                    //CloseGuard是一个监测是否及时调用close方法的类,一般来说除了输出日志并不会做别的什么
                    //这里事实上就是在finalize的时候如果没有close过,就输出一条日志
                    mCloseGuardLocked.warnIfOpen();
                }
                mCloseGuardLocked.close();
            }
            pool = mConnectionPoolLocked;//这个mConnectionPool是连接池。此方法里将其置空并关闭。后文详细讨论其作用。
            mConnectionPoolLocked = null;
        }
        if (!finalized) {
            //sActiveDatabases是一个静态的WeakHashMap,用key来放置所有活动数据库,而value并没有作用。dispose的时候自然要移除this。
            //跟踪代码分析下来,用这个map只是为了bug report
            synchronized (sActiveDatabases) {
                sActiveDatabases.remove(this);
            }
            if (pool != null) {
                pool.close();
            }
        }
    }
    

    2.2 打开

    在本系列第一篇中我们曾看到过,最终的打开数据库的是一个静态方法,SQLiteDatabase.openDatabase(String path, CursorFactory factory, int flags, DatabaseErrorHandler errorHandler)

    public static SQLiteDatabase openDatabase(String path, CursorFactory factory, int flags,
            DatabaseErrorHandler errorHandler) {
        SQLiteDatabase db = new SQLiteDatabase(path, flags, factory, errorHandler);
        db.open();
        return db;
    }
    

    这里很简单,就是新建一个对象,然后调用open。构造器里只有一些初始化,略过。着重看open方法:

    private void open() {
        try {
            try {
                openInner();//尝试一次
            } catch (SQLiteDatabaseCorruptException ex) {
                onCorruption();//失败了,再次尝试前调用另一个方法。
                openInner();
            }
        } catch (SQLiteException ex) {
            Log.e(TAG, "Failed to open database '" + getLabel() + "'.", ex);
            close();
            throw ex;
        }
    }
    
    private void openInner() {
        synchronized (mLock) {
            assert mConnectionPoolLocked == null;
            mConnectionPoolLocked = SQLiteConnectionPool.open(mConfigurationLocked);
            mCloseGuardLocked.open("close");
        }
    
        synchronized (sActiveDatabases) {//这是之前那个WeakHashMap
            sActiveDatabases.put(this, null);
        }
    }
    
    void onCorruption() {
        EventLog.writeEvent(EVENT_DB_CORRUPT, getLabel());
        mErrorHandler.onCorruption(this);
    }
    

    open中会尝试调用openInner。如果失败一次,则调用onCorruption,随后再尝试一次。mErrorHandler是构造器传入的,构造器参数由静态方法openDatabase传入,而这个参数又最终从SQLiteOpenHelper传入。

    openInner中做的事情,从命名上看,是开启一个SQLiteConnectionPool即数据库连接池。简单地说,数据库连接池维持了对数据库的多个连接。数据库连接的类是SQLiteConnection

    3 线程内单例的SQLiteSession

    private final ThreadLocal<SQLiteSession> mThreadSession = new ThreadLocal<SQLiteSession>() {
        @Override
        protected SQLiteSession initialValue() {
            return createSession();
        }
    };
    
    SQLiteSession getThreadSession() {
        return mThreadSession.get(); // initialValue() throws if database closed
    }
    
    SQLiteSession createSession() {
        final SQLiteConnectionPool pool;
        synchronized (mLock) {
            throwIfNotOpenLocked();
            pool = mConnectionPoolLocked;
        }
        return new SQLiteSession(pool);
    }
    

    ThreadLocal会在每个线程内维护一个对象,而在线程结束时解除对对象的引用。initialValue方法会在线程中不存在已有对象时创建一个,不Override的话会给出一个null。除此之外也可以通过ThreadLocal.set来给本线程配置一个对象。
    可以看到mThreadSession是一个ThreadLocal。调用getThreadSession会获取一个线程内单例的SQLiteSession对象。

    SQLiteSession是提供数据库操作能力(增删改查以及事务)的一个单元。它会从SQLiteConnectionPool即连接池中获取连接,最终对数据库进行操作。

    到这儿类已经有点多了。整理一下逻辑:
    (1)SQLiteDatabase持有一个ThreadLocal,用于对每个线程生成一个SQLiteSession
    (2)SQLiteSession持有SQLiteConnectionPool(虽然SQLiteDatabase也持有连接池对象,但它只用来传递给SQLiteSession),但是同一个SQLiteDatabase下的SQLiteSession是共用一个SQLiteConnectionPool的;
    (3)SQLiteConnectionPool管理SQLiteConnection并适时向SQLiteSession提供之;
    (4)SQLiteConnection直接对底层数据库进行操作(这个类里面才有大量的native方法)。

    接下来分析一下SQLiteSession

    获取与释放连接,还是一个引用计数实现:

    private final SQLiteConnectionPool mConnectionPool;//构造器中初始化,值从SQLiteDatabase对象中传入
    private SQLiteConnection mConnection;
    private int mConnectionFlags;
    private int mConnectionUseCount;//无处不在的引用计数
    
    private void acquireConnection(String sql, int connectionFlags,
            CancellationSignal cancellationSignal) {
        if (mConnection == null) {
            assert mConnectionUseCount == 0;
            mConnection = mConnectionPool.acquireConnection(sql, connectionFlags,
                    cancellationSignal); // might throw
            mConnectionFlags = connectionFlags;
        }
        mConnectionUseCount += 1;
    }
    
    private void releaseConnection() {
        assert mConnection != null;
        assert mConnectionUseCount > 0;
        if (--mConnectionUseCount == 0) {
            try {
                mConnectionPool.releaseConnection(mConnection); // might throw
            } finally {
                mConnection = null;
            }
        }
    }
    

    具体的数据库操作有很多executeXXX形式的方法,逻辑大同小异。挑一个看看:

    public int executeForChangedRowCount(String sql, Object[] bindArgs, int connectionFlags,
            CancellationSignal cancellationSignal) {
        if (sql == null) {
            throw new IllegalArgumentException("sql must not be null.");
        }
    
        if (executeSpecial(sql, bindArgs, connectionFlags, cancellationSignal)) {//排除特殊操作
            return 0;
        }
    
        //获取连接
        acquireConnection(sql, connectionFlags, cancellationSignal); // might throw
        try {
            //底层数据库操作。本文不关心。
            return mConnection.executeForChangedRowCount(sql, bindArgs,
                    cancellationSignal); // might throw
        } finally {
            //释放连接
            releaseConnection(); // might throw
        }
    }
    
    //用来支持'BEGIN','COMMIT','ROLLBACK'的操作。就是与Transaction相关的操作。
    private boolean executeSpecial(String sql, Object[] bindArgs, int connectionFlags,
            CancellationSignal cancellationSignal) {
        if (cancellationSignal != null) {
            cancellationSignal.throwIfCanceled();
        }
    
        final int type = DatabaseUtils.getSqlStatementType(sql);
        switch (type) {
            case DatabaseUtils.STATEMENT_BEGIN:
                beginTransaction(TRANSACTION_MODE_EXCLUSIVE, null, connectionFlags,
                        cancellationSignal);
                return true;
    
            case DatabaseUtils.STATEMENT_COMMIT:
                setTransactionSuccessful();
                endTransaction(cancellationSignal);
                return true;
    
            case DatabaseUtils.STATEMENT_ABORT:
                endTransaction(cancellationSignal);
                return true;
        }
        return false;
    }
    

    4 单次完整的SQLite操作

    4.1 SQLiteStatement

    以最简单的delete方法为例。其它方法的流程均大同小异。

    public int delete(String table, String whereClause, String[] whereArgs) {
        acquireReference();
        try {
            SQLiteStatement statement =  new SQLiteStatement(this, "DELETE FROM " + table +
                    (!TextUtils.isEmpty(whereClause) ? " WHERE " + whereClause : ""), whereArgs);
            try {
                return statement.executeUpdateDelete();
            } finally {
                statement.close();
            }
        } finally {
            releaseReference();
        }
    }
    

    先用SQLiteStatement做一些sql转义和拼接,然后调用statement.executeUpdateDelete()

    具体看一下executeUpdateDelete

    //以下来自SQLiteStatement
    public int executeUpdateDelete() {
        acquireReference();//注意这里是SQLiteStatement内的引用计数,不是SQLiteDatabase了。
        try {
            return getSession().executeForChangedRowCount(
                    getSql(), getBindArgs(), getConnectionFlags(), null);//上一节分析过了,执行SQL。
        } catch (SQLiteDatabaseCorruptException ex) {
            onCorruption();
            throw ex;
        } finally {
            releaseReference();
        }
    }
    
    //这个方法在父类SQLiteProgram中。又回到了上一小节的getThreadSession。获取线程内的单例。
    protected final SQLiteSession getSession() {
        return mDatabase.getThreadSession();
    }
    

    4.2 SQLiteDirectCursorDriver与SQLiteQuery

    与4.1不同的是,在进行query操作时,最终没有使用SQLiteStatement类,而是通过SQLiteDirectCursorDriver间接使用了SQLiteQuery。而SQLiteQuerySQLiteStatement同为SQLiteProgram的子类,完成类似的功能。

    所有的query操作最终均调用这样一个方法:

    public Cursor rawQueryWithFactory(
            CursorFactory cursorFactory, String sql, String[] selectionArgs,
            String editTable, CancellationSignal cancellationSignal) {
        acquireReference();
        try {
            SQLiteCursorDriver driver = new SQLiteDirectCursorDriver(this, sql, editTable,
                    cancellationSignal);
            return driver.query(cursorFactory != null ? cursorFactory : mCursorFactory,
                    selectionArgs);
        } finally {
            releaseReference();
        }
    }
    

    SQLiteDirectCursorDriver的query方法如下:

    public Cursor query(CursorFactory factory, String[] selectionArgs) {
        final SQLiteQuery query = new SQLiteQuery(mDatabase, mSql, mCancellationSignal);
        final Cursor cursor;
        try {
            query.bindAllArgsAsStrings(selectionArgs);
    
            if (factory == null) {
                cursor = new SQLiteCursor(this, mEditTable, query);
            } else {
                cursor = factory.newCursor(mDatabase, this, mEditTable, query);
            }
        } catch (RuntimeException ex) {
            query.close();
            throw ex;
        }
    
        mQuery = query;
        return cursor;
    }
    

    其中新建了一个SQLiteQuery,并绑定参数。随后新建一个Cursor,这就是最终返回的Cursor对象。接下来考察无CursorFactory情况下默认返回的SQLiteCursor

    AbstractCursor中各种move方法均会调用moveToPosition,而moveToPosition会调用onMoveSQliteCursoronMove的实现为:

    @Override
    public boolean onMove(int oldPosition, int newPosition) {
        // Make sure the row at newPosition is present in the window
        if (mWindow == null || newPosition < mWindow.getStartPosition() ||
                newPosition >= (mWindow.getStartPosition() + mWindow.getNumRows())) {
            fillWindow(newPosition);
        }
    
        return true;
    }
    
    private void fillWindow(int requiredPos) {
        clearOrCreateWindow(getDatabase().getPath());
    
        try {
            if (mCount == NO_COUNT) {
                int startPos = DatabaseUtils.cursorPickFillWindowStartPosition(requiredPos, 0);
                mCount = mQuery.fillWindow(mWindow, startPos, requiredPos, true);
                mCursorWindowCapacity = mWindow.getNumRows();
                if (Log.isLoggable(TAG, Log.DEBUG)) {
                    Log.d(TAG, "received count(*) from native_fill_window: " + mCount);
                }
            } else {
                int startPos = DatabaseUtils.cursorPickFillWindowStartPosition(requiredPos,
                        mCursorWindowCapacity);
                mQuery.fillWindow(mWindow, startPos, requiredPos, false);
            }
        } catch (RuntimeException ex) {
            // Close the cursor window if the query failed and therefore will
            // not produce any results.  This helps to avoid accidentally leaking
            // the cursor window if the client does not correctly handle exceptions
            // and fails to close the cursor.
            closeWindow();
            throw ex;
        }
    }
    

    核心逻辑在mQuery.fillWindow(mWindow, startPos, requiredPos, false);这里。mQuery就是之前传入的SQLiteQuery对象。查看其fillWindow方法:

    int fillWindow(CursorWindow window, int startPos, int requiredPos, boolean countAllRows) {
        acquireReference();
        try {
            window.acquireReference();
            try {
                int numRows = getSession().executeForCursorWindow(getSql(), getBindArgs(),
                        window, startPos, requiredPos, countAllRows, getConnectionFlags(),
                        mCancellationSignal);
                return numRows;
            } catch (SQLiteDatabaseCorruptException ex) {
                onCorruption();
                throw ex;
            } catch (SQLiteException ex) {
                Log.e(TAG, "exception: " + ex.getMessage() + "; query: " + getSql());
                throw ex;
            } finally {
                window.releaseReference();
            }
        } finally {
            releaseReference();
        }
    }
    

    可以看到,最终回到了SQLiteSession.executeXXX方法逻辑之下。其余即与上一节类似。

    而从Cursor中取出数据的过程,则最终是由CursorWindow下的一系列native方法来完成,我认为属于Cursor的代码体系了,这里不重点展开。

    5 Transaction

    5.1 beginTransaction

    //一群差不多的beginTransaction方法最终调用到了这里
    private void beginTransaction(SQLiteTransactionListener transactionListener,
            boolean exclusive) {
        acquireReference();//怎么老是你
        try {
            getThreadSession().beginTransaction(
                    exclusive ? SQLiteSession.TRANSACTION_MODE_EXCLUSIVE :
                            SQLiteSession.TRANSACTION_MODE_IMMEDIATE,
                    transactionListener,
                    getThreadDefaultConnectionFlags(false /*readOnly*/), null);
        } finally {
            releaseReference();
        }
    }
    
    //上面的方法调用了这个方法。这套flags做了两件小事:1.确定只读还是可写  2.如果是主线程,就要提高连接的优先级
    int getThreadDefaultConnectionFlags(boolean readOnly) {
        int flags = readOnly ? SQLiteConnectionPool.CONNECTION_FLAG_READ_ONLY :
                SQLiteConnectionPool.CONNECTION_FLAG_PRIMARY_CONNECTION_AFFINITY;
        if (isMainThread()) {
            flags |= SQLiteConnectionPool.CONNECTION_FLAG_INTERACTIVE;
        }
        return flags;
    }
    

    还是要看SQLiteSession内部:

    public void beginTransaction(int transactionMode,
            SQLiteTransactionListener transactionListener, int connectionFlags,
            CancellationSignal cancellationSignal) {
        throwIfTransactionMarkedSuccessful();//一点合法性检查,不贴了
        beginTransactionUnchecked(transactionMode, transactionListener, connectionFlags,
                cancellationSignal);
    }
    
    private void beginTransactionUnchecked(int transactionMode,
            SQLiteTransactionListener transactionListener, int connectionFlags,
            CancellationSignal cancellationSignal) {
        if (cancellationSignal != null) {
        //cancellationSignal从beginTransaction以及SQLiteStatement诸方法传入的均为null,调查发现仅query时可以传入此参数。
            cancellationSignal.throwIfCanceled();
        }
    
        if (mTransactionStack == null) {//Transaction栈为空时才获取连接。
            acquireConnection(null, connectionFlags, cancellationSignal); // might throw
        }
        try {
            // Set up the transaction such that we can back out safely
            // in case we fail part way.
            if (mTransactionStack == null) {//如果没有进行中的Transaction,创建一个并BEGIN
                // Execute SQL might throw a runtime exception.
                switch (transactionMode) {
                    case TRANSACTION_MODE_IMMEDIATE:
                        mConnection.execute("BEGIN IMMEDIATE;", null,
                                cancellationSignal); // might throw
                        break;
                    case TRANSACTION_MODE_EXCLUSIVE:
                        mConnection.execute("BEGIN EXCLUSIVE;", null,
                                cancellationSignal); // might throw
                        break;
                    default:
                        mConnection.execute("BEGIN;", null, cancellationSignal); // might throw
                        break;
                }
            }
    
            // Listener might throw a runtime exception.
            if (transactionListener != null) {
                try {
                    transactionListener.onBegin(); // might throw
                } catch (RuntimeException ex) {
                    if (mTransactionStack == null) {
                        mConnection.execute("ROLLBACK;", null, cancellationSignal); // might throw
                    }
                    throw ex;
                }
            }
    
            // Bookkeeping can't throw, except an OOM, which is just too bad...
            Transaction transaction = obtainTransaction(transactionMode, transactionListener);//创建事务
            transaction.mParent = mTransactionStack;
            mTransactionStack = transaction;//入栈
        } finally {
            if (mTransactionStack == null) {//这里要栈为空时才释放连接。不为空时永远持有一个连接。
                releaseConnection(); // might throw
            }
        }
    }
    
    private static final class Transaction {
        public Transaction mParent;//这个是个链表,或者说在这里充当了一个栈
        public int mMode;
        public SQLiteTransactionListener mListener;
        public boolean mMarkedSuccessful;
        public boolean mChildFailed;
    }
    

    5.2 setTransactionSuccessful与endTransaction

    直接看SQLiteSession吧:

    public void setTransactionSuccessful() {
        throwIfNoTransaction();
        throwIfTransactionMarkedSuccessful();
    
        mTransactionStack.mMarkedSuccessful = true;//仅仅是个标记
    }
    
    public void endTransaction(CancellationSignal cancellationSignal) {
        throwIfNoTransaction();
        assert mConnection != null;
    
        endTransactionUnchecked(cancellationSignal, false);
    }
    
    private void endTransactionUnchecked(CancellationSignal cancellationSignal, boolean yielding) {
        if (cancellationSignal != null) {
            cancellationSignal.throwIfCanceled();
        }
    
        final Transaction top = mTransactionStack;
        boolean successful = (top.mMarkedSuccessful || yielding) && !top.mChildFailed;//如果有子Transaction失败,也是失败的
    
        RuntimeException listenerException = null;
        final SQLiteTransactionListener listener = top.mListener;
        if (listener != null) {
            try {
                if (successful) {
                    listener.onCommit(); // might throw
                } else {
                    listener.onRollback(); // might throw
                }
            } catch (RuntimeException ex) {
                listenerException = ex;
                successful = false;
            }
        }
    
        mTransactionStack = top.mParent;//退栈
        recycleTransaction(top);//回收
    
        if (mTransactionStack != null) {//还没到最外层事务,只做个标记
            if (!successful) {
                mTransactionStack.mChildFailed = true;
            }
        } else {//到了最外层事务了,提交或回滚
            try {
                if (successful) {
                    mConnection.execute("COMMIT;", null, cancellationSignal); // might throw
                } else {
                    mConnection.execute("ROLLBACK;", null, cancellationSignal); // might throw
                }
            } finally {
                releaseConnection(); // might throw
            }
        }
    
        if (listenerException != null) {
            throw listenerException;
        }
    }
    

    6 总结

    (1)总的来说,SQLiteDatabase是线程安全且高效的。它并没有简单地对每次操作加锁,而是使用引用计数和ThreadLocal来保证连接复用的线程安全性,数据一致性则交由SQLite自身去保证,以达到最优性能。
    而很多时候我们在业务层封装时反而处处加锁,其实是没有必要的。
    (2)SQLiteDatabase的内部实现会让每个线程单独持有一个数据库连接(不一定是创建,因为有连接池优化),而不是每个SQLiteDatabase对象对应一个连接。
    (3)数据库会给主线程持有的连接提高优先级。如果执行的是读操作或者小量数据的写入操作的话,可能可以满足主线程低延迟的需要。但是还没有具体的数据来支撑这一结论,希望有大牛补充。
    (4)多线程下的事务行为本文中未作分析,下一篇会就此问题单独进行讨论。

     
     
  • 相关阅读:
    2019年8月20日 item系列
    变量和字符
    python命名规则
    五大常用算法之贪心算法
    过河卒
    组成三位数
    编码问题
    FatMouse' Trade
    A Boring Game
    螺旋矩阵
  • 原文地址:https://www.cnblogs.com/endv/p/12227745.html
Copyright © 2011-2022 走看看