Callable接口
前面我们提到了创建线程的三种方法,继承 Thread 类,实现 Runnable 接口,通过线程池创建。今天学习创建线程的另一种方式,通过 Callable 接口来创建。
Callable 接口与 Runnable 接口一样,可以给线程提交一个任务让其执行。Callable 接口只有一个 call 方法,且该方法的返回值是一个泛型。
@FunctionalInterface public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
与 Runnable 接口不同的是,通过 Callable 接口提交任务时,我们可以在任务结束后获取任务的返回值。除此之外,Callable 接口还可以将异常抛出,而 Runnable 接口中的任务只能手动捕获。你可以这样使用 Callable 接口
// 使用 Callable ,可以获取返回值 Callable<String> callable = () -> { log.info("进入 Callable 的 call 方法"); // 模拟子线程任务,在此睡眠 2s, // 小细节:由于 call 方法会抛出 Exception,这里不用像使用 Runnable 的run 方法那样 try/catch 了 Thread.sleep(5000); return "Hello from Callable"; };
只是表示可以返回任务的执行结果,但任务还是要线程来执行。
如何执行一个实现了 Callable 接口的任务呢?我们找遍整个 Thread 类也没有看到 Callable 的字样。其实要执行一个 Callable 接口的任务必须要配合线程池(ExecutorService)来使用。从下图中可以看到,ExecutorService 中的 submit 方法可以接收 Callable 类型的任务,并且返回 Future 类型的对象。接下来,我们就看下什么是 Future。
Future
在 JDK1.5 以前,我们异步的执行任务,将要处理的复杂操作交给其他线程处理,提高了程序的运行效率。我们并不关心其它线程的执行情况,也不需要其它线程返回结果。正是由于这种不关心的模式,决定了我们不能在线程中抛出异常。异常一般是由方法的调用者来处理的,试想下,如果我们在新的线程中抛出了异常,那么这个异常由谁来处理呢,又要在什么时候处理呢,毕竟没有任何线程关心着这个执行的任务。
随着业务越来越复杂,我们不仅需要其他线程来处理任务,有时候还需要获得其它线程处理任务后的返回结果。当然,你可以使用其它方式获得任务的返回结果,比如用一个全局变量来实现线程之间的通信,当需要使用这个全局变量时,就等待这个线程结束然后使用这个全局变量。比如 join(),wait()等,但仍然无法抛出异常,即无法抛出检查异常,每次都需要在线程内部进行捕获。
为了方便的解决上面的问题,在 JDK1.5后引入了 Callable 接口,并且通过 Future 接口可以获取任务的返回值。
public interface Future<V> { // 取消任务 boolean cancel(boolean mayInterruptIfRunning); // 获取任务执行结果 V get() throws InterruptedException, ExecutionException; // 获取任务执行结果,带有超时时间限制 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; // 判断任务是否已经取消 boolean isCancelled(); // 判断任务是否已经结束 boolean isDone(); }
接下来我们使用 Callable 接口来创建一个任务,通过线程池的 submit 来提交任务,配合 Future 接口获取任务的返回值。
import java.util.concurrent.*; public class TestFuture { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newFixedThreadPool(3); System.out.println("任务不想做,让其它线程去处理吧" + System.currentTimeMillis()); Callable<String> callable = ()->{ System.out.println("我正在执行任务....." + System.currentTimeMillis()); Thread.sleep(4000); return "任务完成~~" + System.currentTimeMillis(); }; Future<String> future = executor.submit(callable); //提交任务 System.out.println("主线程继续执行" + System.currentTimeMillis()); Thread.sleep(2000); //模拟主线程处理其它任务 System.out.println("主线程等待其它线程的返回结果" + System.currentTimeMillis()); String result = future.get(); //调用get方法如果其它线程还没有返回结果,会阻塞主线程直到其它线程返回结果 System.out.println("返回结果是: " + result); executor.shutdown(); //关闭线程池 } }
我们还可以使用 Future 接口中的其他方法,对线程进行控制。
public class TestFuture { public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newFixedThreadPool(3); System.out.println("任务不想做,让其它线程去处理吧" + System.currentTimeMillis()); Future<String> future = executor.submit(()->{ //提交任务 System.out.println("我正在执行任务....." + System.currentTimeMillis()); Thread.sleep(20000); return "任务完成~~" + System.currentTimeMillis(); }); System.out.println("主线程继续执行" + System.currentTimeMillis()); int count = 0; while( !future.isDone() ){ //判断任务是否完成 System.out.println("--- 任务还没有完成 ---"); Thread.sleep(1000); count++; if(count == 5){ future.cancel(true); //如果任务执行时间过长,我们可以取消任务 } } String result = future.get(); //调用get方法如果其它线程还没有返回结果,会阻塞主线程直到其它线程返回结果 System.out.println("返回结果是: " + result); executor.shutdown(); //关闭线程池 } }
这怎么还出现异常了呢?其实在源码的注释中已经说明了这个问题
调用了 get() 时,如果任务被取消了,就抛出 CancellationException 异常。
出现了异常自然是要处理的,我们可以使用 Future 中的另一个方法 isCancelled() 来判断任务是否被取消,如果没有被取消,就调用 get() 来取得任务的返回值。
if( !future.isCancelled() ){ String result = future.get(); //调用get方法如果其它线程还没有返回结果,会阻塞主线程直到其它线程返回结果 System.out.println("返回结果是: " + result); }else { System.out.println("任务已被取消!!"); }
Future 中的方法我们基本都用过了,但 Future 只是一个接口,里面的方法必须要由实现类来完成,我们是如何获取任务的返回值呢,又如何来取消任务呢?这就需要直到 Future 的实现类 FutureTask 了。
FutureTask
通过类图我们看到,FutureTask 实现了 RunnableFuture 接口,见名知意,RunnableFuture 接口同时继承了 Runnable 接口和 Future 接口。
public interface RunnableFuture<V> extends Runnable, Future<V> { /** * Sets this Future to the result of its computation * unless it has been cancelled. */ void run(); }
说明 RunnableFuture 具有两个接口的特性:
1、具有 Runnable 的特性,创建类继承 RunnableFuture 接口并重写 run() 后,可以创建线程来执行这个任务。
public class TestRunnableFuture implements RunnableFuture { //将RunnableFuture当做Runnbale来使用 @Override public void run() { //可以将该类的对象作为Runnable类型的任务提交给一个线程执行 System.out.println("重写run方法,线程会执行"); // new Thread(testRunnableFuture).start(); 创建线程来执行 } //省略了其他未实现的方法 }
2、具有 Future 的特性,可以获取任务的返回值(只有使用 Callable 提交的任务才有返回值)。
那么问题来了,FutureTask 实现了 RunnableFuture 接口,但是却没有实现 Callable 接口,从上面我们知道了,Runnable 接口中的 run() 是没有返回值的,只有 Callable 类型的任务才有返回值,那执行 run() 怎么获取任务的返回值呢?下面是 FutureTask 重写的 run()
public void run() { // 如果状态不是 NEW,说明任务已经执行过或者已经被取消,直接返回 // 如果状态是 NEW,则尝试把执行线程保存在 runnerOffset(runner字段),如果赋值失败,则直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { // 获取构造函数传入的 Callable 值 Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 正常调用 Callable 的 call 方法就可以获取到返回值 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; // 保存 call 方法抛出的异常, CAS操作 setException(ex); } if (ran) // 保存 call 方法的执行结果, CAS操作 set(result); } } finally { runner = null; int s = state; // 如果任务被中断,则执行中断处理 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
我们猜测FutureTask 可以把Runnable 类型的任务转换成 Callable 类型的任务。事实上 Runnable 类型的任务确实可以转换为 Callable 类型的任务,不过不是 FutureTask 自己完成的,而是通过工具类 Executors 来实现的。
先看下 FutureTask 构造方法
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
通过构造方法可以看出来,不管是 Runnable 还是 Callable 类型,FutureTask 都会把其封装成 Callable。如果是 Runnable 类型,就调用 Executors.callable(),该方法可以把 Runnable 类型的任务变成 Callable 类型。
public static Callable<Object> callable(Runnable task) { if (task == null) throw new NullPointerException(); return new Executors.RunnableAdapter<Object>(task, null); }
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
callable() 会调用 RunnableAdapter 类的构造方法,将 Runnable 类型的任务和我们显示设定的 result 对象封装成 Callable 对象。因为 Runnable 类型的任务本身是没有返回值的,所以我们必须要显示的设置 Runnbale 的返回值 result 对象,如果不想要,就设置为 null。
public class TestFutureTask { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<String> futureTask = new FutureTask<>(()->{ System.out.println("正在执行任务"); System.out.println(Thread.currentThread().getName()); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } }, "我是返回值"); //这是一个Runnable类型的任务 //既可以通过线程池来执行FutureTask提交的任务 ExecutorService exector = Executors.newFixedThreadPool(3); exector.submit(futureTask); //也可以通过新建一个线程来执行FutureTask提交的任务 new Thread(futureTask).start(); while( !futureTask.isDone() ){ System.out.println("--- 任务还没有执行完 ---"); Thread.sleep(1000); } String result = futureTask.get(); System.out.println(result); exector.shutdown(); } }
从这里我们可以看出来,FutureTask 这个类只关注它自身,它内部解决了返回值和异常问题,你只需要用线程来执行这个类提交的任务,就可以轻松的获取你需要的信息。
通过上面的学习,我们知道大概有两种方法使用 Future 获得返回值:
1、Future + ExecutorService。
2、FutureTask + ExecutorService / Thread
submit
在线程池那节我们就说过 ExecutorService 扩展了 Executor 的功能,提供了 submit() 来获取线程的返回值。在上面我们知道了 Future + ExecutorService 可以获取线程的返回值,而这种方式就是通过 submit() 返回的 Future 对象来获取的返回值的。那么 submit() 是如何获取返回值的呢?
ExecutorService 的 submit() 是在抽象类 AbstractExecutorService 中实现的。
submit() 既可以接收 Runnable 类型的参数,也可以接收 Callable 类型的参数。下面是 AbstractExecutorService 中的部分源码。
public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); } public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } }
从上面的源码中我们就可以看出来,不管是 Runnable 还是 Callable 类型的任务,submit() 都会将其封装到一个 FutureTask 对象中,因为 FutureTask 间接实现了 Runnable 接口,所以在 submit() 中可以直接使用 execute() 来执行这个任务并且通过返回 FutureTask 对象来获取任务的返回值以及其他信息。
总结
本文主要讲了 Callable、Future 和 FutrureTask,然后还涉及 Runnable 和 submit() 的一些知识点。
Callable 和 Runnable 的区别
对比 | Runnable | Callable |
---|---|---|
方法返回值 | run()没有返回值 | call()有返回值 |
异常 | 异常必须手动捕获,不能抛出 | 可以抛出异常 |
在Thread中使用 | new Thread(Runnable).start() | new Thread(FutureTask(Callable)).start() |
在ExecutorService中使用 | ExecutorService.submit(Runnable) | ExecutorService.submit(Callable) |
Callable的异常抛出后,主线程调用 Future.get() 时会捕获任务中的异常。在 Runnable 中,主线程无法捕获任务中的异常。
获取任务返回值的方式
获取任务返回值有两种方式:Callbable 任务的 call() 可以返回任务的结果,我们只需要调用 submit(Callable) 就可以将 返回值封装到
1、Future + ExecutorService
Future future = ExecutorService.submit( Callable / Runnable ) //submit()可以执行两种任务
不管是 Callable 还是 Runnable,submit() 会把其封装成 FutureTask 对象,然后使用 execute() 执行任务,并把执行结果封装到 FutureTask 对象中。最后返回 FutureTask 对象,我们就可以通过这个对象来异步的获取返回值或者其他信息。
2、FutureTask + ExecutorService / Thread
FutureTask futureTask = new FutureTask( Callable / Runnable, result); //提交Runnable任务需要显示指定返回值,可以为null ExecutorService.submit(futureTask); //提交任务到线程池,不需要使用返回的Future对象 new Thread(futureTask).start(); //也可以直接新建一个线程来执行这个任务,因为FutureTask间接实现了Runnable futureTask.get(); //直接使用FutureTask对象就可以获取返回值,因为futureTask间接实现了Future
FutureTask执行过程
1、FutureTask 类中有一个 Callable 类型的成员变量 callable,通过构造方法创建一个对象,如果是 Callable 类型的任务,就直接将该 Callable 任务的实例赋值给成员变量 callable。
2、如果是 Runnable 类型的任务,在构造方法中需要显示的设置返回值 result 对象,通过构造方法调用 Executors.callable(), 而 callable() 中是通过 RunnableAdapter 类的构造方法将 Runnable 任务和 result 返回值封装成 Callable 对象。
3、因为 FutureTask 间接实现了 Runnable 接口,所以我们既可以在线程池中提交 FutureTask 中的任务,也可以创建一个新的线程来执行这个任务。
4、FutureTask 对象可以直接获取任务的返回值,所以在线程池中使用时,只是将 FutureTask 当做 Runnable 类型的任务在使用,不需要再通过 submit() 返回的 FutureTask 对象来获取返回值。比如下面的例子
public class TestFutureTask { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService exector = Executors.newFixedThreadPool(3); //通过线程池来使用FutureTask FutureTask<String> futureTask = new FutureTask<>(()->{ System.out.println("正在执行任务"); System.out.println(Thread.currentThread().getName()); try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } }, "我是返回值"); //这是一个Runnable类型的任务 //只是将futureTask当做 Runnable 类型的任务 Future<String> future1 = exector.submit(futureTask, "我不是任务的返回值"); while( !future1.isDone() ){ System.out.println("--- 任务还没有执行完 ---"); Thread.sleep(1000); } String result = futureTask.get(); String result1 = future1.get(); System.out.println("futureTask.get() = " + result); System.out.println("future1.get(); " + result1); exector.shutdown(); } }
从这里我们就可以看到之前所说的 FutureTak 这个类只关注它本身。因为它已经将返回值进行了封装,线程只需要执行任务就行了。