zoukankan      html  css  js  c++  java
  • ReentrantLock Condition await signal 专题

    Condition的执行方式,是当在线程T1中调用await方法后,线程T1将释放锁,并且将自己阻塞,等待唤醒,
    
    线程T2获取到锁后,开始做事,完毕后,调用Condition的signal方法,唤醒线程T1,在t2执行完unlock后,线程T1恢复执行。

     signalAll和signal很像,内部就是将Condition队列里所有的Node都加入到release队列中,仅此而已

    代码如下:

    import org.joda.time.LocalDateTime;
    
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class LockTest {
    
        private static final String separator = " - ";
        private Lock lock = new ReentrantLock();
        private Condition condition = lock.newCondition();
    
        private class T1 implements Runnable {
    
            public void run() {
                try {
                    lock.lock();
                    System.out.println(LocalDateTime.now().toString("yyyy-MM-dd HH:mm:ss") + separator + Thread.currentThread() + " after lock");
                    try {
                        condition.await();//释放当前线程占用的锁,并阻塞当前线程,等待唤醒
                        System.out.println(LocalDateTime.now().toString("yyyy-MM-dd HH:mm:ss") + separator + Thread.currentThread() + " after await");
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                } finally {
                    lock.unlock();
                    System.out.println(LocalDateTime.now().toString("yyyy-MM-dd HH:mm:ss") + separator + Thread.currentThread() + " after unlock");
                }
    
            }
    
        }
    
        private class T2 implements Runnable {
    
            public void run() {
                try {
                    lock.lock();
                    System.out.println(LocalDateTime.now().toString("yyyy-MM-dd HH:mm:ss") + separator + Thread.currentThread() + " after lock");
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        condition.signal();
                        System.out.println(LocalDateTime.now().toString("yyyy-MM-dd HH:mm:ss") + separator + Thread.currentThread() + " after signal");
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                } finally {
                    lock.unlock();
                    System.out.println(LocalDateTime.now().toString("yyyy-MM-dd HH:mm:ss") + separator + Thread.currentThread() + " after unlock");
                }
            }
        }
    
        private void run() throws InterruptedException {
            new Thread(new T1(), T1.class.getSimpleName()).start();
            TimeUnit.MILLISECONDS.sleep(50);
            new Thread(new T2(), T2.class.getSimpleName()).start();
        }
    
        public static void main(String[] args) throws Exception {
            LockTest lt = new LockTest();
            lt.run();
            TimeUnit.MINUTES.sleep(1);
        }
    
    }

    输出:

    2017-05-03 18:26:42 - Thread[T1,5,main] after lock
    2017-05-03 18:26:42 - Thread[T2,5,main] after lock
    2017-05-03 18:26:43 - Thread[T2,5,main] after signal
    2017-05-03 18:26:43 - Thread[T2,5,main] after unlock
    2017-05-03 18:26:43 - Thread[T1,5,main] after await
    2017-05-03 18:26:43 - Thread[T1,5,main] after unlock

    Condition的执行方式,是当在线程T1中调用await方法后,线程T1将释放锁,并且将自己沉睡,等待唤醒,

    线程T2获取到锁后,开始做事,完毕后,调用Condition的signal方法,唤醒线程T1,线程T1恢复执行。

    以上说明Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备( signal 或者 signalAll方法被带调用)时 ,这些等待线程才会被唤醒,从而重新争夺锁。

    ReentrantLock(重入锁)是jdk的concurrent包提供的一种独占锁的实现。它继承自Dong Lea的 AbstractQueuedSynchronizer(同步器),确切的说是ReentrantLock的一个内部类继承了AbstractQueuedSynchronizer,ReentrantLock只不过是代理了该类的一些方法,可能有人会问为什么要使用内部类在包装一层? 我想是安全的关系,因为AbstractQueuedSynchronizer中有很多方法,还实现了共享锁,Condition(稍候再细说)等功能,如果直接使ReentrantLock继承它,则很容易出现AbstractQueuedSynchronizer中的API被无用的情况。

    首先还是要明白,reentrantLock.newCondition() 返回的是Condition的一个实现,该类在AbstractQueuedSynchronizer中被实现,叫做newCondition()

    public Condition newCondition()   { 
           return sync.newCondition();
     }

    它可以访问AbstractQueuedSynchronizer中的方法和其余内部类(AbstractQueuedSynchronizer是个抽象类,至于他怎么能访问,这里有个很奇妙的点,后面我专门用demo说明 )

    现在,我们一起来看下Condition类的实现,还是从上面的demo入手,

    为了方便书写,我将AbstractQueuedSynchronizer缩写为AQS

    当await被调用时,代码如下:

    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter(); // 将当前线程包装下后,
                                          // 添加到Condition自己维护的一个链表中。
        int savedState = fullyRelease(node);// 释放当前线程占有的锁,从demo中看到,
                                            // 调用await前,当前线程是占有锁的
     
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {// 释放完毕后,遍历AQS的队列,看当前节点是否在队列中,
            // 不在 说明它还没有竞争锁的资格,所以将继续阻塞// 直到它被加入到队列中,聪明的你可能猜到了,
            // 没有错,在singal的时候加入不就可以了?
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // 被唤醒后,重新开始正式竞争锁,同样,如果竞争不到还是会将自己沉睡,等待唤醒重新开始竞争。
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

    回到上面的demo,锁被释放后,线程1开始沉睡,这个时候线程因为线程1沉睡时,会唤醒AQS队列中的头结点,所所以线程2会开始竞争锁,并获取到,等待3秒后,线程2会调用signal方法,“发出”signal信号,signal方法如下:

    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter; // firstWaiter为condition自己维护的一个链表的头结点,
                                  // 取出第一个节点后开始唤醒操作
        if (first != null)
            doSignal(first);
    }

    说明下,其实Condition内部维护了等待队列的头结点和尾节点,该队列的作用是存放等待signal信号的线程,该线程被封装为Node节点后存放于此。

    public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

    关键的就在于此,我们知道AQS自己维护的队列是当前等待资源的队列,AQS会在资源被释放后,依次唤醒队列中从前到后的所有节点,使他们对应的线程恢复执行。直到队列为空。

    而Condition自己也维护了一个队列,该队列的作用是维护一个等待signal信号的队列,两个队列的作用是不同,事实上,每个线程也仅仅会同时存在以上两个队列中的一个,流程是这样的:

    1. 线程1调用reentrantLock.lock时,线程被加入到AQS的等待队列中。
    2. 线程1调用await方法被调用时,该线程从AQS中移除,对应操作是锁的释放。
    3. 接着马上被加入到Condition的等待队列中,以为着该线程需要signal信号。
    4. 线程2,因为线程1释放锁的关系,被唤醒,并判断可以获取锁,于是线程2获取锁,并被加入到AQS的等待队列中。
    5. 线程2调用signal方法,这个时候Condition的等待队列中只有线程1一个节点,于是它被取出来,并被加入到AQS的等待队列中。 注意,这个时候,线程1 并没有被唤醒。
    6. signal方法执行完毕,线程2调用reentrantLock.unLock()方法,释放锁。这个时候因为AQS中只有线程1,于是,AQS释放锁后按从头到尾的顺序唤醒线程时,线程1被唤醒,于是线程1回复执行。
    7. 直到释放所整个过程执行完毕。

    可以看到,整个协作过程是靠结点在AQS的等待队列和Condition的等待队列中来回移动实现的,Condition作为一个条件类,很好的自己维护了一个等待信号的队列,并在适时的时候将结点加入到AQS的等待队列中来实现的唤醒操作。

    看到这里,signal方法的代码应该不难理解了。

    取出头结点,然后doSignal

    public final void signal() {
        if (!isHeldExclusively()) {
            throw new IllegalMonitorStateException();
        }
        Node first = firstWaiter;
        if (first != null) {
            doSignal(first);
        }
    }
     
    private void doSignal(Node first) {
        do {
            if ((firstWaiter = first.nextWaiter) == null) // 修改头结点,完成旧头结点的移出工作
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) && // 将老的头结点,加入到AQS的等待队列中
                 (first = firstWaiter) != null);
    }
     
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
     
        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or attempt
         * to set waitStatus fails, wake up to resync (in which case the
         * waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        // 如果该结点的状态为cancel 或者修改waitStatus失败,则直接唤醒。
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

    可以看到,正常情况 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)这个判断是不会为true的,所以,不会在这个时候唤醒该线程。

    只有到发送signal信号的线程调用reentrantLock.unlock()后因为它已经被加到AQS的等待队列中,所以才会被唤醒。

    总结:

    本文从代码的角度说明了Condition的实现方式,其中,涉及到了AQS的很多操作,比如AQS的等待队列实现独占锁功能,不过,这不是本文讨论的重点,等有机会再将AQS的实现单独分享出来。

    http://www.importnew.com/9281.html

    Condition的await-signal流程
    Condition类图:

    Condition接口包含了多种await方式和两个通知方法
    ConditionObject实现了Condition接口,是AbstractQueuedSynchronizer的内部类
    Reentrantlock的newCondition方法返回与某个lock实例相关的Condition对象

    和release队列一样,Condition队列也是虚拟队列,每个Node通过nextWaiter进行关联。因为Condition Node要变为release Node才可以解除阻塞,所以不需要prevWaiter,这一点下面会有说明。

    大概的整个过程是:

    调用await的线程都会进入一个Condition队列。调用signal的线程每一次都会从firstWaiter开始找出未取消的Condition Node放到release队列里,然后调用signal的线程在await或者unlock的时候执行release方法才有机会将其解除阻塞。相对于lock-unlock,正常的流程要简单一些,但是对于中断处理会更为复杂。

    先看看调用await()至阻塞的过程

    如图所示,该过程可分为三个步骤:

    新建Condition Node包装线程,加入Condition队列
    释放当前线程占用的锁
    阻塞当前线程
    在阻塞当前线程之前,要判断Condition Node是否在release队列里。如果在的话则没必要阻塞,可直接参与锁竞争。关键代码如下:

    signal方法更简单一些,就是从firstWaiter开始,找到一个没有取消的Node放入release队列。但是即使一开始找到的Node没被取消,但是入队列的时候也可能会被取消,因此代码对这个情况做了点特殊处理。我根据自己的理解将代码做了如下解释:
    我们可以看到,signal方法只是将Node修改了状态,并没有唤醒线程。要将修改状态后的Node唤醒,一种是再次调用await(),一种是调用unlock()。这两个方法内部都会执行release方法对release队列里的Node解除阻塞,关于这点我在上一篇文章里已经说明了。

    下面我把调用await()的线程被解除阻塞后的流程也画了一下:

     以上就是await和signal的详细流程。signalAll和signal很像,内部就是将Condition队列里所有的Node都加入到release队列中,仅此而已。

     http://www.cnblogs.com/jycboy/p/5623238.html






  • 相关阅读:
    Codeforces 691A Fashion in Berland
    HDU 5741 Helter Skelter
    HDU 5735 Born Slippy
    HDU 5739 Fantasia
    HDU 5738 Eureka
    HDU 5734 Acperience
    HDU 5742 It's All In The Mind
    POJ Euro Efficiency 1252
    AtCoder Beginner Contest 067 C
    AtCoder Beginner Contest 067 D
  • 原文地址:https://www.cnblogs.com/softidea/p/4178856.html
Copyright © 2011-2022 走看看