前言
在单线程模型下,想要获取方法执行的结果比较简单,直接获取方法返回的结果即可。但是在多线程情况下,如何获取其他线程执行的结果,此时就涉及到多线程之间数据传输的问题。比如A线程内部开启B线程和C线程,A线程如何获取B、C线程的执行结果呢?另外在B、C线程执行过程中,A线程是一直处于阻塞状态等待还是非阻塞继续执行呢?如果能够获取到B、C线程的结果那么何时去获取呢?这都是获取多线程执行结果需要解决的问题,而在JUC中就提供了Future就可以实现。
1、Callable接口
想要通过异步获取结果,就离不开Callable接口,众所周知创建线程可以通过继承Thread、实现Runnable的方式,另外还有一种可以返回操作结果的方式就是实现Callable接口
Callable接口只定义了一个方法,就是返回操作结果,定义如下:
@FunctionalInterface public interface Callable<V> { /** * 返回操作结果或者直接抛异常 */ V call() throws Exception; }
Callable接口可以返回操作的结果,那么还需要有一个对象来接收操作的结果,该对象就是Future对象
2、Future接口
Future 用于获取异步操作的结果,异步操作之后,结果会存在Future中,可以通过get()方法获取,Future接口定义如下:
public interface Future<V> { /** * 取消异步操作 */ boolean cancel(boolean mayInterruptIfRunning); /** * 判断异步操作是否取消 */ boolean isCancelled(); /** * 判断异步操作是否完成 */ boolean isDone(); /** * 获取异步结果 */ V get() throws InterruptedException, ExecutionException; /** * 获取异步结果,并且设置了超时时间 */ V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
3、FutureTask类
FutureTask是Future的实现类,并且实现了Runnable接口,构造函数是Callable类型,所以FutureTask的作用就是创建线程执行FutureTask的内容并获取结果
3.1、使用案例
使用案例如下:
1 public static void main(String[] args) throws ExecutionException, InterruptedException { 2 /** 创建FutureTask,构造函数传入Callable*/ 3 FutureTask futureTask = new FutureTask(new Callable() { 4 @Override 5 public Object call() throws Exception { 6 Thread.sleep(3000L); 7 System.out.println("返回执行结果"); 8 return "test"; 9 } 10 }); 11 System.out.println("等待执行结果"); 12 /** 创建线程运行Callable内容*/ 13 futureTask.run(); 14 /** 阻塞等待FutureTask的执行结果*/ 15 String result = (String) futureTask.get(); 16 System.out.println("结果为:"+result); 17 }
用法比较简单,首先通过构造函数传入Callable来构建FutureTask对象,然后调用FutureTask的run方法创建线程执行Callable的call方法,最后主线程调用FutureTask的get方法阻塞等待获取call方法执行的结果
3.2、实现原理
FutureTask实现了Runnable接口和Future接口,所以需要实现Runnable的run方法以及Future的5个方法,这里重点分析下run方法和get方法即可。
首先从FutureTask的构造方法分析,构造方法如下:
/** 任务状态*/ private volatile int state; /** 任务*/ private Callable<V> callable; /** 执行任务线程 */ private volatile Thread runner; public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; /** 状态为新建状态*/ this.state = NEW; }
FutureTask内部有三个核心属性,分别表示任务的状态、任务的具体内容和执行任务的线程,构造方法创建FutureTask时会初始化callable和state两个属性,而执行的线程runner会留在任务执行时再创建.
FutureTask的run方法逻辑如下:
1 public void run() { 2 /** 1.通过Unsafe的CAS将当前线程Thread对象赋值给当前的FutureTask对象的runner属性 */ 3 if (state != NEW || 4 !UNSAFE.compareAndSwapObject(this, runnerOffset, 5 null, Thread.currentThread())) 6 return; 7 try { 8 Callable<V> c = callable; 9 if (c != null && state == NEW) { 10 V result; 11 /** 完成标志 */ 12 boolean ran; 13 try { 14 /** 2.执行Callable的call方法并获取结果 */ 15 result = c.call(); 16 ran = true; 17 } catch (Throwable ex) { 18 result = null; 19 ran = false; 20 setException(ex); 21 } 22 if (ran) 23 /** 3.设置操作结果 */ 24 set(result); 25 } 26 } finally { 27 /** 状态重制*/ 28 runner = null; 29 int s = state; 30 if (s >= INTERRUPTING) 31 handlePossibleCancellationInterrupt(s); 32 } 33 }
逻辑比较清晰,首先是给FutureTask对象的runner赋值为当前线程Thread对象;然后直接执行Callable的call()方法执行;最好将call()执行结果通过set方法进行设置,具体如何设置就需要继续查看set方法逻辑如下,
1 /** 设置操作结果*/ 2 protected void set(V v) { 3 /** 1.设置FutureTask对象的state属性 */ 4 if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { 5 /** 2.赋值操作结果*/ 6 outcome = v; 7 /** 3.通过CAS设置state为完成状态NORMAL */ 8 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state 9 /** 4.完成操作*/ 10 finishCompletion(); 11 } 12 } 13 14 /** 完成操作*/ 15 private void finishCompletion() { 16 /** 1.获取当前等待节点waiters赋值给q */ 17 for (WaitNode q; (q = waiters) != null;) { 18 /** 2.死循环中判断当前waiters设置为null */ 19 if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { 20 for (;;) { 21 Thread t = q.thread; 22 if (t != null) { 23 q.thread = null;
/** 唤醒等待结果的线程*/ 24 LockSupport.unpark(t); 25 } 26 WaitNode next = q.next; 27 if (next == null) 28 break; 29 q.next = null; // unlink to help gc 30 q = next; 31 } 32 break; 33 } 34 } 35 /** 2.任务完成,空实现,子类可用于扩展*/ 36 done(); 37 /** 3.重置callable属性 */ 38 callable = null; // to reduce footprint 39 }
核心就是通过CAS更新FutureTask的状态,然后将操作结果赋值的FutureTask的outcome属性用于存储操作结果,而最后如果想要获取操作结果,就可以通过FutureTask的get方法获取outcome的值即可,逻辑如下:
1 public V get() throws InterruptedException, ExecutionException { 2 int s = state; 3 if (s <= COMPLETING) 4 /** 如果当前状态不是已完成状态,那么就执行await进入等待状态,等待被唤醒*/ 5 s = awaitDone(false, 0L); 6 return report(s); 7 } 8 9 private V report(int s) throws ExecutionException { 10 Object x = outcome; 11 if (s == NORMAL) 12 /** 返回操作结果*/ 13 return (V)x; 14 if (s >= CANCELLED) 15 throw new CancellationException(); 16 throw new ExecutionException((Throwable)x); 17 }
在获取结果时判断当前状态是否是已经完成,如果不是就需要调用awaitDone方法等待操作结果,实际是调用LockSupport的park方法进入等待状态,逻辑如下:
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
总结:
1、FutureTask内部有Callable对象,FutureTask实现了Future和Runnable接口,执行run方法时会用调用run方法的线程执行Callable的call方法,并且将结果存入FutureTask的内部属性outcome中;
2、当不同线程调用FutureTask对象的get方法时,如果当前FutureTask状态已完成就直接返回结果;如果还未完成就通过LockSupport的park方法进行阻塞等待,多个线程等待就会封装成WaitNode节点对象组成链表结构;
3、当FutureTask任务执行完成之后,如果等待链表存在就会遍历链表中WaitNode节点,并依次通过LockSupport的unpark方法唤醒等待的线程,等待的线程被唤醒后就可以直接获取FutureTask的结果
4、FutureTask通常会和线程池配合使用,将FutureTask直接提交给线程池处理