zoukankan      html  css  js  c++  java
  • 【Java多线程】LockSupport 工具 、Condition接口(十三)

    一、LockSupport 工具

    1.1 LockSupport 介绍

      当需要阻塞或唤醒一个线程的时候,都会使用LockSupport工具类来完成相应工作。LockSupport定义了一组的公共静态方法,这些方法提供了基本的线程阻塞和唤醒功能,而LockSupport也成为构建同步组件的基础工具

      LockSupport定义了一组以park开通的方法用来阻塞当前线程,以及unpark(Thread thread)方法来唤醒一个被阻塞的线程,Park有停车的意思,假设线程为车辆,那么park方法代表着停车,而unpark方法则指车辆启动离开,这些方法描述如下:

      LockSupport工具类

      

    1.2 LockSupport 使用

    • 示例代码
     1 public class TestLockSupport {
     2 
     3     public static void main(String[] args) {
     4 
     5         Thread t0 = new Thread(new Runnable() {
     6 
     7             @Override
     8             public void run() {
     9                 System.out.println(Thread.currentThread().getName() + ",开始执行!");
    10                 for(;;){//spin 自旋
    11                     System.out.println(Thread.currentThread().getName() + ",准备park住当前线程");
    12                     LockSupport.park();
    13 //                    System.out.println("Thread.interrupted() == " + Thread.interrupted());
    14                     System.out.println(Thread.currentThread().getName() + ",当前线程已经被唤醒");
    15                 }
    16             }
    17 
    18         },"t0");
    19 
    20         t0.start();
    21 
    22         try {
    23             Thread.sleep(5000);
    24             System.out.println("准备唤醒" + t0.getName() + "线程!");
    25             LockSupport.unpark(t0);
    26 //            t0.interrupt();
    27         } catch (InterruptedException e) {
    28             e.printStackTrace();
    29         }
    30     }
    31 }   

      结果如下:

      

    • 使用interrupt()也能唤醒线程,修改代码25,26行如下:
      1 //            LockSupport.unpark(t0);
      2             t0.interrupt();

      运行结果:线程t0依然会被唤醒,但是park()无法中断线程了

      总结原因:

        1、interrupt():有唤醒线程的作用

        2、park()方法,是根据线程的中断标识来判断是否中断线程,当调用interrupt()方法后,线程中存在中断标识,park()方法判断线程中断了,就不会进行中断线程操作,实际线程未中断,解决办法是在未中断线程使用Thread.interrupted()方法,取出中断标识,那么下次park()方法就能起作用,即在代码12行后插入:Thread.interrupted(); 代码即可

    二、Condition 接口

      任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()、notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是着两者在使用方式以及功能特性上还是有差别的。

      以下是Object的监视器方法与Condition接口对比

      

    2.1 Condition 接口与示例

      Condition定义了等待/通知两种类型的方法,当前线程调用了这些方法时,需要前提获取到Condition对象关联的锁。Condition对象有Lock对象(调用Lock对象的newCondition()方法)创建出来的,换句话说,Condition是依赖Lock对象的

      代码示例

     1 public class TestCondition {
     2     Lock lock = new ReentrantLock();
     3     Condition condition = lock.newCondition();
     4 
     5     public void conditionWait() throws InterruptedException {
     6         lock.lock();
     7         try {
     8             System.out.println("~~~条件等待前~~~");
     9             condition.await();
    10             System.out.println("~~~条件等待后~~~");
    11         } finally {
    12             lock.unlock();
    13         }
    14     }
    15 
    16     public void conditionSignal() throws InterruptedException {
    17         lock.lock();
    18         try {
    19             System.out.println("~~~条件唤醒前~~~");
    20             condition.signal();
    21             System.out.println("~~~条件唤醒后~~~");
    22         } finally {
    23             lock.unlock();
    24         }
    25     }
    26 
    27     public static void main(String[] args) {
    28         TestCondition testCondition = new TestCondition();
    29         Thread t1 = new Thread() {
    30             @Override
    31             public void run() {
    32                 try {
    33                     testCondition.conditionWait();
    34                 } catch (InterruptedException e) {
    35                     e.printStackTrace();
    36                 }
    37             }
    38         };
    39         Thread t2 = new Thread() {
    40             @Override
    41             public void run() {
    42                 try {
    43                     testCondition.conditionSignal();
    44                 } catch (InterruptedException e) {
    45                     e.printStackTrace();
    46                 }
    47             }
    48         };
    49         t1.start();
    50 
    51         try {
    52             Thread.sleep(2000);
    53         } catch (InterruptedException e) {
    54             e.printStackTrace();
    55         }
    56         System.out.println("t1.isAlive() = " + t1.isAlive());
    57         t2.start();
    58     }
    59 }

      如示例所示,一般都会将Condition对象作为成员变量。当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程才从await()方法返回,并且返回前已经获得了锁

      Condition定义部分方法

      

    • await() :造成当前线程在接到信号或被中断之前一直处于等待状态。
    • await(long time, TimeUnit unit) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
    • awaitNanos(long nanosTimeout) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值 = nanosTimeout - 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了。
    • awaitUninterruptibly() :造成当前线程在接到信号之前一直处于等待状态。【注意:该方法对中断不敏感】。
    • awaitUntil(Date deadline) :造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回返回false。
    • signal():唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
    • signal()All:唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。

      每个Condition对象都包含着一个FIFO队列,该队列是Condition对象通知/等待功能的关键。在队列中每一个节点都包含着一个线程引用,该线程就是在该Condition对象上等待的线程。

     1 public class ConditionObject implements Condition, java.io.Serializable {
     2     private static final long serialVersionUID = 1173984872572414699L;
     3     
     4     //头节点
     5     private transient Node firstWaiter;
     6     //尾节点
     7     private transient Node lastWaiter;
     8 
     9     public ConditionObject() {
    10     }
    11     
    12     /** 省略方法 **/
    13 }

       

    • await()

       1 public final void await() throws InterruptedException {
       2     // 当前线程中断
       3     if (Thread.interrupted())
       4         throw new InterruptedException();
       5     //当前线程加入等待队列
       6     Node node = addConditionWaiter();
       7 
       8     //释放锁
       9     int savedState = fullyRelease(node);
      10     int interruptMode = 0;
      11 
      12     /**
      13      * 检测此节点的线程是否在同步队上,如果不在,则说明该线程还不具备竞争锁的资格,则继续等待
      14      * 直到检测到此节点在同步队列上
      15      */
      16     while (!isOnSyncQueue(node)) {
      17         //线程挂起
      18         LockSupport.park(this);
      19         //如果已经中断了,则退出
      20         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
      21             break;
      22     }
      23 //被唤醒后,重新开始正式竞争锁,同样,如果竞争不到还是会将自己沉睡,等待唤醒重新开始竞争。
      24     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
      25         interruptMode = REINTERRUPT;
      26 
      27     //清理下条件队列中的不是在等待条件的节点
      28     if (node.nextWaiter != null) // clean up if cancelled
      29         unlinkCancelledWaiters();
      30     if (interruptMode != 0)
      31         reportInterruptAfterWait(interruptMode);
      32 }
      33 
      34 
      35 private Node addConditionWaiter() {
      36     Node t = lastWaiter;
      37     // If lastWaiter is cancelled, clean out.
      38     //Node的节点状态如果不为CONDITION,则表示该节点不处于等待状态,需要清除节点
      39     if (t != null && t.waitStatus != Node.CONDITION) {
      40         unlinkCancelledWaiters();
      41         t = lastWaiter;
      42     }
      43     Node node = new Node(Thread.currentThread(), Node.CONDITION);
      44 
      45     //将该节点加入到条件队列中最后一个位置
      46     if (t == null)
      47         firstWaiter = node;
      48     else
      49         t.nextWaiter = node;
      50     lastWaiter = node;
      51     return node;
      52 }
      53 
      54 
      55 final int fullyRelease(Node node) {
      56     boolean failed = true;
      57     try {
      58         //节点状态--其实就是持有锁的数量
      59         int savedState = getState();
      60         //释放锁
      61         if (release(savedState)) {
      62             failed = false;
      63             return savedState;
      64         } else {
      65             throw new IllegalMonitorStateException();
      66         }
      67     } finally {
      68         if (failed)
      69             node.waitStatus = Node.CANCELLED;
      70     }
      71 }
      72 
      73 //isOnSyncQueue(Node node):
      74 //如果一个节点刚开始在条件队列上,现在在同步队列上获取锁则返回true
      75 
      76 final boolean isOnSyncQueue(Node node) {
      77     if (node.waitStatus == Node.CONDITION || node.prev == null)
      78         return false;
      79     if (node.next != null) // If has successor, it must be on queue
      80         return true;
      81     return findNodeFromTail(node);
      82 }
    • signal()

       1 public final void signal() {
       2         if (!isHeldExclusively())
       3             throw new IllegalMonitorStateException();
       4         //头节点,唤醒条件队列中的第一个节点
       5         Node first = firstWaiter;
       6         if (first != null)
       7             doSignal(first);
       8     }
       9     
      10 
      11     private void doSignal(Node first) {
      12         do {
      13              //修改头结点,完成旧头结点的移出工作
      14             if ( (firstWaiter = first.nextWaiter) == null)
      15                 lastWaiter = null;
      16             first.nextWaiter = null;
      17         } while (!transferForSignal(first) &&//将老的头结点,加入到AQS的等待队列中
      18                  (first = firstWaiter) != null);
      19     }
      20 
      21 
      22 
      23     final boolean transferForSignal(Node node) {
      24         /*
      25          * If cannot change waitStatus, the node has been cancelled.
      26          */
      27          //将该节点从状态CONDITION改变为初始状态0,
      28         if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
      29             return false;
      30 
      31         /*
      32          * Splice onto queue and try to set waitStatus of predecessor to
      33          * indicate that thread is (probably) waiting. If cancelled or
      34          * attempt to set waitStatus fails, wake up to resync (in which
      35          * case the waitStatus can be transiently and harmlessly wrong).
      36          */
      37          //将节点加入到syn队列中去,返回的是syn队列中node节点前面的一个节点
      38         Node p = enq(node);
      39         int ws = p.waitStatus;
      40        //如果结点p的状态为cancel 或者修改waitStatus失败,则直接唤醒
      41         if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
      42             LockSupport.unpark(node.thread);
      43         return true;
      44     }

       整个通知的流程如下

    • 判断当前线程是否已经获取了锁,如果没有获取则直接抛出异常,因为获取锁为通知的前置条件。

    • 如果线程已经获取了锁,则将唤醒条件队列的首节点

    • 唤醒首节点是先将条件队列中的头节点移出,然后调用AQS的enq(Node node)方法将其安全地移到CLH同步队列中

    • 最后判断如果该节点的同步状态是否为Cancel,或者修改状态为Signal失败时,则直接调用LockSupport唤醒该节点的线程。

      总结

    1. 一个线程获取锁后,通过调用Condition的await()方法,会将当前线程先加入到条件队列中, 

    2. 然后释放锁,最后通过isOnSyncQueue(Node node)方法不断自检看节点是否已经在CLH同步队列了,

    3. 如果是则尝试获取锁,否则一直挂起。 

    4. 当线程调用signal()方法后,程序首先检查当前线程是否获取了锁, 

    5. 然后通过doSignal(Node first)方法唤醒CLH同步队列的首节点。 

    6. 被唤醒的线程,将从await()方法中的while循环中退出来, 

    7. 然后调用acquireQueued()方法竞争同步状态。

    三、具体Condition应用分析

      参考:【Java多线程】BlockingQueue阻塞队列实现原理(十四)

     

    参考文章:

      1、《Java并发编程的艺术》

      2、https://www.jianshu.com/p/5d83a8c6a6f3

  • 相关阅读:
    安卓内存不足(删除data/dalvik-cache目录)
    Blend Tree Type
    Unity 物理引擎动力学关节
    Daikon Forge GUI 制作图集和字体集
    Daikon Forge GUI 制作UI面板
    Vector3.Dot 判断方位
    (译)关于使用Eclipse Memory Analyzer的10点小技巧
    Android中Handler引起的内存泄露
    关于Android内存优化你应该知道的一切
    Android性能优化第(三)篇---MAT比Menmery Monitor更强大
  • 原文地址:https://www.cnblogs.com/h--d/p/14561534.html
Copyright © 2011-2022 走看看