zoukankan      html  css  js  c++  java
  • 源码分析— java读写锁ReentrantReadWriteLock

    前言

    今天看Jraft的时候发现了很多地方都用到了读写锁,所以心血来潮想要分析以下读写锁是怎么实现的。

    先上一个doc里面的例子:

    class CachedData {
      Object data;
      volatile boolean cacheValid;
      final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    
      void processCachedData() {
    	  //加上一个读锁
        rwl.readLock().lock();
        if (!cacheValid) {
          // Must release read lock before acquiring write lock
          //必须在加写锁之前释放读锁
          rwl.readLock().unlock();
          rwl.writeLock().lock();
          try {
            // Recheck state because another thread might have
            // acquired write lock and changed state before we did.
    		  //双重检查
            if (!cacheValid) {
    			//设置值
              data = ...
              cacheValid = true;
            }
            // Downgrade by acquiring read lock before releasing write lock
    		  //锁降级,反之则不行
            rwl.readLock().lock();
          } finally {
    		  //释放写锁,但是仍然持有写锁
            rwl.writeLock().unlock(); // Unlock write, still hold read
          }
        }
    
        try {
          use(data);
        } finally {
    		//释放读锁
          rwl.readLock().unlock();
        }
      }
    }}
    
    

    我们一般实例化一个ReentrantReadWriteLock,一般是调用空的构造器创建,所以默认使用的是非公平锁

    public ReentrantReadWriteLock() {
        this(false);
    }
    
    
    public ReentrantReadWriteLock(boolean fair) {
    	  //默认使用的是NonfairSync
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
    //分别调用writeLock和readLock会返回读写锁实例
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
    
    

    ReentrantReadWriteLock内部类Sync

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 6317671515068378041L;
     	  //位移量
    	  //在读写锁中,state是一个32位的int,所以用state的高16位表示读锁,用低16位表示写锁
        static final int SHARED_SHIFT   = 16;
    	  //因为读锁是高16位,所以用1向左移动16位表示读锁每次锁状态变化的量
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    	  //最大的可重入次数
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    	  //用来计算低16位的写锁状态
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
        //获取高16位读锁state次数,重入次数
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        //获取低16位写锁state次数,重入次数
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    
        //用来记录每个线程持有的读锁数量
        static final class HoldCounter {
            int count = 0;
            // Use id, not reference, to avoid garbage retention
            final long tid = getThreadId(Thread.currentThread());
        }
    
        
        static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }
    
        private transient ThreadLocalHoldCounter readHolds;
     	  // 用于缓存,记录"最后一个获取读锁的线程"的读锁重入次数
        private transient HoldCounter cachedHoldCounter;
     	  // 第一个获取读锁的线程(并且其未释放读锁),以及它持有的读锁数量
        private transient Thread firstReader = null;
        private transient int firstReaderHoldCount;
    
        Sync() {
    		  // 初始化 readHolds 这个 ThreadLocal 属性
            readHolds = new ThreadLocalHoldCounter();
            setState(getState()); // ensures visibility of readHolds
        }
    	....
    }
    
    1. 因为int是32位的,所以在ReentrantReadWriteLock中将state分为两部分,高16位作为读锁的状态控制器,低16位作为写锁的状态控制器。
    2. 每次要获取读锁的当前状态都需要调用sharedCount传入当前的state,将state向右移动16位来获取
    3. 要获取低16位则需要将1左移16位减一,获得一个低16位全是1的数,然后和传入的state进行取与操作获取state的低16位的值
    4. cachedHoldCounter里面保存了最新的读锁的线程和调用次数
    5. firstReaderfirstReaderHoldCount 将”第一个”获取读锁的线程记录在 firstReader 属性中,这里的第一个不是全局的概念,等这个 firstReader 当前代表的线程释放掉读锁以后,会有后来的线程占用这个属性的。

    读锁获取

    //readLock#lock
    public void lock() {
    	  //这里会调用父类AQS的acquireShared,尝试获取锁
        sync.acquireShared(1);
    }
    //AQS#acquireShared
    public final void acquireShared(int arg) {
    	  //返回值小于 0 代表没有获取到共享锁
        if (tryAcquireShared(arg) < 0)
    		  //进入到阻塞队列,然后等待前驱节点唤醒
            doAcquireShared(arg);
    }
    

    这里的tryAcquireShared是调用ReentrantReadWriteLock的内部类Sync的tryAcquireShared的方法

    protected final int tryAcquireShared(int unused) {
     	  //获取当前线程
        Thread current = Thread.currentThread();
    	  //获取AQS中的state属性值
        int c = getState();
        //exclusiveCount方法是用来获取写锁状态,不等于0代表有写锁
        if (exclusiveCount(c) != 0 &&
    		  //如果不是当前线程获取的写锁,那么直接返回-1
            getExclusiveOwnerThread() != current)
            return -1;
    	  //获取读锁的锁定次数
        int r = sharedCount(c);
    	  // 读锁获取是否需要被阻塞
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            //因为高16位代表共享锁,所以CAS需要加上一个SHARED_UNIT
            compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
    			  //记录一下首次读线程
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
    			   //firstReader 重入获取读锁
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
    			  // 如果 cachedHoldCounter 缓存的不是当前线程,设置为缓存当前线程的 HoldCounter
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
    		   // return 大于 0 的数,代表获取到了共享锁
            return 1;
        }
        return fullTryAcquireShared(current);
    }
    
    
    1. 首先会去调用exclusiveCount方法来查看写锁是否被占用,如果被占用,那么查看当前线程是否是占用读锁的线程,如果不是则返回-1。通过这里可以看出可以先占用读锁再占用写锁
    2. 调用readerShouldBlock方法获取是否需要阻塞读锁获取,然后检查一下高16位读锁重入次数是否超过了2^16-1,最后通过CAS操作将state高16进行加1操作,如果没有其他线程抢占就会成功
    3. 如果state的高16位为零,那么就设置首次读线程和首次数次数,如果不是则校验首次读线程是不是当前线程,是的话将firstReaderHoldCount次数加1。如果不是首次读线程,那么校验一下最后一次读线程是不是当前线程,不是的话就从readHolds中获取,并将HoldCounter计数加1,如果最后读线程是当前线程那么计数加1

    readerShouldBlock

    //NonfairSync#readerShouldBlock
    final boolean readerShouldBlock() {
        return apparentlyFirstQueuedIsExclusive();
    }
    //AQS
    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }
    

    在非公平模式中readerShouldBlock会调用AQS的方法,判断当前头节点的下一个节点,如果不是共享节点,那么readerShouldBlock就返回true,读锁就会阻塞。

    //FairSync#readerShouldBlock
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
    //AQS
    public final boolean hasQueuedPredecessors() {
       
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    
    

    在公平模式中会去看看队列里有没有其他元素在队列里等待获取锁,如果有那么读锁就进行阻塞

    ReentrantReadWriteLock#fullTryAcquireShared

    final int fullTryAcquireShared(Thread current) {
       
        HoldCounter rh = null;
        for (;;) {
            int c = getState();
    		  //检查是否写锁被占用
            if (exclusiveCount(c) != 0) {
    			   //被占用,但是占用读锁线程不是当前线程,返回阻塞
                if (getExclusiveOwnerThread() != current)
                    return -1;
                // else we hold the exclusive lock; blocking here
                // would cause deadlock.
    			//检查读锁是否应该被阻塞
            } else if (readerShouldBlock()) {
                // Make sure we're not acquiring read lock reentrantly
    			  //首次读线程是当前线程,下面直接CAS
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    if (rh == null) {
    					   //设置最后一次读线程
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            rh = readHolds.get();
                            if (rh.count == 0)
    							   //如果发现 count == 0,也就是说,纯属上一行代码初始化的,那么执行 remove
                                readHolds.remove();
                        }
                    }
    				   //如果最后读取线程次数为0,那么阻塞
                    if (rh.count == 0)
                        return -1;
                }
            }
    		  //如果读锁重入次数达到上限,抛异常
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
    		  //尝试CAS读锁重入次数加1
            if (compareAndSetState(c, c + SHARED_UNIT)) {
    			   // 这里 CAS 成功,那么就意味着成功获取读锁了
                // 下面需要做的是设置 firstReader 或 cachedHoldCounter
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
    				  // 下面这几行,就是将 cachedHoldCounter 设置为当前线程
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
    			  // 返回大于 0 的数,代表获取到了读锁
                return 1;
            }
        }
    }
    
    

    这个方法主要是用来处理重入锁操作的。首先校验一下写锁是否被占用,如果没有被占用则判断当前线程是否是第一次读线程,如果不是则判断最后一次读线程是不是当前线程,如果不是则从readHolds获取,并判断HoldCounter实例中获取读锁次数如果为0,那么就不是重入。

    如果可以判断当前线程是重入的,那么则对state高16进行加1操作,操作成功,则对firstReader或cachedHoldCounter进行设置,并返回1,表示获取到锁。

    到这里我们看完了tryAcquireShared方法,我再把acquireShared方法贴出来:

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    

    下面看doAcquireShared方法:

    private void doAcquireShared(int arg) {
    	  //实例化一个共享节点入队
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
    			  //获取当前节点的上一个前置节点
                final Node p = node.predecessor();
    			  //前置节点如果是头节点,那么代表队列里没有别的节点,先调用tryAcquireShared尝试获取锁
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
    					   //醒队列中其他共享节点
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
    					   //响应中断
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
    			  //设置前置节点waitStatus状态
                if (shouldParkAfterFailedAcquire(p, node) &&
    				  //阻塞当前线程
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    doAcquireShared方法中会实例化一个共享节点并入队。如果当前节点的前置节点是头节点,那么直接调用tryAcquireShared先获取一次锁,如果返回大于0,那么表示可以获取锁,调用setHeadAndPropagate唤醒队列中其他的线程;如果没有返回则会调用shouldParkAfterFailedAcquire方法将前置节点的waitStatus设值成SIGNAL,然后调用parkAndCheckInterrupt方法阻塞

    AQS#setHeadAndPropagate

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
    	  //把node节点设值为头节点
        setHead(node); 
    	  //因为是propagate大于零才进这个方法,所以这个必进这个if
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
    		  //获取node的下一个节点
            Node s = node.next;
    		  //判断下一个节点是否为空,或是共享节点
            if (s == null || s.isShared())
    			  //往下看
                doReleaseShared();
        }
    }
    

    这个方法主要是替换头节点为当前节点,然后调用doReleaseShared进行唤醒节点的操作

    AQS#doReleaseShared

    private void doReleaseShared() { 
        for (;;) {
            Node h = head;
            // 1. h == null: 说明阻塞队列为空
            // 2. h == tail: 说明头结点可能是刚刚初始化的头节点,
            //   或者是普通线程节点,但是此节点既然是头节点了,那么代表已经被唤醒了,阻塞队列没有其他节点了
            // 所以这两种情况不需要进行唤醒后继节点
            if (h != null && h != tail) {
                int ws = h.waitStatus;
    			   //后面的节点会把前置节点设置为Node.SIGNAL
                if (ws == Node.SIGNAL) {
    				  	//1
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
    					// 唤醒 head 的后继节点,也就是阻塞队列中的第一个节点
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
    						//2
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
    		  //3 如果被唤醒的节点已经占领head了,那么继续循环,否则跳出循环
            if (h == head)                   // loop if head changed
                break;
        }
    }
    
    1. unparkSuccessor这里会唤醒下一个节点,那么下一个节点也会调用setHeadAndPropagate进行抢占头节点;如果同时有当前线程和被唤醒的下一个线程同时走到这里,那么只会有一个成功,另一个返回false的就不进行唤醒操作
    2. 这里CAS失败的原因可能是一个新的节点入队,然后将前置节点设值为了Node.SIGNAL,所以导致当前的CAS失败
    3. 如果被唤醒的节点抢占头节点成功,那么h == head 就不成立,那么会进行下一轮的循环,否则就是head没有被抢占成功

    AQS#unparkSuccessor

    private void unparkSuccessor(Node node) {
        //如果当前节点小于零,那么作为头节点要被清除一下状态
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        // 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待
        // 从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
    		  // 唤醒线程
            LockSupport.unpark(s.thread);
    }
    

    到这里加读锁的代码就讲解完毕了

    读锁释放

    //ReadLock
    public void unlock() {
        sync.releaseShared(1);
    }
    // Sync
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared(); 
            return true;
        }
        return false;
    }
    

    我们先看tryReleaseShared

    Sync#tryReleaseShared

    protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
    	  //如果当前是firstReader,那么需要进行重置或重入减一
        if (firstReader == current) {
            // assert firstReaderHoldCount > 0;
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;
        } else {
    		  // 判断 cachedHoldCounter 是否缓存的是当前线程,不是的话要到 ThreadLocal 中取
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();
            int count = rh.count;
            if (count <= 1) {
    			   // 这一步将 ThreadLocal remove 掉,防止内存泄漏。因为已经不再持有读锁了
                readHolds.remove();
    			   //unlock了几次的话会抛异常
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        for (;;) {
            int c = getState();
    		   // nextc 是 state 高 16 位减 1 后的值
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                // 如果 nextc == 0,那就是 state 全部 32 位都为 0,也就是读锁和写锁都空了
                // 此时这里返回 true 的话,其实是帮助唤醒后继节点中的获取写锁的线程
                return nextc == 0;
        }
    }
    

    这个读锁的释放,主要就是将 hold count 减 1,如果减到 0 的话,还要将 ThreadLocal 中的 remove 掉。然后是在 for 循环中将 state 的高 16 位减 1,如果发现读锁和写锁都释放光了,那么唤醒后继的获取写锁的线程,因为只有读锁是不会被阻塞的,所以等待的线程只可能是写锁的线程。

    写锁的获取

    //WriteLock
    public void lock() {
    	sync.acquire(1);
    }
    //sync
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    //AQS
    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
    	  //获取state的低16位
        int w = exclusiveCount(c);
    	  //不为零说明读锁或写锁被持有了
        if (c != 0) {
            // (Note: if c != 0 and w == 0 then shared count != 0)
            // 看下这里返回 false 的情况:
            //   c != 0 && w == 0: 写锁可用,但是有线程持有读锁(也可能是自己持有)
            //   c != 0 && w !=0 && current != getExclusiveOwnerThread(): 其他线程持有写锁
            //   也就是说,只要有读锁或写锁被占用,这次就不能获取到写锁
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Reentrant acquire
    		  // 这里不需要 CAS,仔细看就知道了,能到这里的,只可能是写锁重入,不然在上面的 if 就拦截了
            setState(c + acquires);
            return true;
        }
    	  //检查写锁是否需要block
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
    	  //走到这里说明写锁不需要block,并且CAS成功了
        setExclusiveOwnerThread(current);
        return true;
    }
    

    我们来看看writerShouldBlock

    //NonfairSync
    final boolean writerShouldBlock() {
        return false; // writers can always barge
    }
    //FairSync
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
    

    如果是非公平模式,那么 lock 的时候就可以直接用 CAS 去抢锁,抢不到再排队

    如果是公平模式,那么如果阻塞队列有线程等待的话,就乖乖去排队

    写锁释放

    public void unlock() {
        sync.release(1);
    }
    
    //sync
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
    		  //如果独占锁释放"完全",唤醒后继节点
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    //Sync
    protected final boolean tryRelease(int releases) {
    	  //检查一下持有所的线程是不是当前线程
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
    	  //将state减1
        int nextc = getState() - releases;
        //查看低16位是否为0
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
    		  //如果为0,那么说明写锁释放
            setExclusiveOwnerThread(null);
    	  //设置状态
        setState(nextc);
        return free;
    }
    
  • 相关阅读:
    模仿JavaAppArguments.java示例,编写一个程序,此程序从命令行接收多个数 字,求和之后输出结果,写出其的设计思想、程序流程图、源程序代码。
    大道至简第二章读后感
    大道至简第一章读后感
    md5实现
    jdk-动态代理
    使用反射复制对象
    java-二分查找法
    java-base64
    cxf框架使用(一)
    Freemarket学习笔记(一)
  • 原文地址:https://www.cnblogs.com/luozhiyun/p/11973977.html
Copyright © 2011-2022 走看看