从前创建线程的弊端
- 没有返回值
- 不能捕获异常
现在使用Callable+FutureTask既可以有返回值,也可以捕获异常
当有了返回值后,我们就可以不用一直等着线程的结果,而是可以先干点别的事情,最后凭future获取结果,例如星期六你去蛋糕店做蛋糕,店员给你一张小票,你这时候可以先去看一部电影,看完回到蛋糕店凭小票领取蛋糕即可,这样就可以省去你等待做蛋糕花费的时间,这里的future就相当于小票
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class FutureTaskDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Callable<Integer> call = () -> {
System.out.println("正在计算结果。。。");
Thread.sleep(1000);
return 1;
};
FutureTask<Integer> futureTask = new FutureTask<>(call);
new Thread(futureTask).start();
// 做点别的事情
System.out.println("do something...");
System.out.println("拿到的结果为 " + futureTask.get());
}
}
源码解析
-
在new FutureTask的时候需要传入Callable接口,那么先去看看FutureTask的构造方法:
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; // 这里可以看到,FutureTask自己维护了一个Callable变量 this.state = NEW; // ensure visibility of callable }
-
然后我们需要将FutureTask对象传入Thread类中,那么在Thread类中的run()方法里面又发生了什么呢?
public void run() { if (target != null) { target.run(); // 执行了target的run()方法 } }
其实在Thread类中的run()方法很简单,只是执行了target的run()方法,这个target就是我们在构建Thread对象时传入的Runnable对象,在这里就是FutureTask对象(FutureTask继承了Runnable)
-
因此直接去FutureTask的run()方法查看
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; // 将自己维护的Callable对象赋值给c if (c != null && state == NEW) { V result; boolean ran; try { // 在这里执行了Callable的run()方法,也就是我们自己写的run()方法,并且拿到了返回结果 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); // 在这里将返回的结果设置供set()方法获取调用 } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
深入解析set()以及get()方法
首先,需要先了解FutureTask类里面定义的几个状态,当FutureTask被new出来的时候,就会把state设置为NEW状态,表示这是新创建的任务
private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
下面可以看set()跟get()方法了
set()方法
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 将状态修改为COMPLETING outcome = v; // 将执行结果设置给FutureTask维护的outcome变量 // 如果一切设置成功,就将任务状态设置为NORMAL状态,这便是已经完成了 UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 当设置成功后,就会执行这个方法,将所有先前执行了FutureTask.get()方法的线程唤醒(因为在之前代码还未执行完,没有得到结果,所以执行get()的线程都会被等待,一知道结果出来为止) finishCompletion(); } } private void finishCompletion() { // 这里的WaitNode里面存储了对应的线程,这是一个链式结构,在这一步会一个一个遍历唤醒 for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); // 主要是在这一步对线程进行唤醒 } // 获取到下一个节点 WaitNode next = q.next; if (next == null) break; q.next = null; q = next; } break; } } // 这个done方法并未在FutureTask中有任何有效的代码,如下。我猜测是一个钩子函数,等set执行完就会立即执行 // 这个done可以自己实现,可以自定义一个类继承FutureTask或者用匿名内部类实现 done(); callable = null; } protected void done() { } // 实现如下 FutureTask<Integer> futureTask = new FutureTask<Integer>(call) { @Override protected void done() { System.out.println("=======完成了"); } };
get()方法
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) // 假设状态值小于COMPLETING,也就是说任务还没有完成,就执行下面的等待方法 s = awaitDone(false, 0L); // 如果任务没有执行完,那么会将当前线程等待在这里 return report(s); // 返回处理结果,方法在下面展示 } private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); // 如果线程被中断了,就把所有的WaitNode移除 throw new InterruptedException(); } int s = state; if (s > COMPLETING) { // 如果任务完成,直接返回状态 if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // 如果已经完成了任务的执行,就没要继续占用cpu的执行权,让出即可 // yield()方法会通知线程调度器放弃对处理器的占用,但调度器可以忽视这个通知。yield()方法主要是为了保障线程间调度的连续性,防止某个线程一直长时间占用cpu资源。 Thread.yield(); else if (q == null) // 如果当前访问get()方法的线程还没有对应的WaitNode,就创建 // 而且会把当前线程放入WaitNode中 q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { // 如果设置了超时时间,进行检测,如果超时了就一处所有的WaitNode nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else // 如果任务状态小于COMPLETING,也就是还没有执行完,那么就把当前线程阻塞,这也是这个方法的关键 LockSupport.park(this); } } private V report(int s) throws ExecutionException { // 这个outcome是FutureTask维护的变量,在先前的set()方法中已经将结果设置给outcome Object x = outcome; if (s == NORMAL) return (V)x; // 正常执行完,返回结果 if (s >= CANCELLED) // 如果非正常执行完,抛出异常 throw new CancellationException(); // 此处会将异常转为Throwable,是所有异常的基类 throw new ExecutionException((Throwable)x); } // 这是WaitNode的实现 static final class WaitNode { volatile Thread thread; volatile WaitNode next; // 每次创建,都会把当前正在执行的线程存进来 WaitNode() { thread = Thread.currentThread(); } }
以上就是FutureTask基本的一些api的源码解读
Fork/Join框架
背景
示例
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class ForkJoinDemo extends RecursiveTask<Integer> {
private Integer begin;
private Integer end;
public ForkJoinDemo(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
if (end - begin <= 2) {
// 计算
for (int i = begin; i <= end; i++) {
sum += i;
}
} else {
// 拆分任务
ForkJoinDemo forkJoinDemo = new ForkJoinDemo(begin, (begin + end) / 2);
ForkJoinDemo forkJoinDemo2 = new ForkJoinDemo((begin + end) / 2 + 1, end);
// 执行子任务
forkJoinDemo.fork();
forkJoinDemo2.fork();
Integer join = forkJoinDemo.join(); // 获取执行结果
Integer join1 = forkJoinDemo2.join();
sum = join + join1;
}
return sum;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Integer> future = pool.submit(new ForkJoinDemo(1, 100));
System.out.println("。。。。");
System.out.println("计算的结果:" + future.get());
}
}