zoukankan      html  css  js  c++  java
  • Java 1.7 ReentrantReadWriteLock源代码解析

    因为本人水平与表达能力有限,有错误的地方欢迎交流与指正。

    1 简单介绍

    可重入读写锁时基于AQS实现的,典型的用法如JDK1.7中的演示样例:

      class RWDictionary {
          private final Map<String, Data> m = new TreeMap<String, Data>();
          private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
          private final Lock r = rwl.readLock();
          private final Lock w = rwl.writeLock();
      
          public Data get(String key) {
              r.lock();
              try { return m.get(key); }
              finally { r.unlock(); }
          }
          public String[] allKeys() {
              r.lock();
              try { return m.keySet().toArray(); }
              finally { r.unlock(); }
          }
          public Data put(String key, Data value) {
              w.lock();
              try { return m.put(key, value); }
              finally { w.unlock(); }
          }
          public void clear() {
              w.lock();
              try { m.clear(); }
              finally { w.unlock(); }
          }
       }}
    

    读锁使用的是AQS的共享模式。不会堵塞读锁,可是会堵塞写锁;写锁使用的是AQS的独占模式,读写锁都会被堵塞。读写锁是共用了一个Sync(AQS类)。也就是说AQS中独占模式和共享模式是并存的。

    Sync有两种实现方式。公平(FairSync)和非公平(NonfairSync)。

    公平的含义是指假设AQS的队列中有等待线程。则当前线程直接就放弃尝试获取锁,自觉的排队了;而非公平方式不一样,当前线程还是要尝试一下(只一下。和AQS队列中第一个结点竞争获取锁),假设成功了,相当于插队成功了,可是假设失败了(就是tryAcquire或tryAcquireShared失败)。还是要乖乖的排到最后去。

    以下的是整个类的结构图:


    Sync是个AQS类,它有两个子类FairSync和NonfairSync。

    ReadLock和WriteLock里有个成员变量sync(指向同个变量,FaireSync或NonfaireSync类型)。(UML图不是非常熟,就这样文字描写叙述了)

    2 ReentrantReadWriteLock主类

    public class ReentrantReadWriteLock
            implements ReadWriteLock, java.io.Serializable {
        /** Inner class providing readlock */
        private final ReentrantReadWriteLock.ReadLock readerLock;
        /** Inner class providing writelock */
        private final ReentrantReadWriteLock.WriteLock writerLock;
        /** Performs all synchronization mechanics */
        final Sync sync;
    
        /**
         * false,默认锁是非公平的
         */
        public ReentrantReadWriteLock() {
            this(false);
        }
    
        /**
         * Creates a new {@code ReentrantReadWriteLock} with
         * the given fairness policy.
         *
         * @param fair {@code true} if this lock should use a fair ordering policy
         */
        public ReentrantReadWriteLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
            readerLock = new ReadLock(this);
            writerLock = new WriteLock(this);
        }
        public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
        public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
    

    主类有三个重要的成员变量:读锁、写锁和同步器。从构造函数能够看出读写锁的同步器默认是非公平的(NonfaireSync)。

    3 ReadLock类

    实现了Lock接口,并使用sync成员变量实现加锁、解锁功能

    3.1 lock

    public void lock() {
                sync.acquireShared(1);
            }
    

     以AQS共享模式获取锁,參数值为1。假设系统中没有线程占有写锁。那么这个函数非常快就会返回。否则,当前线程会一直堵塞,直至获取到锁。

    3.2 lockInterruptibly

      public void lockInterruptibly() throws InterruptedException {
                sync.acquireSharedInterruptibly(1);
            }
    

      以AQS共享模式获取锁,參数值为1。假设系统中没有线程占有写锁。那么这个函数非常快就会返回;否则,当前线程会一直堵塞,直至获取到锁或被其它线程中断。

    3.3 tryLock

      public  boolean tryLock() {
                return sync.tryReadLock();
            }
    

     直接调用了sync的tryReadLock方法。这种方法和sync. tryAcquireShared基本一直(少了一个readerShouldBlock推断)。

    3.4 带超时的tryLock

    public boolean tryLock(long timeout, TimeUnit unit)
                    throws InterruptedException {
                return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
            }
    

    tryLock函数尝试获取锁,直到获取到或超时或被中断。

    假设当前没有线程占用写锁。会马上返回成功。和3.3的tryLock不一样,这个公平模式下会排队的。假设你不想排队又想支持超时,能够这么写代码:if(lock.tryLock() || lock.tryLock(timeout, unit) ) { ... }

    3.5 newCondition

    public Condition newCondition() {
                throw new UnsupportedOperationException();
            }
    

     读锁不支持条件队列的。

    4 WriteLock类

    实现了Lock接口。并使用sync成员变量实现加锁、解锁功能

    4.1 lock

    public void lock() {
                sync.acquire(1);
            }
    

      以AQS独占方式获取写锁,假设当前没有线程占有读锁和写锁。该函数会马上返回;假设当前线程已经获取写锁了。holdCount的值会加1。否则。会堵塞直至获取到锁。

    注意:假设当前线程已经获取到读锁了,紧接着就获取写锁就会死锁了。

    4.2 lockInterruptibly

    public void lockInterruptibly() throws InterruptedException {
                sync.acquireInterruptibly(1);
            }
    

       以AQS独占方式 获取写锁直到被中断。

    其它跟lock一致。

    4.3 tryLock

    public boolean tryLock( ) {
                return sync.tryWriteLock();
            }
    

    直接调用了sync的tryWriteLock方法。这种方法和sync. tryAcquire基本一直(少了一个writerShouldBlock推断)。

    无论在公平模式还是非公平模式下。都不用排队。

    4.4 带超时的tryLock

    public boolean tryLock(long timeout, TimeUnit unit)
                    throws InterruptedException {
                return sync.tryAcquireNanos(1, unit.toNanos(timeout));
            }
    

       tryLock函数尝试获取锁。直到获取到或超时或被中断。假设没有其它线程占有读锁和写锁,立刻返回true。和4.3不一样。公平模式下会排队的。假设你又想插队又想支持超时。能够这么写:if(lock.tryLock() || lock.tryLock(timeout, unit) ) { ... }

    4.5 newCondition

    public Condition newCondition() {
                return sync.newCondition();
            }
    

    4.6 isHeldByCurrentThread

    public boolean isHeldByCurrentThread() {
                return sync.isHeldExclusively();
            }
    

      当前线程是否占有写锁。

    4.7 getHoldCount

    public int getHoldCount() {
                return sync.getWriteHoldCount();
            }
    

     当前线程占有几个写锁。

    5 Sync类

        读锁和写锁公用的同步器,有两个版本号:公平的和非公平的。

    5.1 内部类结构

    abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 6317671515068378041L;
    
            /*
             * AQS的state字段被拆成两部分了:高16位表示获取读锁的次数,低16位表示
             * 获取写锁的次数
             */
            static final int SHARED_SHIFT   = 16;
    // 每次线程获取读锁成功就会运行state+=SHARED_UNIT操作。不是+1由于
    // 高16位表示获取读锁的次数。
            static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    // 同意读或写获取锁的最大次数。都是65535
            static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
            static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
            /** 获取当前读锁的总数 */
            static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
            /** 获取当前写锁的总数 */
            static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    
            /**
             * A counter for per-thread read hold counts.
             * Maintained as a ThreadLocal; cached in cachedHoldCounter
             */
            static final class HoldCounter {
                int count = 0;
                // Use id, not reference, to avoid garbage retention
                final long tid = Thread.currentThread().getId();
            }
    
            /**
             * 每一个线程都绑定一个HoldCounter对象
             */
            static final class ThreadLocalHoldCounter
                extends ThreadLocal<HoldCounter> {
                public HoldCounter initialValue() {
                    return new HoldCounter();
                }
            }
    
            /**
             * The number of reentrant read locks held by current thread.
             * Initialized only in constructor and readObject.
             * Removed whenever a thread's read hold count drops to 0.
             */
    // 第一个获取读锁线程的HoldCounter没有在里面管理,而是通过firstReader
    // 和firstReaderHoldCount两个变量维护的。

    private transient ThreadLocalHoldCounter readHolds; private transient HoldCounter cachedHoldCounter; private transient Thread firstReader = null; private transient int firstReaderHoldCount; Sync() { readHolds = new ThreadLocalHoldCounter(); setState(getState()); // ensures visibility of readHolds } /* * Acquires and releases use the same code for fair and * nonfair locks, but differ in whether/how they allow barging * when queues are non-empty. */ /** * Returns true if the current thread, when trying to acquire * the read lock, and otherwise eligible to do so, should block * because of policy for overtaking other waiting threads. */ abstract boolean readerShouldBlock(); /** * Returns true if the current thread, when trying to acquire * the write lock, and otherwise eligible to do so, should block * because of policy for overtaking other waiting threads. */ abstract boolean writerShouldBlock();

    5.2 tryAcquire

    protected final boolean tryAcquire(int acquires) {
                Thread current = Thread.currentThread();
                int c = getState();
                int w = exclusiveCount(c);
                if (c != 0) {
                  // c!=0&&w==0说明有线程(包含当前线程)持有读锁。直接返回false
    // c!=0&&w!=0&¤t!=当前持有锁的线程,直接返回false
    // 注意:假设一个线程先获取读锁。紧接着获取写锁时会死锁的
                    if (w == 0 || current != getExclusiveOwnerThread())
                        return false;
    // 写锁数量超过65535。直接抛异常了
                    if (w + exclusiveCount(acquires) > MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    // 走到这边说明这个锁是重入了的,不须要CAS了,直接设置state
                    setState(c + acquires);
                    return true;
                }
    // c==0或者重入的,假设写须要堵塞或者CAS设置state失败,直接返回false
                if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires))
                    return false;
    // 设置独占线程标识
                setExclusiveOwnerThread(current);
                return true;
            }
    

     tryAcquire 函数是尝试获取写锁:1.假设有读线程或者写线程且不是当前线程。直接失败;2.假设写锁的count超过了65535。直接失败;3.否则,这个线程可以拥有锁(eligible),队列策略同意(writerShouldBlock返回false)或者是重入的锁。

    5.3 tryRelease

    protected final boolean tryRelease(int releases) {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
    // state的高16位和低16位肯定都是大于releases的,能够直接相减
                int nextc = getState() - releases;
                boolean free = exclusiveCount(nextc) == 0;
    // 假设当前写锁释放后没有写锁了,置空独占标识
                if (free)
                    setExclusiveOwnerThread(null);
    // 设置新的state
                setState(nextc);
                return free;
            }
    

     tryRelease函数普通情况下就是释放写锁,可是也有不一般的情况,就是在Condition中调用tryRelease,因此,releases參数可能会包括读和写两个锁的信息。

    5.4 tryAcquireShared

    protected final int tryAcquireShared(int unused) {
    // 获取当前线程和state值         
                Thread current = Thread.currentThread();
                int c = getState();
    // 假设有线程持有写锁且该线程不是当前线程。直接返回-1
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return -1;
                int r = sharedCount(c);
                if (!readerShouldBlock() &&
                    r < MAX_COUNT &&
                    compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != current.getId())
                            cachedHoldCounter = rh = readHolds.get();
    // 为啥等于0的时候要set下?由于release时,count=0会运行readHolds.remove
    // 方法,可是不会清空cachedHoldCounter。

    此时。同个线程再成功获取读锁时count // 的值是0且readHolds已经为空了,因此要又一次set下。 else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }

      tryAcquireShared函数的參数(unused)没实用。主要是尝试获取读锁:

    1.假设有线程持有写锁。直接返回失败;

    2.否则,继续推断,假设队列策略同意(readerShouldBlock返回false)获取锁且CAS设置state成功。则设置读锁count的值。这一步并没有检查读锁重入的情况,被延迟到fullTryAcquireShared里了。由于大多数情况下不是重入的;

    3.假设步骤2失败了,也许是队列策略返回false也许是CAS设置失败了等,则运行fullTryAcquireShared。

    final int fullTryAcquireShared(Thread current) {          
                HoldCounter rh = null;
    // 是一个循环   
                for (;;) {
                    int c = getState();
                    if (exclusiveCount(c) != 0) {
    // 有线程持有写锁且不是当前线程,直接失败
                        if (getExclusiveOwnerThread() != current)
                            return -1;
                        // else we hold the exclusive lock; blocking here
                        // would cause deadlock.
    // 假设队列策略不同意,须要检查是否是读锁重入的情况。

    队列策略是否同意,分两种情况: // 1.公平模式:假设当前AQS队列前面有等待的结点。返回false;2.非公平模式:假设 // AQS前面有线程在等待写锁。返回false(这样做的原因是为了防止写饥饿)。 } else if (readerShouldBlock()) { // 假设当前线程是第一个获取读锁的线程,则有资格获取读锁 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { if (rh == null) { // 优先赋值成上一次获取读锁成功的cache,假设发现线程tid和当前线程不相等,在从 // ThreadLocal里获取 rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) { rh = readHolds.get(); if (rh.count == 0) // 帮助GC readHolds.remove(); } } // 说明不是读锁重入的情况。直接返回失败了 if (rh.count == 0) return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { // 设置当前线程为第一个获取读锁的线程 if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; // 说明当前线程为第一个获取读锁的线程 } else if (firstReader == current) { firstReaderHoldCount++; } else { // 其它获取读锁成功的情况 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }

     fullTryAcquireShared函数处理上面tryAcquireShared中没有处理的读锁重入的问题或者CAS设置失败。事实上。这函数代码和tryAcquireShared有些反复,可是把处理读锁重入的问题从tryAcquireShared中分离出来了。

    5.5 tryReleaseShared

    protected final boolean tryReleaseShared(int unused) {
                Thread current = Thread.currentThread();
    // 假设当前线程是第一个获取读锁的,须要特殊处理(第一个reader不是用
    // HoldCounter类型变量保存的)
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
    // 假设firstReaderHoldCount<=0也没报错啊(不会出现这样的情况的)。
    // 由于firstReaderHoldCount==1时,firstReader就是null了。条件
    // firstReader==current就不会成立了
                    if (firstReaderHoldCount == 1)
                        firstReader = null;
                    else
                        firstReaderHoldCount--;
                } else {
    // 假设当前线程是最后一个获取锁的,就是cachedHoldCounter了
    // 假设也不是最后一个获取锁的。就要从threadlocal里取了
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != current.getId())
                        rh = readHolds.get();
                    int count = rh.count;
    // count==1的话直接运行readHolds.remove;count<=0就报错
                    if (count <= 1) {
                        readHolds.remove();
                        if (count <= 0)
                            throw unmatchedUnlockException();
                    }
                    --rh.count;
                }
                for (;;) {
                    int c = getState();
                    int nextc = c - SHARED_UNIT;
                    if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
    // mark:当且仅当全部的读和写线程都释放锁了,才会返回true
                        return nextc == 0;
                }
            }
    

    tryReleaseShared函数的作用就是释放读锁。须要注意两点:1、假设一个线程没有获取过读锁,运行release方法会报异常。2、当且仅当全部的读和写线程都释放了,这个函数才会返回true。

    5.6 tryWriteLock

    final boolean tryWriteLock() {
                Thread current = Thread.currentThread();
                int c = getState();
                if (c != 0) {
                    int w = exclusiveCount(c);
    // 假设有线程持有读锁(自己也不行)直接返回false
    // 假设不是重入获取写锁的直接返回false
                    if (w == 0 || current != getExclusiveOwnerThread())
                        return false;
                    if (w == MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                }
    // c==0或者写锁重入
                if (!compareAndSetState(c, c + 1))
                    return false;
                setExclusiveOwnerThread(current);
                return true;
            }
    

      tryWriteLock函数会被写锁调用。和tryAcquire基本一致除了少调用一个writerShouldBlock函数,公平和非公平两种模式都同意插队(barging in)相当于是非公平模式了。

    5.7 tryReadLock

    final boolean tryReadLock() {
                Thread current = Thread.currentThread();
                for (;;) {
                    int c = getState();
    // 假设有线程持有写锁且该线程不是当前线程,直接返回-1
                    if (exclusiveCount(c) != 0 &&
                        getExclusiveOwnerThread() != current)
                        return false;
                    int r = sharedCount(c);
                    if (r == MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    if (compareAndSetState(c, c + SHARED_UNIT)) {
                        if (r == 0) {
                            firstReader = current;
                            firstReaderHoldCount = 1;
                        } else if (firstReader == current) {
                            firstReaderHoldCount++;
                        } else {
                            HoldCounter rh = cachedHoldCounter;
                            if (rh == null || rh.tid != current.getId())
                                cachedHoldCounter = rh = readHolds.get();
                            else if (rh.count == 0)
                                readHolds.set(rh);
                            rh.count++;
                        }
                        return true;
                    }
                }
            }
    

    tryReadLock函数被读锁调用(tryLock),和tryAcquireShared函数基本一致,除了少调用一个readerShouldBlock方法,公平和非公平两种模式都同意插队(barging in)相当于是非公平模式了。

    6 NonfairSync类

    static final class NonfairSync extends Sync {
            private static final long serialVersionUID = -8159625535654395037L;
    // 写线程无条件插队
            final boolean writerShouldBlock() {
                return false; // writers can always barge
            }
            final boolean readerShouldBlock() {
                // 为了方式写线程饥饿的情况,假设AQS等待队里的第一个线程是独占的,
                // 当前读线程就堵塞。
                return apparentlyFirstQueuedIsExclusive();
            }
        }
    

    非公平锁的实现类。

    7 FairSync类

    static final class FairSync extends Sync {
            private static final long serialVersionUID = -2274990926593161451L;
    // 假设AQS队列里有等待的线程,当前线程就堵塞
            final boolean writerShouldBlock() {
                return hasQueuedPredecessors();
            }
    // 假设AQS队列里有等待的线程,当前线程就堵塞
            final boolean readerShouldBlock() {
                return hasQueuedPredecessors();
            }
        }
    

     公平锁的实现类writerShouldBlock和readerShouldBlock实现方式全然一样。仅仅要队列里有其它线程在等待。当前线程就堵塞,FIFO模式。





  • 相关阅读:
    pageload 重新生成动态控件的問題
    管理員維護完整代碼如下:
    Control 'GridView1' of type 'GridView' must be placed inside a form tag with runat=server
    sqlserver兩種查詢方式效率比較
    管理員修改頁面代碼
    SQL2008卸载 检查是否重启错误
    安装Java环境
    asp.net cookie读写
    绘制几何图形——使用android.graphics类
    正则获取图片路径
  • 原文地址:https://www.cnblogs.com/brucemengbm/p/7272573.html
Copyright © 2011-2022 走看看