多线程的实现方式:
1、继承Thread类
2、实现Runnable接口
3、实现Callable<V> 接口
通过实现Callable接口来创建多线程,在执行完成后可获取执行结果。
Callable接口:
对于需要执行的任务需要实现Callable接口,Callable接口定义如下:
public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
Callable是个泛型接口,泛型V就是要call()方法返回的类型。
Future接口:
Future接口代表异步计算的结果,通过Future接口提供的方法可以查看异步计算是否执行完成,或者等待执行结果并获取执行结果,同时还可以取消执行。Future接口的定义如下:
public interface Future<V> {
/**
* 尝试取消任务的执行
* 如果异步任务已经完成或者已经被取消,或者由于某些原因不能取消,则会返回false。
* 如果任务还没有被执行,则会返回true并且异步任务不会被执行。
* 如果任务已经开始执行了但是还没有执行完成,若mayInterruptIfRunning为true,则会立即中断执行任务的线程并返回true,若mayInterruptIfRunning为false,则会返回true且不会中断任务执行线程
* @param mayInterruptIfRunning
* @return
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 判断任务是否被取消,如果任务在结束(正常执行结束或者执行异常结束)前被取消则返回true,否则返回false
* @return
*/
boolean isCancelled();
/**
* 判断任务是否已经完成,如果完成则返回true,否则返回false。
* 需要注意的是:任务执行过程中发生异常、任务被取消也属于任务已完成,也会返回true
* @return
*/
boolean isDone();
/**
* 获取任务执行结果,如果任务还没完成则会阻塞等待直到任务执行完成。
* 如果任务被取消则会抛出CancellationException异常,如果任务执行过程发生异常则会抛出ExecutionException异常,如果阻塞等待过程中被中断则会抛出InterruptedException异常。
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
V get() throws InterruptedException, ExecutionException;
/**
* 带超时时间的get()版本,如果阻塞等待过程中超时则会抛出TimeoutException异常
* @param timeout
* @param unit
* @return
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
使用Callable和Future获取线程执行结果的示例:
// 创建一个单线程的线程池,仅测试用,因其使用无界队列,存在OOM的风险,实际开发使用ThreadPoolExecutor ExecutorService testExecutor = Executors.newSingleThreadExecutor(); Future<String> stringFuture = testExecutor.submit(new Callable<String>() { @Override public String call() throws Exception { return "success"; } }); String result = stringFuture.get(); System.out.println(result); // success
线程池submit源码:
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } // newTaskFor protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
Step 1 : 当通过 submit 调用的时候 , 底层会调用 :
return new FutureTask<T>(runnable, value);
Step 2 : 在外层会被提升为父类 RunnableFuture , 在返回的时候又会被提成 Future
RunnableFuture<T> ftask = newTaskFor(task, result);
总结 : 所以 , 底层的实现类主要是 FutureTask , 而FutureTask 是 Runnable 的实现类
FutureTask:
FutureTask实现了RunnableFuture接口,则RunnableFuture接口继承了Runnable接口和Future接口,所以FutureTask既能当做一个Runnable直接被Thread执行,也能作为Future用来得到Callable的计算结果。
FutureTask一般配合ExecutorService来使用,也可以直接通过Thread来使用,示例:
// 创建一个单线程的线程池,仅测试用,因其使用无界队列,存在OOM的风险,实际开发使用ThreadPoolExecutor ExecutorService testExecutor = Executors.newSingleThreadExecutor(); FutureTask<String> futureTask = new FutureTask<>(new Callable<String>() { @Override public String call() throws Exception { return "success"; } }); // testExecutor.submit(futureTask); new Thread(futureTask).start(); String result = futureTask.get(); System.out.println(result); // success
FutureTask 源码解读:
1)构造方法:
private Callable<V> callable; public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
private volatile int state; // volatile修饰,保证内存可见性,避免任务被执行多次
state字段用来保存FutureTask内部的任务执行状态,一共有7中状态,初始状态是NEW,每种状态及其对应的值如下:
private static final int NEW = 0; // 新建 private static final int COMPLETING = 1; // 完成 private static final int NORMAL = 2; // 正常 private static final int EXCEPTIONAL = 3; // 异常 private static final int CANCELLED = 4; // 取消 private static final int INTERRUPTING = 5; // 中断(中) private static final int INTERRUPTED = 6; // 打断
private Object outcome; // 保存任务执行结果(如果异常将保存异常Throwable)
run()方法:
public void run() { // 1、如果状态不是NEW,说明任务已经执行过,或者被取消,直接返回 // 2、如果状态为NEW则接着会通过unsafe类把任务执行线程引用CAS的保存在runner字段中,如果保存失败,则直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 执行任务 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; // 任务异常,将异常Throwable赋给outcome setException(ex); } if (ran) // 任务正常执行完毕,将执行结果赋给outcome set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; // 如果任务被中断,执行中断处理 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
get()方法:
public V get() throws InterruptedException, ExecutionException { // 获取状态 int s = state; // 如果任务还未执行完毕 if (s <= COMPLETING) // 阻塞等待任务执行完毕 s = awaitDone(false, 0L); // 返回任务执行结果 return report(s); }
END.