zoukankan      html  css  js  c++  java
  • 重入锁 ReentrantLock

     
    java中常用的可重入锁:synchronized,java.util.concurrent.locks.ReentrantLock
      jdk中独占锁的实现除了使用关键字synchronized外,还可以使用ReentrantLock。虽然在性能上ReentrantLock和synchronized没有什么区别,但在实现上却有很大差距,且ReentrantLock相比synchronized而言功能更加丰富,使用起来更为灵活,也更适合复杂的并发场景。

    一,ReentrantLock和synchronized的异同点

    • 1.ReentrantLock和synchronized都是独占锁,只允许线程互斥的访问临界区。但是实现上两者不同:synchronized加锁解锁的过程是隐式的,用户不用手动操作,优点是操作简单,但显得不够灵活。一般并发场景使用synchronized的就够了;ReentrantLock需要手动加锁和解锁,且解锁的操作尽量要放在finally代码块中,保证线程正确释放锁。ReentrantLock操作较为复杂,但是因为可以手动控制加锁和解锁过程,在复杂的并发场景中能派上用场。

    • 2.ReentrantLock和synchronized都是可重入的。synchronized因为可重入因此可以放在被递归执行的方法上,且不用担心线程最后能否正确释放锁;而ReentrantLock在重入时要却确保重复获取锁的次数必须和重复释放锁的次数一样,否则可能导致其他线程无法获得该锁。

    • synchronized是基于JVM层面实现的,而Lock是基于JDK层面实现的。曾经反复的找过synchronized的实现,可惜最终无果。但Lock却是基于JDK实现的,我们可以通过阅读JDK的源码来理解Lock的实现。
    二,重入锁ReentrantLock的理解
      重入锁ReentrantLock,顾名思义,就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁,synchronized关键字也隐式的支持重进入,比如一个synchronized修饰的递归方法,在方法执行时,执行线程在获取了锁之后仍能连续多次地获得该锁,而不会说下一次获取锁时出现阻塞自己的情况。
      除此之外,该ReentrantLock锁的还支持获取锁时的公平和非公平性选择。那公平性和非公性是如何体现出来的呢?如果在绝对时间上,先对锁进行获取的请求一定先被满足,那么这个锁是公平的,反之,是不公平的。公平的获取锁,也就是等待时间最长的线程最优先获取锁,也可以说锁获取是顺序的。ReentrantLock提供了一个构造函数,能够控制锁是否是公平的。

      在底层分析中我们可知ReentrantLock实现Lock接口,在ReentrantLock中引用了AbstractQueuedSynchronizer的子类,所有的同步操作都是依靠AbstractQueuedSynchronizer(队列同步器)实现。

      下面是Reetrantlock锁的方法调用图:

      

      可以结合上图对底下代码做分析: 

    public class ReentrantLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = 7373984872572414699L;
        private final Sync sync;
    //AbstractQueuedSynchronizer 是一个抽象类,所以在使用这个同步器的时候,需要通过自己实现预期的逻辑,Sync、FairSync和NonfairSync都是ReentrantLock为了实现自己的需求而实现的内部类,
    //之所以做成内部类,我认为是只在ReentrantLock使用上述几个类,在外部没有使用到。
     abstract static class Sync extends AbstractQueuedSynchronizer { //静态内部类Sync
            private static final long serialVersionUID = -5179523762034025860L;
            abstract void lock();//由于子类锁有公平和非公平之分,所以此方法需待子类来重写
            final boolean nonfairTryAcquire(int acquires) {//父类中只实现了非公平性的获取锁
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    
            protected final boolean isHeldExclusively() {
                return getExclusiveOwnerThread() == Thread.currentThread();
            }
    
            final ConditionObject newCondition() {
                return new ConditionObject();
            }
         ………………………………略
        }
      ***************************************************************************
      static final class NonfairSync extends Sync {//静态内部类NonfairSync非公平锁
            private static final long serialVersionUID = 7316153563782823691L;
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);//直接调用父类的方法
            }
        }
    *********************************************************************
      static final class FairSync extends Sync {//静态内部类FairSync公平锁
            private static final long serialVersionUID = -3000897897090466540L;
            final void lock() {
                acquire(1);
            }
            protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    ******************************************************************************************
        public ReentrantLock() {//调用默认的构造函数返回来的是非公平锁
            sync = new NonfairSync();
        }
        public ReentrantLock(boolean fair) {//掉用带参的构造函数时,参数为true返回公平锁,参数为false返回非公平锁
            sync = fair ? new FairSync() : new NonfairSync();
        }
    //动态调用,由于在方法体内调用的是父类的抽象方法,但子类已重写了此方法,所以又动态的调用子类重写后的方法
    public void lock() { sync.lock(); } public boolean tryLock() { return sync.nonfairTryAcquire(1); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } }
    //补充方法
    //从上面的代码中,我们可以观察到不论是公平还是非公平锁,在其方法体内最终都会调用acquire方法,该方法是继承的AQS机制的。
    public
    final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
    
    
    三,ReentrantLock锁的特性
    1.实现重进入
      重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞,那要实现重进入需要注意什么问题呢?
    1)线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则允许线程再次成功获取锁。
    2)锁的最终释放。线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到该锁。这就要求锁对于线程获取时要进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数等于0时表示锁已经成功释放。
      
    1)线程再次获取锁:
      下面代码就是RL锁体现出重入性的地方:synchronized本身支持锁的重入,而ReentrantLock则是通过此处实现。在锁状态为0时,表示锁空闲,重新尝试获取锁。如果已经被占用,那么判断当前线程是否为占用锁的线程,如果是那么进行计数,当然在锁的relase过程中会进行递减,保证锁的正常释放。
     1 final boolean nonfairTryAcquire(int acquires) //非公平获取
     2 {   final Thread current = Thread.currentThread(); //获取当前线程
     3    int c = getState(); //获取state值,state=0表示锁当前是空闲状态,state>0代表当前有state个线程在等待
     4    if (c == 0) { //表示锁空闲
     5       if (compareAndSetState(0, acquires)) {//如果当前线程获取锁成功
     6            setExclusiveOwnerThread(current); //则将当前线程定义成锁的占用者
     7            return true; 
     8       } 
     9    } else if (current == getExclusiveOwnerThread()) { //判断当前请求的线程是否就是持锁的线程(保证重入性)
    10       int nextc = c + acquires; //累加
    11       if (nextc < 0) throw new Error("Maximum lock count exceeded"); 
    12       setState(nextc);//更新state值
    13        return true; 
    14    }
    15    return false; 
    16 }

      该方法增加了再次获取同步状态的处理逻辑:通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。如果没有重新获取到锁或者锁的占用线程和当前线程不是一个线程,方法返回false。那么这时候通过看acquire方法可知,需把获取锁失败的线程添加到同步队列中,调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法,那接下来我们先看看addWaiter()方法:

    private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);//将当前线程创建成一个新结点
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {//true表示同步队列已经被初始化过了,此时只需将当前结点的线程添加到同步队列中
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);//false表示同步队列未被初始化,此时只需先初始化队列
         return node; 
    }

       在enq方法中主要做两个动作,一个是创建一个虚拟结点,用来表示当前正占用锁的那个线程的结点(因为之前这个线程直接获取锁成功,所以未进过队列,因此也没有机会new结点),第二个是将(在addWaiter中创建,通过参数传过来)当前线程创建的结点在此方法中连接在队列的末尾,这也就是在Node方法中存在一个死循环的目的:执行第二个动作。如果想详细了解该方法的执行过程可以看下这篇博客:https://blog.csdn.net/java_lyvee/article/details/98966684

        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }

      接下来就是acquireQueued()方法,在该方法中先是判断当前结点的前一个结点是不是头结点,如果是头结点,在次尝试获取下锁(万一在你new结点的过程中,前一个节点刚好执行完释放锁了呢),如果成功,那皆大欢喜,获取锁并将当前线程的结点设置为头结点,因为头结点一般都存的都是当前正占用锁的线程。

      此处是做Node节点线程的自旋过程,自旋过程主要检查当前节点是不是head节点的next节点,如果是,则尝试获取锁,如果获取成功,那么释放当前节点,同时返回。如果不是,则一直循环做自旋过程。
      如果这里一直不断的循环检查,其实是很耗费性能的,JDK的实现肯定不会这么“弱智”,所以有了shouldParkAfterFailedAcquire和parkAndCheckInterrupt,这两个方法就实现了线程的等待从而避免无限的轮询:

     final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
      
      
     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;//前一个结点的
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }

       首先,检查一下当前Node的前一个节点pred是否是SIGNAL,如果是SIGNAL,那么证明前置Node的线程已经Park了。(参考上图)如果ws>0,那么表示当前节点已经等待超时或者中断,那么需要不断调整当前节点的前置节点,将已经Concel的和已经中断的线程移除队列。如果waitStatus<0,那么设置前一个结点的waitStatus为SIGNAL,因为调用shouldParkAfterFailedAcquire的方法为死循环调用,所以终将返回true。接下来看parkAndCheckInterrupt方法,当shouldParkAfterFailedAcquire返回True的时候执行parkAndCheckInterrupt方法:

    private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
      此方法比较简单,其实就是使当前的线程park,即暂停了线程的轮询。
    2)锁的最终释放。
      通过上述代码可知,成功获取锁的线程再次获取锁,只是增加了同步状态值,这也就要求ReentrantLock在释放同步状态时,要先减少同步状态值,当state值为0时,才说明当前锁被成功释放,该方法的代码如下。
     1 protected final boolean tryRelease(int releases) { 
     2   int c = getState() - releases; //计算出state的新值
     3   if (Thread.currentThread() != getExclusiveOwnerThread())//判断当前线程是否为持锁的线程
     4     throw new IllegalMonitorStateException(); //如果不是,就抛异常
     5   boolean free = false; 
     6   if (c == 0) { //当state的值为0,说明当前在无线程占用锁,即锁可以被释放掉
     7     free = true; //修改free值
     8     setExclusiveOwnerThread(null); 
     9   }
    10   setState(c); //更新state值
    11   return free; //返回true表示锁被释放,返回false表示还有线程占用着锁
    12 }
      如果该锁被获取了n次,那么前(n-1)次tryRelease(int releases)方法必须返回false,而只有同步状态完全释放了,才能返回true。可以看到,该方法将同步状态是否为0作为最终释放的条件,当同步状态为0时,将占有线程设置为null,并返回true,表示释放成功。
    2.公平与非公平获取
      公平性与否是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求的绝对时间顺序,也就是FIFO。
      在创建ReentrantLock的时候通过传进参数true创建公平锁,如果传入的是false或没传参数则创建的是非公平锁。ReentrantLock lock = new ReentrantLock(true);
       再次拿来对比一下:
     1 final boolean nonfairTryAcquire(int acquires) //非公平获取
     2 {   final Thread current = Thread.currentThread(); //获取当前线程
     3    int c = getState(); //获取state值,state=0表示锁当前是空闲状态,state>0代表当前有state个线程在等待
     4    if (c == 0) { //表示锁空闲
     5       if (compareAndSetState(0, acquires)) {//如果当前线程获取锁成功
     6            setExclusiveOwnerThread(current); //则将当前线程定义成锁的占用者
     7            return true; 
     8       } 
     9    } else if (current == getExclusiveOwnerThread()) { //判断当前请求的线程是否就是持锁的线程(保证重入性)
    10       int nextc = c + acquires; //累加
    11       if (nextc < 0) throw new Error("Maximum lock count exceeded"); 
    12       setState(nextc);//更新state值
    13        return true; 
    14    }
    15    return false; 
    16 }
       通过前面介绍的nonfairTryAcquire(int acquires)方法可知,对于非公平锁,只要CAS设置同步状态成功,则表示当前线程获取了锁,而公平锁则不同。
     1 protected final boolean tryAcquire(int acquires) { //公平获取
     2   final Thread current = Thread.currentThread(); //获取当前线程
     3   int c = getState(); //计算新的state值
     4   if (c == 0) { //当前锁是空闲状态
     5       if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { 
     6           setExclusiveOwnerThread(current);
     7           return true; 
     8       } 
     9   } else if (current == getExclusiveOwnerThread()) { //当前线程是持有锁的线程
    10       int nextc = c + acquires; 
    11       if (nextc < 0) throw new Error("Maximum lock count exceeded"); 
    12       setState(nextc); 
    13       return true; 
    14   }
    15   return false; 
    16 }

      该方法与nonfairTryAcquire(int acquires)比较,唯一不同的位置为判断条件多了hasQueuedPredecessors()方法,即在获取锁前,先判断加入了同步队列中当前节点是否有前驱节点,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。可参考:https://www.cnblogs.com/ljl150/p/12585076.html

    【代码演示】:公平性获取

     1 import java.util.concurrent.locks.Lock;
     2 import java.util.concurrent.locks.ReentrantLock;
     3 class ThreadDome implements Runnable{
     4     String name;
     5     public ThreadDome(String name){
     6         this.name=name;
     7     }
     8     Lock lock=new ReentrantLock(true);//公平性
     9     @Override
    10     public void run() {
    11         for (int i = 0; i <2; i++){
    12             lock.lock();
    13             try{
    14                 Thread.sleep(1000);
    15                 System.out.println(this.name);
    16             }catch (Exception e){
    17                 e.getMessage();
    18             }finally {
    19                 lock.unlock();
    20             }
    21         }
    22     }
    23 }
    24 public class ReentrantLockTest {
    25     public static void main(String[] args) {
    26         for (int i = 0; i <5; i++) {
    27             new Thread(new ThreadDome(""+i)).start();
    28         }
    29     }
    30 }
    运行结果:

    【代码演示】:非公平性获取

    与公平性的代码仅在这句代码:Lock lock=new ReentrantLock(false);//非公平性

    运行结果:  


       在代码运行结果图中可知,公平性锁每次都是从同步队列中的第一个节点获取到锁,所以每次运行的线程都不一样,而非公平性锁出现了一个线程连续获取锁的情况。

      为什么会出现线程连续获取锁的情况呢?回顾nonfairTryAcquire(int acquires)方法,当一个线程请求锁时,只要获取了同步状态即成功获取锁。在这个前提下,刚释放锁的线程再次获取同步状态的几率会非常大,使得其他线程只能在同步队列中等待。这也就是非公平性锁会可能造成使线程“饥饿”的原因,既然这样,它又为什么被设定成默认的实现呢?因为非公平性锁的开销更小。如果把每次不同线程获取到锁定义为1次切换,公平性锁在测试中进行了10次切换,而非公平性锁只有5次切换。

    3.ReentrantLock可响应中断

      当使用synchronized实现锁时,阻塞在锁上的线程除非获得锁否则将一直等待下去,也就是说这种无限等待获取锁的行为无法被中断。而ReentrantLock给我们提供了一个可以响应中断的获取锁的方法lockInterruptibly()。该方法可以用来解决死锁问题。

    public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
     public final void acquireInterruptibly(int arg)throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);
        }
     4.ReentrantLock类的其他方法
    • getHoldCount() 查询当前线程保持此锁的次数,也就是执行此线程执行lock方法的次数
    • getQueueLength()返回正等待获取此锁的线程估计数,比如启动10个线程,1个线程获得锁,此时返回的是9
    • getWaitQueueLength(Condition condition)返回等待与此锁相关的给定条件的线程估计数。比如10个线程,用同一个condition对象,并且此时这10个线程都执行了condition对象的await方法,那么此时执行此方法返回10
    • hasWaiters(Condition condition)查询是否有线程等待与此锁有关的给定条件(condition),对于指定contidion对象,有多少线程执行了condition.await方法
    • hasQueuedThread(Thread thread)查询给定线程是否等待获取此锁
    • hasQueuedThreads()是否有线程等待此锁
    • isFair()该锁是否公平锁
    • isHeldByCurrentThread() 当前线程是否保持锁锁定,线程的执行lock方法的前后分别是false和true
    • isLock()此锁是否有任意线程占用
    • lockInterruptibly()如果当前线程未被中断,获取锁
    • tryLock()尝试获得锁,仅在调用时锁未被线程占用,获得锁
    • tryLock(long timeout TimeUnit unit)如果锁在给定等待时间内没有被另一个线程保持,则获取该锁
     
  • 相关阅读:
    正则表达式
    Requests库基本使用(转载)
    prometheus + grafana + pushgateway 搭建监控可视化系统
    Docker的系统资源限制(转载)
    DAY8 文件操作
    DAY7 集合,深浅copy
    DAY6 Python之代码块,小数据池的详解
    DAY5 Python基础类型之字典
    DAY4 Python数据类型之列表
    DAY3 python基础之数据类型总览
  • 原文地址:https://www.cnblogs.com/ljl150/p/12568820.html
Copyright © 2011-2022 走看看