zoukankan      html  css  js  c++  java
  • ReentrantLock 源码分析

    ReentrantLock是一把重入锁,可中断,可以限时,支持公平锁和非公平锁。

    下面举一个生活中的例子,帮助大家来更好的理解ReentrantLock这些特性。

    火车购票。买票人争先恐后的跑到G1020检票口检票回家,谁先跑到检票口,谁先验票,谁先回家。那些跑的慢没有抢到检票权的,一个一个的在后面排成一队,先到的排在前面。在最前面获得检票权的人通过检票机验票走人,检票机通知下面一个人来验票。这样每次最前面的人验票走人,然后依次下一个。这样一来大家都井然有序的回家,心里面也比较平衡,谁让自己腿短跑的慢呢,排在跑的快的人后面也心安理得。这就是公平锁的基本思路。

    在看G1028检票口的人,这批人就比较聪明了,看着前面一眼望不到头的长长的队伍。把自己的行李箱放在队伍中代替自己,自己则在旁边的椅子上歇着。没办法谁让做这趟车的人聪明呢,这下可把检票机给累坏了,每次检查到行李箱到就大声喊,黑色行李箱的来安检了,然后听到的人慢悠悠的在过去验票走人。后来智能的检票机通过智能学习也变聪明了,如果在检查到放行李箱占位置的,如果此时刚好有人过来检票,则直接让此人检票,不需要在队伍尾部排队等候了。这样虽然对抢到前面的人不公平,但是却加快了进站效率。这就是非公平锁的基本思路.

    现在检票的规则变了以家庭为单位检票,只要是家庭中的一员抢到检票口,其余人就可以跟着过检票口。由于小明跑的快,先跑到了检票口。他的父母年龄比较大,跑的比较慢。等跑到检票口的时候,后面已经排起来了长队。可是人家有一个跑的快的儿子呀,现在的规则又是以家庭为单位,都是一家人,于是小明的父母也可以直接检票走人,不需要去队伍里面排队。这就是可重入锁,同一个线程可以重复拿锁。

    快看G1028检票口的人吵起来了,怎么回事呢。原来是秃头和长毛两个人刚才跑的快撞在了一起,身份ID都掉在了地上,2个人匆忙捡起来就跑到了检票口。秃头跑的快一点,跑到了长毛的前面。这时候秃头拿身份id验证的时候发现身份不对,上面写的是产品经理。后面那个人拿的是秃头的身份ID,上面写的程序员。于是秃头说,我把你的身份ID给你,你把我的身份ID给我。但是长毛平时提需求提的习惯了,对秃头说给你可以,但是让我排到你前面。秃头一听这无脑需求,火冒三丈,就和长毛干了起来。这下好了,长毛不给秃头身份ID,秃头也不给长毛身份ID,两个人就互相僵持着。这导致后面排队的人也没发进站了。这时候长毛手机发生了异常,一个电话打了过来,原来老板让长毛回去改需求,没办法最后中断了2人的争执。长毛灰溜溜的走了,秃头打赢了这场仗,脸上露出了阳光般的笑容。这就是可中断的,当2个线程互相占有锁,不释放导致死锁的时候,ReentrantLock可以用锁中断解决。

    在看另一边一个人想要插队,在耐心的说服前面的人,让我先进去吧,让我先进去吧。可是说服了几分钟也没说服成功,于是放弃了不在插队抢先检票了。这个就是在一定时间内锁尝试,尝试着去获取锁,如果没有获取到就结束。

    以上就是重入锁,锁中断,锁限时,公平锁和非公平锁的大致概念,相信大家应该会有所理解。下面对ReentrantLock的特性进行详细的讲解。

    ReentrantLock非公平锁




    ReentrantLock默认实现的是非公平锁,我具体看一下代码:

     ReentrantLock reentrantLock=new ReentrantLock();
     reentrantLock.lock();

    我们看一下构造函数

     public ReentrantLock() {
        sync = new NonfairSync();
    }

    从这里可以看出ReentrantLock默认实现的是非公平锁,我们在看一下非公平锁是怎么具体实现的。

    lock是一个接口,构造器默认实现的是NonfairSync,所以reentrantLock.lock() 调用的是NonfairSync的lock接口。

    看一下这块的具体代码

     static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    NonfairSync 继承了Sync,Sync继承了AbstractQueuedSynchronizer,到这里大家应该已经明白ReentrantLock是基于AQS实现的,所以只要你搞懂AQS,很多并发类你都会很容易的理解。

    reentrantLock.lock(),我们看一下lock方法做了哪些操作.首先通过CAS获取锁,如果获取到锁,把当前线程设置为独占线程。如果获取失败,则调用acquire方法,而此方法为AQS内部方法。

     public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    这里有一个知识点,我们看一下NonfairSync类中tryAcquire方法,此方法也是AQS中的方法,也就是子类NonfairSync重写了父类AQS的tryAcquire方法。

    当子类NonfairSync调用acquire方法的时候,执行的是AQS提供的acquire方法,然后从上面代码中可以看出来父类AQS在此方法中执行了tryAcquire方法,而tryAcquire方法在子类中已经重写,那么就会执行子类NonfairSync实现的tryAcquire方法。这就是多态。

    这也是AQS的好处,对外提供API,子类继承AQS,按照自己的业务逻辑重写提供的API。而AQS只管线程怎么进行入队列,怎么插入节点,怎么唤醒节点这些底层的方法,对外层提供调用的 API,然后子类只需要继承AQS,实现独有的业务方法即可,从而大大降低了耦合度。

    我们在看一下nonfairTryAcquire方法做了哪写操作

      final boolean nonfairTryAcquire(int acquires) {
          final Thread current = Thread.currentThread();//获取当前线程
          int c = getState();//获取当前线程的状态
          if (c == 0) {//如果当前线程处于初始状态
              if (compareAndSetState(0, acquires)) {//cas竞争锁
                  setExclusiveOwnerThread(current);//竞争到锁把当前线程设置为独占线程
                  return true;
              }
          }
          else if (current == getExclusiveOwnerThread()) {//如果当前线程是独占线程,注意此方法也是实现重入的地方。
              int nextc = c + acquires;//当前线程状态值+传入的值
              if (nextc < 0) // overflow
                  throw new Error("Maximum lock count exceeded");
              setState(nextc);//设置最新的状态值
              return true;
          }
          return false;
      }

    首先取到当前线程和当前线程的值,如果当前线程是初始状态那么就去竞争锁,如果竞争到锁,把当前线程设置为独占线程。如果进来的线程是独占线程,那么更新此线程进入的次数,同时也可以获取锁。如果没有竞争到锁,也不是当前的独占线程,那么就返回false。

    从此方法中可以看出来,只要有线程进来,就让他获取锁,而不是排队到尾部。只要是独占线程,就可以重复进来.正是通过此方法可以看出来ReentrantLock是可以进行重入的也是可以是实现非公平锁的。

    ReentrantLock公平锁



    我们在看一下ReentrantLock实现的公平锁的源代码

     ReentrantLock reentrantLock=new ReentrantLock(true);
     reentrantLock.lock();

    我们点击构造器看一下

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

    下面具体看一下FairSync类的源代码

    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
              //没有线程在等队列里面等待同时获取到锁,则设置当前线程为独占线程
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

    FairSync也是继承了Sync,也就是也是继承了AQS,调用lock方法,lock方法调用了父类acquire,此方法会调用子类重写的tryAcquire方法。它的实现方式和上面讲的非公平锁实现方式大致一样,业务逻辑都是在重写的tryAcquire里面。

    我们看一下hasQueuedPredecessors

    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

    此方法主要是查看是否有线程在等待队列里面等待。

    FairSync类tryAcquire方法业务逻辑就是获取到当前线程和当前线程的状态,如果当前线程是初始状态,会去判断当前队列里面是否有等待的线程,如果队列中没有等待的线程,同时获取到锁,那么就把当前线程设置为独占线程。如果是相同的独占线程进来则,则更新独占线程进来的次数。同时返回true,否则返回false。从这里可以很容易的看出来,这是一个公平锁,进来的线程需要排队,队列中没有了线程才能轮到进来的线程。同时在else if 这个条件中可以看出来也是可重入的。

    ReentrantLock锁中断




    我们看一下可中断锁的源代码

    ReentrantLock reentrantLock=new ReentrantLock();
    reentrantLock.lockInterruptibly();
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);//调用AQS中方法
    }
    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())//线程中断抛出异常
            throw new InterruptedException();
        if (!tryAcquire(arg))//此处还是调用创建对象子类的方法,获取不到锁执行下面的方法
            doAcquireInterruptibly(arg);
    }
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);//封装线程节点,并且添加到尾部。前面文章已经详细讲解过,此处不在详细展开。
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();//获取当前节点的前驱节点
                if (p == head && tryAcquire(arg)) {//前驱节点是头节点并且获取到锁
                    setHead(node);//设置当前节点为头节点
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())//线程如果是阻塞的并且被中断,则直接抛出异常
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);//如果线程抛出了异常,那么就把线程状态设置为取消状态同时清除节点.
        }
    }

    此处讲解一下,为什么ReentrantLock可以进行锁中断,为什么可以在产生死锁的时候,可以通过锁中断技术解决死锁。看过源码其实已经明白,首先在当前线程如果调用了interrupted,那么直接抛出异常出来。如果线程被阻塞并且被中断了那么也是抛出异常。也就是他是通过线程调用中断方法抛出异常来打破持有锁的。如果前面的文章看过,你会发在AQS中doAcquireInterruptibly方法和acquireQueued方法很相似,区别就是一个是返回boolean类型的值,让上层做判断,一个是在返回boolean类型值的地方直接抛出了异常。

    ReentrantLock锁限时



    我们看一下可中断锁的源代码

    ReentrantLock reentrantLock=new ReentrantLock();
    reentrantLock.tryLock(300, TimeUnit.SECONDS);//300秒内持续获取锁,直到获取到锁或者时间截止
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())//此处可以看出来支持锁中断
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);//首先获取一次锁,如果没有获取到执行独占计时模式
    }
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)//时间小于0直接返回
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;//队列延迟时间为系统时间+设置的超时时间
        final Node node = addWaiter(Node.EXCLUSIVE);//把当前线程封装为Node并添加到队列
        boolean failed = true;
        try {
            for (;;) {//自旋
                final Node p = node.predecessor();//获取当前节点的前驱节点
                if (p == head && tryAcquire(arg)) {//如果前驱节点是头节点并且获取到锁
                    setHead(node);//设置当前节点为头节点
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();//超时间为延迟时间-当前系统的时间
                if (nanosTimeout <= 0L)//表示已经超过设置的尝试时间,直接返回
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)//如果当前线程阻塞,并且超时时间大于1000纳秒
                    LockSupport.parkNanos(this, nanosTimeout);//阻塞当前线程并在超时时间内返回
                if (Thread.interrupted())//线程中断
                    throw new InterruptedException();//抛出异常
            }
        } finally {
            if (failed)//线程发生异常
                cancelAcquire(node);//把当前线程设置为取消状态并清除该节点
        }
    }

    通过阅读源码我们发现锁限时获取的步骤

    1 首先调用tryAcquire方法获取一次锁,如果没有获取到调用AQS中的doAcquireNanos。

    2 System.nanoTime() 获取系统纳秒级时间+传递的延时时间为最后的时间。

    3 调用addWaiter方法把当前线程封装为节点并添加到队列的尾部。

    4 前驱节点是头节点并且获取到锁设置当前节点为头节点

    5 如果当前线程被阻塞了并且超时时间大于1000纳秒,调用LockSupport.parkNanos方法阻塞当前线程并且在规定的超时间内返回

    6 如果线程中断,直接抛出异常,这里可以看出支持锁中断

    7 如果线程在自旋的过程中发生了异常,那么调用cancelAcquire方法把当前线程设置为取消状态并且清除该节点。

    其实此方法和上一篇讲解的独占锁模式调用acquireQueued方法差不多。不同点在于这里增加了超时时间,如果超时时间大于spinForTimeoutThreshold,此值是一个常量为1000的值。也就是如果超时时间大于1000纳秒,那么就调用 LockSupport.parkNanos方法让该线程阻塞,最长阻塞的时间不会超过超时的时间。同时增加了线程中断的判断,发生线程中断则抛出异常,其余和acquireQueued实现都一样。

  • 相关阅读:
    LeetCode 230. 二叉搜索树中第K小的元素(Kth Smallest Element in a BST)
    LeetCode 216. 组合总和 III(Combination Sum III)
    LeetCode 179. 最大数(Largest Number)
    LeetCode 199. 二叉树的右视图(Binary Tree Right Side View)
    LeetCode 114. 二叉树展开为链表(Flatten Binary Tree to Linked List)
    LeetCode 106. 从中序与后序遍历序列构造二叉树(Construct Binary Tree from Inorder and Postorder Traversal)
    指针变量、普通变量、内存和地址的全面对比
    MiZ702学习笔记8——让MiZ702变身PC的方法
    你可能不知道的,定义,声明,初始化
    原创zynq文章整理(MiZ702教程+例程)
  • 原文地址:https://www.cnblogs.com/fengyun2050/p/12384569.html
Copyright © 2011-2022 走看看