zoukankan      html  css  js  c++  java
  • 从源码来看ReentrantLock和ReentrantReadWriteLock

    上一篇花了点时间将同步器看了一下,心中对锁的概念更加明确了一点,知道我们所使用到的锁是怎么样获取同步状态的,我们也写了一个自定义同步组件Mutex,讲到了它其实就是一个简版的ReentrantLock,本篇文章我们就来看看ReentrantLock底层是怎么样的!

    目录结构:

    • ReentrantLock
    • 公平锁与非公平锁
    • ReentrantReadWriteLock

    ReentrantLock

    ReentrantLock我们叫做可重入锁,也就是我们可以重复进入的意思,也就是表示,一个线程可以对指定资源进行重复加锁,并且还能够选择公平锁和非公平锁。
    公平锁:先请求获取加锁的线程先被满足
    非公平锁:XXXX

    可重入锁可以对一个资源重复加锁,同时,在释放锁时也要进行多次release,所以不难想到,ReentrantLock只要维持一个值,用来控制这个资源加锁的次数就Ok了,初始化为零,当加锁时对这个值+1,release时-1。

    public class ReentrantLock implements Lock, java.io.Serializable{
    	.
    	.
    	.
    }
    

    这个是ReentrantLock的定义,上一节我就无耻的把它贴出来了→.→

    接下来我们来看看ReentrantLock是如何对资源进行加锁的!

    首先肯定要定义一个继承自AbstractQueuedSynchronizer的内部类:

    abstract static class Sync extends AbstractQueuedSynchronizer {
    	.
    	.
    	.
    
    }
    

    构造函数的话,因为它可以选择公平和非公平锁,所以是否公平就是由构造函数决定的:

     public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    

    FairSync()和NonfairSync()都是构造函数,当然都是内部类啦,他们可以分别创建公平锁和非公平锁。
    我自己粗略的看了一下源码,将ReentrantLock的大概结构画了一下:
    这里写图片描述

    刚才我们也说了,根据参数ReentrantLock会根据我们的需要创建对应的锁,当没有参数的时候我们来看看它默认的是什么?

     /**
         * Creates an instance of {@code ReentrantLock}.
         * 创建一个实例
         * This is equivalent to using {@code ReentrantLock(false)}.
         */
        public ReentrantLock() {
            sync = new NonfairSync();//sync是Reentrant内的Sync类型的成员变量
        }
    

    从代码中我们可以知道,当我们使用无参构造的时候ReentrantLock会为我们创建一个非公平锁(事实上,大大多数情况下非公平锁的效率会更好);
    那么我们先来看一下非公平锁是怎么获得同步状态的:

    //非公平锁是继承Sync是毋庸置疑的
      static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */
            final void lock() {
    	        //这个是初次加锁的时候,判断同步状态是否被占用
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
               //当被占用后开始调用这个方法了
               //(当然后面的方法还会对这占用同步状态的线程是否是当前线程,
               //因为这个锁是独占的,仅仅允许一个线程多次加锁)
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    

    (一)
    上面的compareAndSetState我们已经不陌生了,上一篇文章我们在写叫Mutex的自定义同步组件的时候看到过,它的功能是:判断当前的state是否为0,如果为0那么获取锁,如果不为0,那么将state设置为1,:

    //这个方法是同步器提供的方法,参数是expect和update,它继续调用了unsafe内的方法(内部是通过CAS来实现原子操作),
    //将this(当前状态)和expect比较,如果相同返回true,如果不同则将state设置为update
     protected final boolean compareAndSetState(int expect, int update) {
            // See below for intrinsics setup to support this
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }
    

    (二)
    接着便是setExclusiveOwnerThread(Thread thread),这个方法比较陌生,我们继续跟源码,

    //这个方法是在AbstractOwnableSynchronizer内,从名字估计大家应该知道是干嘛的了,
    //我们看看这个类的注释
    /**
     * A synchronizer that may be exclusively owned by a thread.  This
     * class provides a basis for creating locks and related synchronizers
     * that may entail a notion of ownership.  The
     * {@code AbstractOwnableSynchronizer} class itself does not manage or
     * use this information. However, subclasses and tools may use
     * appropriately maintained values to help control and monitor access
     * and provide diagnostics.
     *博主英语不太好,勉强知道意思是:这个类是一个线程独占的同步器,
     *这个类提供创建锁和相关同步器的基础(伴随着所有权的概念),
     *这个类本身不控制和使用信息,子类和和工具可以适当使用维持的值去帮助控制和监视访问,提  * 供诊断(额,好几个单词不会0.0)
     */
    protected final void setExclusiveOwnerThread(Thread thread) {
            exclusiveOwnerThread = thread;
        }
    

    所以我们可以看出这个是干嘛的了:

    //同步器继承自AbstractOwnableSynchronizer,Sync继承自同步器,
    //NonfairSync 继承自Sync,所以这下该该明白了!
    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        }
    

    这里写图片描述

    NonfairSync 间接继承AbstractOwnableSynchronized,所以他可以调用它使用的方法,将exclusiveOwnerThread 变量设置为当前线程

    //这个变量是AbstractOwnableSynchronized的成员变量
    private transient Thread exclusiveOwnerThread;
    

    所以为我们知道这个设置是用来帮助控制与监视用的!


    (三)
    接下来我们继续向下看,如果无法第一次无法获取同步状态,调用acquire方法,之前我们也见过这个方法,这个方法尝试获取同步状态,否则将当前线程组装成一个节点放入同步队列中,但是在这里,ReentrantLock对它进行了重写:

     public final void acquire(int arg) {
    		 //其他的都没变,我们继续跟tryAcquire
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    这里的tryAcquire不再是 throw new UnsupportedOperationException();
    而是:

     protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
    

    所以最终调用nonfairTryAcquire,至此我们真正的主角才上场!(鼓掌!!)

       final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                //再次判断当前状态
                if (c == 0) {
                //下面这行代码我就不用说了吧(参考上面的lock方法)
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                //如果当前的线程为我们之前存的线程(就是前面已经获取同步状态的线程)
                else if (current == getExclusiveOwnerThread()) {
                    //c为当前的状态,acquires为参数为1
                    //当前状态最小值为0,表示当前无线程获取同步状态
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    //将当前状态设置为nextc,也就是在原来c的基础上加1了
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    以上就是非公平锁的加锁方法,如果彻底明白了加锁的方法,那么release方法也不成问题了,接着看释放方法:


    我们都知道ReentrantLock是同过unlock来释放锁的:

    public void unlock() {
            sync.release(1);
        }
    

    继续往下面跟release方法(记得参数是1):

    //tryRelease方法就是不同的地方下面我们说
     public final boolean release(int arg) {
     //这个tryRelease方法被我们的ReentrantLock重写过,
     //不再是抛出UnsupportedOperationException错误了
     //下面我们详解tryRelease
            if (tryRelease(arg)) {
    	        //获取要释放的节点
                Node h = head;
                //如果节点不为空,且waitStatus不为0(0为初始状态)
                if (h != null && h.waitStatus != 0)
                    //这个方法使用LockSupport来唤醒下一个节点
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    如果tryRelease释放成功(即不同步状态未0),那么获取要释放的节点,并判断当前节点是否存在,这个等待状态不能为0,只有这样才能进行下一步的唤醒操作!
    记得在上一篇文章中,我就提到了release方法,但是没有详细说明,只是说unparkSuccessor方法是用来唤醒下一个节点的,这次来看看unparkSuccessor方法:

     private void unparkSuccessor(Node node) {
    		//获取当前节点的waitStatus
            int ws = node.waitStatus;
            //如果ws小于零即表示当前节点满足可以通知下一个节点
            if (ws < 0)
    	        //CAS操作,将waitStatus设置为0(node的waitStatus一定是相等的)
                compareAndSetWaitStatus(node, ws, 0);
                //获取释放节点的下一个节点
            Node s = node.next;
            //如果s节点为null或者waitStatus > 0(即不是初始状态)
            //那么s是空的
            if (s == null || s.waitStatus > 0) {
                s = null;
                //以下为同步队列的变动
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            //如果s不为空,那么使用LockSupport来唤醒s
            //LockSupport是用来唤醒阻塞中断的线程的,后面一章我们详细来讲解
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    以上就完成了锁的释放和唤醒下一个节点,貌似我们少说了什么,对就是tryRelease

    //boolean类型,如果释放成功则返回true,否则返回false
       protected final boolean tryRelease(int releases) {
                //我们之前说过了这个参数为1
                //获取当前同步状态并减一
                int c = getState() - releases;
                //判断当前线程是否为独占线程,如果不是抛出错误
                //(错误释放错误,都不是你加锁的,你来凑什么热闹)
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                //如果c为0,即getState为1,那么free为true,同步状态未空闲
                if (c == 0) {
                    free = true;
                    //将独占线程成员变量设置为空
                    setExclusiveOwnerThread(null);
                }
                //将同步状态设置为0
                setState(c);
                这个时候返回free
                return free;
            }
    

    以上便是ReentrantLock的解锁代码,因为是可重入的,所以当同步状态部位0的时候(大于零),我们可以多次调用unlock方法来调用释放同步状态!

    以上就是非公平锁的的基本操作,接下来看看非公平锁是怎么样的:

    static final class FairSync extends Sync {
            private static final long serialVersionUID = -3000897897090466540L;
    
            final void lock() {
                acquire(1);
            }
    
            /**
             * Fair version of tryAcquire.  Don't grant access unless
             * recursive call or no waiters or is first.
             */
            protected final boolean tryAcquire(int acquires) {
    	        //获得当前线程
                final Thread current = Thread.currentThread();
                //获得同步状态
                int c = getState();
                //如果同步状态为0(同步状态空闲)
                if (c == 0) {
    	            //如果空闲,那么判断当前线程是否有前驱(意思是让当前线程不能插队)
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    

    非公平锁里面同样头lock方法并重写了tryAcquire方法,lock方法里面调用acquire方法,acquire方法和之前的一样:

     public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    所以唯一不同的还是这个tryAcquire方法:这个方法和非公平锁不同在哪里?就是这个方法里多了一个判断hasQueuedPredecessors,这个方法判断同步队列中是否有前驱节点,如果这个方法返回true,表示有前驱节点,有线程比当前线程更早的获取锁,所以要前驱线程获取锁后释放才能继续获取锁,其他的代码都和上面的相同,我们不必纠结!


    ReentrantReadWriteLock

    在前面我写了一篇文章是关于读写锁的应用,主要的内容是:读读共享,读写互斥,写写互斥,读写锁维护了两把锁,一把读锁和一把写锁,通过分离读锁和写锁来提高并发性,因为在大多数并发情况下都是读数据,所以这样可以提升并发处理的效率。

    我们先看下简单的结构图:
    这里写图片描述

    //ReentrantReadWriteLock 实现ReadWriteLock接口
    public class ReentrantReadWriteLock
            implements ReadWriteLock, java.io.Serializable {
    	.
    	.
    	.
    }
    

    ReadWriteLock的接口其实很简单,只是规范了两个方法:

    public interface ReadWriteLock {
        /**
         * Returns the lock used for reading.
         */
        Lock readLock();
    	 /**
         * Returns the lock used for writing.
         */
        Lock readLock();
    }
    

    看看ReentrantReadWriteLock有哪些字段:

    //这个是读锁ReadLock,这个类是ReentrantReadWriteLock的内部类
    private final ReentrantReadWriteLock.ReadLock readerLock;
    
    这个是读锁WriteLock,这个类是ReentrantReadWriteLock的内部类
    private final ReentrantReadWriteLock.WriteLock writerLock;
    
    final Sync sync;
    

    构造方法:

    //这个构造函数通过this调用下面的这个构造函数
     public ReentrantReadWriteLock() {
            this(false);
        }
    
    public ReentrantReadWriteLock(boolean fair) {
    		//读写锁同样有公平锁和非公平锁
    		//通过无参构造和当前的构造方法我们可以看出默认的是new一个非公平锁
            sync = fair ? new FairSync() : new NonfairSync();
            readerLock = new ReadLock(this);
            writerLock = new WriteLock(this);
        }
    

    实现接口的方法:

    //分别返回读锁和写锁的方法
        public ReentrantReadWriteLock.WriteLock writeLock()  {
         return writerLock;
     }
        public ReentrantReadWriteLock.ReadLock  readLock()  { 
    	return readerLock; 
    }
    

    我们上面看到了final类型的 sync,当然毋庸置疑,ReentrantReadWriteLock里面也有继承同步器的内部类:

    //这里的内部类是抽象类型的哟
     abstract static class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 6317671515068378041L;
    
    		//下面定义了四个常亮
    		//这个常亮表示共享位移为16
            static final int SHARED_SHIFT   = 16;
            //共享单元
            static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
            //最大数目
            static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
            //独占掩码
            static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    		//共享数
            static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    	    //独占数
            static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    
            
            static final class HoldCounter {
                int count = 0;
                // Use id, not reference, to avoid garbage retention
                final long tid = getThreadId(Thread.currentThread());
            }
    
    	    //为当前类的内部类,继承ThreadLocal,以调用线程为key,
    	    //HoldCounter为value的对象
            static final class ThreadLocalHoldCounter
                extends ThreadLocal<HoldCounter> {
                public HoldCounter initialValue() {
                    return new HoldCounter();
                }
            }
    
            //一系列的字段
            
            //当前线程读锁的持有量
            private transient ThreadLocalHoldCounter readHolds;
    		
    		//最后一个线程成功获取读锁的持有量
            private transient HoldCounter cachedHoldCounter;
    		
    		//第一个获取读锁的线程
            private transient Thread firstReader = null;
            
            //第一个获取读锁线程的锁持有量
            private transient int firstReaderHoldCount;
    		
    		//无参构造
            Sync() {
                readHolds = new ThreadLocalHoldCounter();
                setState(getState()); // ensures visibility of readHolds
            }
    
           
    
            abstract boolean readerShouldBlock();
    
            abstract boolean writerShouldBlock();
    
            
    		//这个和ReentrantLock中的相同(因为继承了同步器,所以这些方法方法要实现)
    		//以下如果我们看见和之前和同步器内的方法名相同,那么你不用怀疑,它就是一样的
            protected final boolean tryRelease(int releases) {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                int nextc = getState() - releases;
                boolean free = exclusiveCount(nextc) == 0;
                if (free)
                    setExclusiveOwnerThread(null);
                setState(nextc);
                return free;
            }
    		
            protected final boolean tryAcquire(int acquires) {
              
                Thread current = Thread.currentThread();
                int c = getState();
                int w = exclusiveCount(c);
                if (c != 0) {
                    // (Note: if c != 0 and w == 0 then shared count != 0)
                    if (w == 0 || current != getExclusiveOwnerThread())
                        return false;
                    if (w + exclusiveCount(acquires) > MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    // Reentrant acquire
                    setState(c + acquires);
                    return true;
                }
                if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires))
                    return false;
                setExclusiveOwnerThread(current);
                return true;
            }
    		//这个是读写锁特有的方法:释放共享锁(继承同步器后自定义的方法)
            protected final boolean tryReleaseShared(int unused) {
                Thread current = Thread.currentThread();
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                    if (firstReaderHoldCount == 1)
                        firstReader = null;
                    else
                        firstReaderHoldCount--;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    int count = rh.count;
                    if (count <= 1) {
                        readHolds.remove();
                        if (count <= 0)
                            throw unmatchedUnlockException();
                    }
                    --rh.count;
                }
                for (;;) {
                    int c = getState();
                    int nextc = c - SHARED_UNIT;
                    if (compareAndSetState(c, nextc))
                        // Releasing the read lock has no effect on readers,
                        // but it may allow waiting writers to proceed if
                        // both read and write locks are now free.
                        return nextc == 0;
                }
            }
    
            private IllegalMonitorStateException unmatchedUnlockException() {
                return new IllegalMonitorStateException(
                    "attempt to unlock read lock, not locked by current thread");
            }
    		//这个是读写锁特有的方法:获得共享锁(继承同步器后自定义的方法)
            protected final int tryAcquireShared(int unused) {
                Thread current = Thread.currentThread();
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return -1;
                int r = sharedCount(c);
                if (!readerShouldBlock() &&
                    r < MAX_COUNT &&
                    compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
                return fullTryAcquireShared(current);
            }
    
            
            .
            .
            .
            //这里省略了一些代码
    

    上面我代码做了简单的注释,也pass了一些方法,为了代码的完整性所以将Sync所有的的代码都贴了出来,整体上看着有点乱,我们理一下,首先ReentrantReadWriteLock和ReentrantLock结构基本相同,都有公平锁和非公平锁,实现方式是一样的,不同的地方是读写锁内部维护了两把锁,读锁和写锁,ReentrantLock同步状态表示被同一个线程获取的次数,ReentrantReadWriteLock同样需要有一个同步状态值来表示当前锁(读写锁),被同一个线程获取的次数,在读写锁中如果在一个整型变量中维护多种状态,就需要“按位切割使用”这个变量,它的高16位用来记录读状态,低16位用来记录写状态:
    这里写图片描述

    我们来看看具体是如何操作的:
    写锁:
    当重进入时候仅需加1,当释放的时候减1,获取当前状态的时候进行与(&)操作(0000000000000000 1111111111111111)将高的16位置为0即可获得当前的写状态。

    读锁:
    当重进入的时候仅需加1<<16,释放的时候减去1>>>16(无符号补零右移16位),获取当前状态时整体右移16位(左边补零)。

    写所得获取与释放,在读写锁中,同样是重写了同步器的tryAcquire方法,和ReentrantLock不同的是在获取之前需要判定一下读锁是否存在,如果读锁存在那么获取失败!写锁的释放操作和ReentrantLock基本一致,无其他特别。

    读锁的获取与释放,在获取读锁的时候判断当前读锁容量是否充足(因为存储16位,所以这个读锁会有一个最大值),如果充足还要判断当前状态是否大于零,如果大于零,那么无非三种情况,
    ①读锁状态位为0,写锁状态位不为→当前有写锁占用读锁进入等待(01)
    ②读锁状态位不为0,写锁状态位0→当前读锁占用读锁可以获得锁(10)
    ③读状态位和写状态位都为0→读锁可以获取(00)
    以上的线程安全靠CAS进行保证!

    读锁的每次释放是线程安全的,每次状态位减1<<16.

    读写锁还有一个特性就是锁降级,指的是将写锁降级为读锁,是指当前线程获取的是写锁,先获取读锁然后释放写锁,保留读锁。

    本章到此结束!还有很多分析不到的地方,望指正,不胜感激!
    2018 3.29 10:34

  • 相关阅读:
    json
    网页版 treeview使用中遇到的问题
    随机获取一条数据
    oracle
    发送邮件
    DataGrid列的合并
    python简介
    SQLSERVER
    Maven 基础
    Maven 构建jar包
  • 原文地址:https://www.cnblogs.com/MindMrWang/p/8668388.html
Copyright © 2011-2022 走看看