zoukankan      html  css  js  c++  java
  • 聊聊高并发(二十八)解析java.util.concurrent各个组件(十) 理解ReentrantReadWriteLock可重入读-写锁

    这篇讲讲ReentrantReadWriteLock可重入读写锁,它不仅是读写锁的实现,而且支持可重入性。 聊聊高并发(十五)实现一个简单的读-写锁(共享-排他锁) 这篇讲了怎样模拟一个读写锁。


    可重入的读写锁的特点是

    1. 当有线程获取读锁时,不同意再有线程获得写锁

    2. 当有线程获得写锁时。不同意其它线程获得读锁和写锁


    这里隐含着几层含义:

     static final int SHARED_SHIFT   = 16;
            static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
            static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
            static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
            /** Returns the number of shared holds represented in count  */
            static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
            /** Returns the number of exclusive holds represented in count  */
            static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    1. 能够同一时候有多个线程同一时候获得读锁,进入临界区。这时候的读锁的行为和Semaphore信号量是类似的

    2. 因为是可重入的。所以1个线程假设获得了读锁,那么它能够重入这个读锁

    3. 假设1个线程获得了读锁。那么它不能同一时候再获得写锁,这个就是所谓的“锁升级”,读锁升级到写锁可能会造成死锁,所以是不同意的

    4. 假设1个线程获得了写锁,那么不同意其它线程再获得读锁和写锁,可是它自己能够获得读锁,就是所谓的“锁降级”,锁降级是同意的


    关于读写锁的实现还要考虑的几个要点:

    1. 释放锁时的优先级问题。是让写锁先获得还是先让读锁先获得

    2. 是否同意读线程插队

    3. 是否同意写线程插队。由于读写锁一般用在大量读,少量写的情况,假设写线程没有优先级,那么可能造成写线程的饥饿

    4. 锁的升降级问题,通常是同意1个线程的写锁降级为读锁,不同意读锁升级成写锁


    带着问题看看ReentrantReadWriteLock的源代码。 它相同提供了Sync来继承AQS并提供扩展,可是它的Sync相比較Semaphore和CountDownLatch要更加复杂。

    1. 把State状态作为一个读写锁的计数器,包含了重入的次数。

    state是32位的int值,所以把高位16位作为读锁的计数器,低位的16位作为写锁的计数器,并提供了响应的读写这两个计数器的位操作方法。

    计算sharedCount时,採用无符号的移位操作,右移16位就是读锁计数器的值

    写锁直接用EXCLUSIVE_MASK和state做与运算。EXCLUSIVE_MASK的值是00000000000000001111111111111111,相当于计算了低位16位的值

    须要注意计算出来的值包括了重入的次数。

    所以MAX_COUNT限定了最大值是2^17 - 1

     static final int SHARED_SHIFT   = 16;
            static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
            static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
            static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
            /** Returns the number of shared holds represented in count  */
            static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
            /** Returns the number of exclusive holds represented in count  */
            static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    HoldCount类用来计算1个线程的重入次数,并使用了1个ThreadLocal类型的HoldCounter,能够记录每一个线程的锁的重入次数。 cachedHoldCounter记录了最后1个获取读锁的线程的重入次数。 firstReader指向了第一个获取读锁的线程,firstReaderHoldCounter记录了第一个获取读锁的线程的重入次数

    static final class HoldCounter {
                int count = 0;
                // Use id, not reference, to avoid garbage retention
                final long tid = Thread.currentThread().getId();
            }
    
            /**
             * ThreadLocal subclass. Easiest to explicitly define for sake
             * of deserialization mechanics.
             */
            static final class ThreadLocalHoldCounter
                extends ThreadLocal<HoldCounter> {
                public HoldCounter initialValue() {
                    return new HoldCounter();
                }
            }
    
    /**
             * The hold count of the last thread to successfully acquire
             * readLock. This saves ThreadLocal lookup in the common case
             * where the next thread to release is the last one to
             * acquire. This is non-volatile since it is just used
             * as a heuristic, and would be great for threads to cache.
             *
             * <p>Can outlive the Thread for which it is caching the read
             * hold count, but avoids garbage retention by not retaining a
             * reference to the Thread.
             *
             * <p>Accessed via a benign data race; relies on the memory
             * model's final field and out-of-thin-air guarantees.
             */
            private transient HoldCounter cachedHoldCounter;
    

    Sync提供了两个抽象方法给子类扩展。用来表示读锁和写锁是否应该堵塞等待

    /**
             * Returns true if the current thread, when trying to acquire
             * the read lock, and otherwise eligible to do so, should block
             * because of policy for overtaking other waiting threads.
             */
            abstract boolean readerShouldBlock();
    
            /**
             * Returns true if the current thread, when trying to acquire
             * the write lock, and otherwise eligible to do so, should block
             * because of policy for overtaking other waiting threads.
             */
            abstract boolean writerShouldBlock();

    写锁的tryXXX获取和释放

    1. 写锁释放时,因为没有其它线程获得临界区。它的tryRelease()方法仅仅须要设置状态的值。通过exclusiveCount计算写锁的计数器,假设为0表示释放了写锁,就把exclusiveOwnerThread设置为null.

    2. 写锁的tryAcquire获取时。

        先推断状态是否为0,为0表示没有线程获得锁,就能够直接设置状态。然后把exclusiveOwnerThread设置为当前线程

        假设状态不为0,那表示有几种可能:写锁为0。读锁不为0。写锁不为0。读锁为0。写锁不为0,读锁也不为0。

        所以它先推断写锁是否为0。写锁为0,那么表示读锁肯定不会为0,就失败,

        或者写锁不为0,可是exclusiveOwnerThread不是自己。那么表示已经有其它线程获得了写锁,就失败

        写锁不为0,而且exclusiveOwnerThread是自己。那么肯定表示是写锁的重入的情况,所以设置state状态。返回成功。

       

    protected final boolean tryRelease(int releases) {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                int nextc = getState() - releases;
                boolean free = exclusiveCount(nextc) == 0;
                if (free)
                    setExclusiveOwnerThread(null);
                setState(nextc);
                return free;
            }
    
    protected final boolean tryAcquire(int acquires) {
                /*
                 * Walkthrough:
                 * 1. If read count nonzero or write count nonzero
                 *    and owner is a different thread, fail.
                 * 2. If count would saturate, fail. (This can only
                 *    happen if count is already nonzero.)
                 * 3. Otherwise, this thread is eligible for lock if
                 *    it is either a reentrant acquire or
                 *    queue policy allows it. If so, update state
                 *    and set owner.
                 */
                Thread current = Thread.currentThread();
                int c = getState();
                int w = exclusiveCount(c);
                if (c != 0) {
                    // (Note: if c != 0 and w == 0 then shared count != 0)
                    if (w == 0 || current != getExclusiveOwnerThread())
                        return false;
                    if (w + exclusiveCount(acquires) > MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    // Reentrant acquire
                    setState(c + acquires);
                    return true;
                }
                if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires))
                    return false;
                setExclusiveOwnerThread(current);
                return true;
            }
    

    读锁的tryXXX获取和释放

    1. 读锁释放时基于共享的方式,改动线程各自的HoldCounter的值。最后採用位操作改动位于state的整体的读锁计数器。tryReleaseShared()之后详细的释放兴许线程的操作由AQS依据队列状态来决定。

    2. 读所获取时先看写锁的计数器,假设写锁已经被获取。而且不是当前线程所获取的。就直接失败返回

        这里会进行一次高速路径获取,尝试获取一次,假设readShouldBlock()返回false,而且CAS操作成功了,意思是能够获得锁,就更新相关读锁计数器

        否则就进行轮询方式的获取fullTryAcquireShared()

        也就是说假设当前没有线程获取写锁,或者是自己获取写锁。就能够获取读锁

        一个线程获取了写锁之后,它还能够获取读锁,也就是所谓的“锁降级”,但这时候其它线程无法获取读锁。在检查到有其它写锁存在时就退出了

    protected final boolean tryReleaseShared(int unused) {
                Thread current = Thread.currentThread();
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                    if (firstReaderHoldCount == 1)
                        firstReader = null;
                    else
                        firstReaderHoldCount--;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != current.getId())
                        rh = readHolds.get();
                    int count = rh.count;
                    if (count <= 1) {
                        readHolds.remove();
                        if (count <= 0)
                            throw unmatchedUnlockException();
                    }
                    --rh.count;
                }
                for (;;) {
                    int c = getState();
                    int nextc = c - SHARED_UNIT;
                    if (compareAndSetState(c, nextc))
                        // Releasing the read lock has no effect on readers,
                        // but it may allow waiting writers to proceed if
                        // both read and write locks are now free.
                        return nextc == 0;
                }
            }
    
    protected final int tryAcquireShared(int unused) {
                /*
                 * Walkthrough:
                 * 1. If write lock held by another thread, fail.
                 * 2. Otherwise, this thread is eligible for
                 *    lock wrt state, so ask if it should block
                 *    because of queue policy. If not, try
                 *    to grant by CASing state and updating count.
                 *    Note that step does not check for reentrant
                 *    acquires, which is postponed to full version
                 *    to avoid having to check hold count in
                 *    the more typical non-reentrant case.
                 * 3. If step 2 fails either because thread
                 *    apparently not eligible or CAS fails or count
                 *    saturated, chain to version with full retry loop.
                 */
                Thread current = Thread.currentThread();
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return -1;
                int r = sharedCount(c);
                if (!readerShouldBlock() &&
                    r < MAX_COUNT &&
                    compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != current.getId())
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
                return fullTryAcquireShared(current);
            }
    
     /**
             * Full version of acquire for reads, that handles CAS misses
             * and reentrant reads not dealt with in tryAcquireShared.
             */
            final int fullTryAcquireShared(Thread current) {
                /*
                 * This code is in part redundant with that in
                 * tryAcquireShared but is simpler overall by not
                 * complicating tryAcquireShared with interactions between
                 * retries and lazily reading hold counts.
                 */
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != current.getId())
                    rh = readHolds.get();
                for (;;) {
                    int c = getState();
                    int w = exclusiveCount(c);
                    if ((w != 0 && getExclusiveOwnerThread() != current) ||
                        ((rh.count | w) == 0 && readerShouldBlock(current)))
                        return -1;
                    if (sharedCount(c) == MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    if (compareAndSetState(c, c + SHARED_UNIT)) {
                        cachedHoldCounter = rh; // cache for release
                        rh.count++;
                        return 1;
                    }
                }
            } 


    tryWriteLock和tryReadLock操作和上面的操作类似,它们是读写锁的tryLock()的实际实现,表示尝试获取一次锁

    1. tryWriteLock方法尝试获得写锁,先推断状态是否为0,为0而且CAS操作成功就表示获得锁。假设状态不为0,就推断写锁计数器的值。假设写锁计数器为0就表示存在读锁,就返回失败。获取写锁不为0,可是不是当前线程所获取的,也返回失败。仅仅有写锁不为0而且是当前线程自己获取的写锁,就是所谓的写锁重入操作。

    CAS成功后就表示获得写锁

    final boolean tryWriteLock() {
                Thread current = Thread.currentThread();
                int c = getState();
                if (c != 0) {
                    int w = exclusiveCount(c);
                    if (w == 0 ||current != getExclusiveOwnerThread())
                        return false;
                    if (w == MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                }
                if (!compareAndSetState(c, c + 1))
                    return false;
                setExclusiveOwnerThread(current);
                return true;
            }
    
    final boolean tryReadLock() {
                Thread current = Thread.currentThread();
                for (;;) {
                    int c = getState();
                    if (exclusiveCount(c) != 0 &&
                        getExclusiveOwnerThread() != current)
                        return false;
                    if (sharedCount(c) == MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    if (compareAndSetState(c, c + SHARED_UNIT)) {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != current.getId())
                            cachedHoldCounter = rh = readHolds.get();
                        rh.count++;
                        return true;
                    }
                }
            }
    

    ReentrantReadWriteLock也提供了非公平和公平的两个Sync版本号

    非公平的版本号中

    1. 写锁总是优先获取。不考虑AQS队列中先来的线程

    2. 读锁也不按FIFO队列排队,而是看当前获得锁是否是写锁,假设是写锁,就等待。否则就尝试获得锁

    而公平版本号中

    1. 假设有其它锁存在,获取写锁操作就失败。应该(should)进AQS队列等待

    2. 假设有其它锁存在。获取读锁操作就失败。应该(should)进AQS队列等待

    final static class NonfairSync extends Sync {
            private static final long serialVersionUID = -8159625535654395037L;
            final boolean writerShouldBlock(Thread current) {
                return false; // writers can always barge
            }
            final boolean readerShouldBlock(Thread current) {
                /* As a heuristic to avoid indefinite writer starvation,
                 * block if the thread that momentarily appears to be head
                 * of queue, if one exists, is a waiting writer. This is
                 * only a probablistic effect since a new reader will not
                 * block if there is a waiting writer behind other enabled
                 * readers that have not yet drained from the queue.
                 */
                return apparentlyFirstQueuedIsExclusive();
            }
        }
    
        /**
         * Fair version of Sync
         */
        final static class FairSync extends Sync {
            private static final long serialVersionUID = -2274990926593161451L;
            final boolean writerShouldBlock(Thread current) {
                // only proceed if queue is empty or current thread at head
                return !isFirst(current);
            }
            final boolean readerShouldBlock(Thread current) {
                // only proceed if queue is empty or current thread at head
                return !isFirst(current);
            }
        }

    详细ReadLock和WriteLock的实现就是依赖Sync来实现的,默认是非公平版本号的Sync。

    读锁採用共享默认的AQS,它提供了中断/不可中断的lock操作,tryLock操作,限时的tryLock操作。

    值得注意的时读锁不支持newCondition操作。

     public static class ReadLock implements Lock, java.io.Serializable  {
            private static final long serialVersionUID = -5992448646407690164L;
            private final Sync sync;
    
            protected ReadLock(ReentrantReadWriteLock lock) {
                sync = lock.sync;
            }
    
            
            public void lock() {
                sync.acquireShared(1);
            }
    
            public void lockInterruptibly() throws InterruptedException {
                sync.acquireSharedInterruptibly(1);
            }
    
            public  boolean tryLock() {
                return sync.tryReadLock();
            }
    
            public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
                return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
            }
    
            public  void unlock() {
                sync.releaseShared(1);
            }
    
            public Condition newCondition() {
                throw new UnsupportedOperationException();
            }
    

    WriteLock基于独占模式的AQS,它提供了中断/不可中断的lock操作。tryLock操作,限时的tryLock操作

     public static class WriteLock implements Lock, java.io.Serializable  {
            private static final long serialVersionUID = -4992448646407690164L;
    	private final Sync sync;
    
            protected WriteLock(ReentrantReadWriteLock lock) {
                sync = lock.sync;
            }
    
            public void lock() {
                sync.acquire(1);
            }
    
            public void lockInterruptibly() throws InterruptedException {
                sync.acquireInterruptibly(1);
            }
    
            public boolean tryLock( ) {
                return sync.tryWriteLock();
            }
    
            public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
                return sync.tryAcquireNanos(1, unit.toNanos(timeout));
            }
    
            public void unlock() {
                sync.release(1);
            }
    
            public Condition newCondition() {
                return sync.newCondition();
            }
    


    最后再说一下AQS和各种同步器实现的关系,AQS提供了同步队列和条件队列的管理。包含各种情况下的入队出队操作。

    而同步器子类实现了tryAcquire和tryRelease方法来操作状态。来表示什么情况下能够直接获得锁而不须要进入AQS。什么情况下获取锁失败则须要进入AQS队列等待


  • 相关阅读:
    67. Add Binary
    66. Plus One
    64. Minimum Path Sum
    63. Unique Paths II
    How to skip all the wizard pages and go directly to the installation process?
    Inno Setup打包之先卸载再安装
    How to change the header background color of a QTableView
    Openstack object list 一次最多有一万个 object
    Openstack 的 Log 在 /var/log/syslog 里 【Ubuntu】
    Git 分支
  • 原文地址:https://www.cnblogs.com/liguangsunls/p/7088090.html
Copyright © 2011-2022 走看看