一、LockSupport 工具
1.1 LockSupport 介绍
当需要阻塞或唤醒一个线程的时候,都会使用LockSupport工具类来完成相应工作。LockSupport定义了一组的公共静态方法,这些方法提供了基本的线程阻塞和唤醒功能,而LockSupport也成为构建同步组件的基础工具
LockSupport定义了一组以park开通的方法用来阻塞当前线程,以及unpark(Thread thread)方法来唤醒一个被阻塞的线程,Park有停车的意思,假设线程为车辆,那么park方法代表着停车,而unpark方法则指车辆启动离开,这些方法描述如下:
LockSupport工具类
1.2 LockSupport 使用
- 示例代码
1 public class TestLockSupport { 2 3 public static void main(String[] args) { 4 5 Thread t0 = new Thread(new Runnable() { 6 7 @Override 8 public void run() { 9 System.out.println(Thread.currentThread().getName() + ",开始执行!"); 10 for(;;){//spin 自旋 11 System.out.println(Thread.currentThread().getName() + ",准备park住当前线程"); 12 LockSupport.park(); 13 // System.out.println("Thread.interrupted() == " + Thread.interrupted()); 14 System.out.println(Thread.currentThread().getName() + ",当前线程已经被唤醒"); 15 } 16 } 17 18 },"t0"); 19 20 t0.start(); 21 22 try { 23 Thread.sleep(5000); 24 System.out.println("准备唤醒" + t0.getName() + "线程!"); 25 LockSupport.unpark(t0); 26 // t0.interrupt(); 27 } catch (InterruptedException e) { 28 e.printStackTrace(); 29 } 30 } 31 }
结果如下:
- 使用interrupt()也能唤醒线程,修改代码25,26行如下:
1 // LockSupport.unpark(t0); 2 t0.interrupt();
运行结果:线程t0依然会被唤醒,但是park()无法中断线程了
总结原因:
1、interrupt():有唤醒线程的作用
2、park()方法,是根据线程的中断标识来判断是否中断线程,当调用interrupt()方法后,线程中存在中断标识,park()方法判断线程中断了,就不会进行中断线程操作,实际线程未中断,解决办法是在未中断线程使用Thread.interrupted()方法,取出中断标识,那么下次park()方法就能起作用,即在代码12行后插入:Thread.interrupted(); 代码即可
二、Condition 接口
任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()、notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是着两者在使用方式以及功能特性上还是有差别的。
以下是Object的监视器方法与Condition接口对比
2.1 Condition 接口与示例
Condition定义了等待/通知两种类型的方法,当前线程调用了这些方法时,需要前提获取到Condition对象关联的锁。Condition对象有Lock对象(调用Lock对象的newCondition()方法)创建出来的,换句话说,Condition是依赖Lock对象的
代码示例
1 public class TestCondition { 2 Lock lock = new ReentrantLock(); 3 Condition condition = lock.newCondition(); 4 5 public void conditionWait() throws InterruptedException { 6 lock.lock(); 7 try { 8 System.out.println("~~~条件等待前~~~"); 9 condition.await(); 10 System.out.println("~~~条件等待后~~~"); 11 } finally { 12 lock.unlock(); 13 } 14 } 15 16 public void conditionSignal() throws InterruptedException { 17 lock.lock(); 18 try { 19 System.out.println("~~~条件唤醒前~~~"); 20 condition.signal(); 21 System.out.println("~~~条件唤醒后~~~"); 22 } finally { 23 lock.unlock(); 24 } 25 } 26 27 public static void main(String[] args) { 28 TestCondition testCondition = new TestCondition(); 29 Thread t1 = new Thread() { 30 @Override 31 public void run() { 32 try { 33 testCondition.conditionWait(); 34 } catch (InterruptedException e) { 35 e.printStackTrace(); 36 } 37 } 38 }; 39 Thread t2 = new Thread() { 40 @Override 41 public void run() { 42 try { 43 testCondition.conditionSignal(); 44 } catch (InterruptedException e) { 45 e.printStackTrace(); 46 } 47 } 48 }; 49 t1.start(); 50 51 try { 52 Thread.sleep(2000); 53 } catch (InterruptedException e) { 54 e.printStackTrace(); 55 } 56 System.out.println("t1.isAlive() = " + t1.isAlive()); 57 t2.start(); 58 } 59 }
如示例所示,一般都会将Condition对象作为成员变量。当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程才从await()方法返回,并且返回前已经获得了锁
Condition定义部分方法
- await() :造成当前线程在接到信号或被中断之前一直处于等待状态。
- await(long time, TimeUnit unit) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
- awaitNanos(long nanosTimeout) :造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在nanosTimesout之前唤醒,那么返回值 = nanosTimeout - 消耗时间,如果返回值 <= 0 ,则可以认定它已经超时了。
- awaitUninterruptibly() :造成当前线程在接到信号之前一直处于等待状态。【注意:该方法对中断不敏感】。
- awaitUntil(Date deadline) :造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回true,否则表示到了指定时间,返回返回false。
- signal():唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
- signal()All:唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。
每个Condition对象都包含着一个FIFO队列,该队列是Condition对象通知/等待功能的关键。在队列中每一个节点都包含着一个线程引用,该线程就是在该Condition对象上等待的线程。
1 public class ConditionObject implements Condition, java.io.Serializable { 2 private static final long serialVersionUID = 1173984872572414699L; 3 4 //头节点 5 private transient Node firstWaiter; 6 //尾节点 7 private transient Node lastWaiter; 8 9 public ConditionObject() { 10 } 11 12 /** 省略方法 **/ 13 }
-
await()
1 public final void await() throws InterruptedException { 2 // 当前线程中断 3 if (Thread.interrupted()) 4 throw new InterruptedException(); 5 //当前线程加入等待队列 6 Node node = addConditionWaiter(); 7 8 //释放锁 9 int savedState = fullyRelease(node); 10 int interruptMode = 0; 11 12 /** 13 * 检测此节点的线程是否在同步队上,如果不在,则说明该线程还不具备竞争锁的资格,则继续等待 14 * 直到检测到此节点在同步队列上 15 */ 16 while (!isOnSyncQueue(node)) { 17 //线程挂起 18 LockSupport.park(this); 19 //如果已经中断了,则退出 20 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) 21 break; 22 } 23 //被唤醒后,重新开始正式竞争锁,同样,如果竞争不到还是会将自己沉睡,等待唤醒重新开始竞争。 24 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) 25 interruptMode = REINTERRUPT; 26 27 //清理下条件队列中的不是在等待条件的节点 28 if (node.nextWaiter != null) // clean up if cancelled 29 unlinkCancelledWaiters(); 30 if (interruptMode != 0) 31 reportInterruptAfterWait(interruptMode); 32 } 33 34 35 private Node addConditionWaiter() { 36 Node t = lastWaiter; 37 // If lastWaiter is cancelled, clean out. 38 //Node的节点状态如果不为CONDITION,则表示该节点不处于等待状态,需要清除节点 39 if (t != null && t.waitStatus != Node.CONDITION) { 40 unlinkCancelledWaiters(); 41 t = lastWaiter; 42 } 43 Node node = new Node(Thread.currentThread(), Node.CONDITION); 44 45 //将该节点加入到条件队列中最后一个位置 46 if (t == null) 47 firstWaiter = node; 48 else 49 t.nextWaiter = node; 50 lastWaiter = node; 51 return node; 52 } 53 54 55 final int fullyRelease(Node node) { 56 boolean failed = true; 57 try { 58 //节点状态--其实就是持有锁的数量 59 int savedState = getState(); 60 //释放锁 61 if (release(savedState)) { 62 failed = false; 63 return savedState; 64 } else { 65 throw new IllegalMonitorStateException(); 66 } 67 } finally { 68 if (failed) 69 node.waitStatus = Node.CANCELLED; 70 } 71 } 72 73 //isOnSyncQueue(Node node): 74 //如果一个节点刚开始在条件队列上,现在在同步队列上获取锁则返回true 75 76 final boolean isOnSyncQueue(Node node) { 77 if (node.waitStatus == Node.CONDITION || node.prev == null) 78 return false; 79 if (node.next != null) // If has successor, it must be on queue 80 return true; 81 return findNodeFromTail(node); 82 }
-
signal()
1 public final void signal() { 2 if (!isHeldExclusively()) 3 throw new IllegalMonitorStateException(); 4 //头节点,唤醒条件队列中的第一个节点 5 Node first = firstWaiter; 6 if (first != null) 7 doSignal(first); 8 } 9 10 11 private void doSignal(Node first) { 12 do { 13 //修改头结点,完成旧头结点的移出工作 14 if ( (firstWaiter = first.nextWaiter) == null) 15 lastWaiter = null; 16 first.nextWaiter = null; 17 } while (!transferForSignal(first) &&//将老的头结点,加入到AQS的等待队列中 18 (first = firstWaiter) != null); 19 } 20 21 22 23 final boolean transferForSignal(Node node) { 24 /* 25 * If cannot change waitStatus, the node has been cancelled. 26 */ 27 //将该节点从状态CONDITION改变为初始状态0, 28 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) 29 return false; 30 31 /* 32 * Splice onto queue and try to set waitStatus of predecessor to 33 * indicate that thread is (probably) waiting. If cancelled or 34 * attempt to set waitStatus fails, wake up to resync (in which 35 * case the waitStatus can be transiently and harmlessly wrong). 36 */ 37 //将节点加入到syn队列中去,返回的是syn队列中node节点前面的一个节点 38 Node p = enq(node); 39 int ws = p.waitStatus; 40 //如果结点p的状态为cancel 或者修改waitStatus失败,则直接唤醒 41 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) 42 LockSupport.unpark(node.thread); 43 return true; 44 }
整个通知的流程如下
-
判断当前线程是否已经获取了锁,如果没有获取则直接抛出异常,因为获取锁为通知的前置条件。
-
如果线程已经获取了锁,则将唤醒条件队列的首节点
-
唤醒首节点是先将条件队列中的头节点移出,然后调用AQS的enq(Node node)方法将其安全地移到CLH同步队列中
-
最后判断如果该节点的同步状态是否为Cancel,或者修改状态为Signal失败时,则直接调用LockSupport唤醒该节点的线程。
总结
-
一个线程获取锁后,通过调用Condition的await()方法,会将当前线程先加入到条件队列中,
-
然后释放锁,最后通过isOnSyncQueue(Node node)方法不断自检看节点是否已经在CLH同步队列了,
-
如果是则尝试获取锁,否则一直挂起。
-
当线程调用signal()方法后,程序首先检查当前线程是否获取了锁,
-
然后通过doSignal(Node first)方法唤醒CLH同步队列的首节点。
-
被唤醒的线程,将从await()方法中的while循环中退出来,
-
然后调用acquireQueued()方法竞争同步状态。
三、具体Condition应用分析
参考:【Java多线程】BlockingQueue阻塞队列实现原理(十四)
参考文章:
1、《Java并发编程的艺术》
2、https://www.jianshu.com/p/5d83a8c6a6f3