zoukankan      html  css  js  c++  java
  • Java并发编程(六)——Callable接口

    Callable接口

    前面我们提到了创建线程的三种方法,继承 Thread 类,实现 Runnable 接口,通过线程池创建。今天学习创建线程的另一种方式,通过 Callable 接口来创建。

    Callable 接口与 Runnable 接口一样,可以给线程提交一个任务让其执行。Callable 接口只有一个 call 方法,且该方法的返回值是一个泛型。

    @FunctionalInterface
    public interface Callable<V> {
        /**
         * Computes a result, or throws an exception if unable to do so.
         *
         * @return computed result
         * @throws Exception if unable to compute a result
         */
        V call() throws Exception;
    }

    与 Runnable 接口不同的是,通过 Callable 接口提交任务时,我们可以在任务结束后获取任务的返回值。除此之外,Callable 接口还可以将异常抛出,而 Runnable 接口中的任务只能手动捕获。你可以这样使用 Callable 接口

    // 使用 Callable ,可以获取返回值
    Callable<String> callable = () -> {
        log.info("进入 Callable 的 call 方法");
        // 模拟子线程任务,在此睡眠 2s,
        // 小细节:由于 call 方法会抛出 Exception,这里不用像使用 Runnable 的run 方法那样 try/catch 了
        Thread.sleep(5000);
        return "Hello from Callable";
    };

    只是表示可以返回任务的执行结果,但任务还是要线程来执行。

    如何执行一个实现了 Callable 接口的任务呢?我们找遍整个 Thread 类也没有看到 Callable 的字样。其实要执行一个 Callable 接口的任务必须要配合线程池(ExecutorService)来使用。从下图中可以看到,ExecutorService 中的 submit 方法可以接收 Callable 类型的任务,并且返回 Future 类型的对象。接下来,我们就看下什么是 Future。

    Future

    在 JDK1.5 以前,我们异步的执行任务,将要处理的复杂操作交给其他线程处理,提高了程序的运行效率。我们并不关心其它线程的执行情况,也不需要其它线程返回结果。正是由于这种不关心的模式,决定了我们不能在线程中抛出异常。异常一般是由方法的调用者来处理的,试想下,如果我们在新的线程中抛出了异常,那么这个异常由谁来处理呢,又要在什么时候处理呢,毕竟没有任何线程关心着这个执行的任务。

    随着业务越来越复杂,我们不仅需要其他线程来处理任务,有时候还需要获得其它线程处理任务后的返回结果。当然,你可以使用其它方式获得任务的返回结果,比如用一个全局变量来实现线程之间的通信,当需要使用这个全局变量时,就等待这个线程结束然后使用这个全局变量。比如 join(),wait()等,但仍然无法抛出异常,即无法抛出检查异常,每次都需要在线程内部进行捕获。

    为了方便的解决上面的问题,在 JDK1.5后引入了 Callable 接口,并且通过 Future 接口可以获取任务的返回值。

     

    public interface Future<V> {
        // 取消任务
        boolean cancel(boolean mayInterruptIfRunning);
    
        // 获取任务执行结果
        V get() throws InterruptedException, ExecutionException;
    
        // 获取任务执行结果,带有超时时间限制
        V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    
        // 判断任务是否已经取消
        boolean isCancelled();
    
        // 判断任务是否已经结束
        boolean isDone();
    }

    接下来我们使用 Callable 接口来创建一个任务,通过线程池的 submit 来提交任务,配合 Future 接口获取任务的返回值。

    import java.util.concurrent.*;
    
    public class TestFuture {
    
        public static void main(String[] args) throws InterruptedException, ExecutionException {
    
            ExecutorService executor = Executors.newFixedThreadPool(3);
    
            System.out.println("任务不想做,让其它线程去处理吧" + System.currentTimeMillis());
    
            Callable<String> callable = ()->{
                System.out.println("我正在执行任务....." + System.currentTimeMillis());
                Thread.sleep(4000);
                return "任务完成~~" + System.currentTimeMillis();
            };
    
            Future<String> future = executor.submit(callable);      //提交任务
    
            System.out.println("主线程继续执行" + System.currentTimeMillis());
            Thread.sleep(2000);     //模拟主线程处理其它任务
    
            System.out.println("主线程等待其它线程的返回结果" + System.currentTimeMillis());
            String result = future.get();   //调用get方法如果其它线程还没有返回结果,会阻塞主线程直到其它线程返回结果
    
            System.out.println("返回结果是: " + result);
    
            executor.shutdown();    //关闭线程池
        }
    }

     我们还可以使用 Future 接口中的其他方法,对线程进行控制。

    public class TestFuture {
    
        public static void main(String[] args) throws InterruptedException, ExecutionException {
    
            ExecutorService executor = Executors.newFixedThreadPool(3);
    
            System.out.println("任务不想做,让其它线程去处理吧" + System.currentTimeMillis());
    
            Future<String> future = executor.submit(()->{    //提交任务
                System.out.println("我正在执行任务....." + System.currentTimeMillis());
                Thread.sleep(20000);
                return "任务完成~~" + System.currentTimeMillis();
            });
    
            System.out.println("主线程继续执行" + System.currentTimeMillis());
    
            int count = 0;
            while( !future.isDone() ){     //判断任务是否完成
                System.out.println("--- 任务还没有完成 ---");
                Thread.sleep(1000);
                count++;
                if(count == 5){
                    future.cancel(true);    //如果任务执行时间过长,我们可以取消任务
                }
            }
    
            String result = future.get();   //调用get方法如果其它线程还没有返回结果,会阻塞主线程直到其它线程返回结果
    
            System.out.println("返回结果是: " + result);
    
            executor.shutdown();    //关闭线程池
        }
    }

     

    这怎么还出现异常了呢?其实在源码的注释中已经说明了这个问题

     调用了 get() 时,如果任务被取消了,就抛出 CancellationException 异常。

    出现了异常自然是要处理的,我们可以使用 Future 中的另一个方法 isCancelled() 来判断任务是否被取消,如果没有被取消,就调用 get() 来取得任务的返回值。

    if( !future.isCancelled() ){
        String result = future.get();   //调用get方法如果其它线程还没有返回结果,会阻塞主线程直到其它线程返回结果
        System.out.println("返回结果是: " + result);
    }else {
        System.out.println("任务已被取消!!");
    }

     Future 中的方法我们基本都用过了,但 Future 只是一个接口,里面的方法必须要由实现类来完成,我们是如何获取任务的返回值呢,又如何来取消任务呢?这就需要直到 Future 的实现类 FutureTask 了。

    FutureTask

     通过类图我们看到,FutureTask 实现了 RunnableFuture 接口,见名知意,RunnableFuture 接口同时继承了 Runnable 接口和 Future 接口。

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        /**
         * Sets this Future to the result of its computation
         * unless it has been cancelled.
         */
        void run();
    }

    说明 RunnableFuture 具有两个接口的特性:

    1、具有 Runnable 的特性,创建类继承 RunnableFuture 接口并重写 run() 后,可以创建线程来执行这个任务。

    public class TestRunnableFuture implements RunnableFuture {     //将RunnableFuture当做Runnbale来使用
    
        @Override
        public void run() {                                 //可以将该类的对象作为Runnable类型的任务提交给一个线程执行
            System.out.println("重写run方法,线程会执行");      // new Thread(testRunnableFuture).start(); 创建线程来执行
        }
    
        //省略了其他未实现的方法
    }

    2、具有 Future 的特性,可以获取任务的返回值(只有使用 Callable 提交的任务才有返回值)。

    那么问题来了,FutureTask 实现了 RunnableFuture 接口,但是却没有实现 Callable 接口,从上面我们知道了,Runnable 接口中的 run() 是没有返回值的,只有 Callable 类型的任务才有返回值,那执行 run() 怎么获取任务的返回值呢?下面是 FutureTask 重写的 run()

    public void run() {
          // 如果状态不是 NEW,说明任务已经执行过或者已经被取消,直接返回
          // 如果状态是 NEW,则尝试把执行线程保存在 runnerOffset(runner字段),如果赋值失败,则直接返回
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
              // 获取构造函数传入的 Callable 值
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                      // 正常调用 Callable 的 call 方法就可以获取到返回值
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                      // 保存 call 方法抛出的异常, CAS操作
                    setException(ex);
                }
                if (ran)
                      // 保存 call 方法的执行结果, CAS操作
                    set(result);
            }
        } finally {        
            runner = null;       
            int s = state;
              // 如果任务被中断,则执行中断处理
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

    我们猜测FutureTask 可以把Runnable 类型的任务转换成 Callable 类型的任务。事实上 Runnable 类型的任务确实可以转换为 Callable 类型的任务,不过不是 FutureTask 自己完成的,而是通过工具类 Executors 来实现的。

    先看下 FutureTask 构造方法

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
    
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    通过构造方法可以看出来,不管是 Runnable 还是 Callable 类型,FutureTask 都会把其封装成 Callable。如果是 Runnable 类型,就调用 Executors.callable(),该方法可以把 Runnable 类型的任务变成 Callable 类型。

    public static Callable<Object> callable(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        return new Executors.RunnableAdapter<Object>(task, null);
    }
    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

    callable() 会调用 RunnableAdapter 类的构造方法,将 Runnable 类型的任务和我们显示设定的 result 对象封装成 Callable 对象。因为 Runnable 类型的任务本身是没有返回值的,所以我们必须要显示的设置 Runnbale 的返回值 result 对象,如果不想要,就设置为 null。

    public class TestFutureTask {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            FutureTask<String> futureTask = new FutureTask<>(()->{
                System.out.println("正在执行任务");
                System.out.println(Thread.currentThread().getName());
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "我是返回值");        //这是一个Runnable类型的任务
    
            //既可以通过线程池来执行FutureTask提交的任务
            ExecutorService exector = Executors.newFixedThreadPool(3);
            exector.submit(futureTask);
    
            //也可以通过新建一个线程来执行FutureTask提交的任务
            new Thread(futureTask).start();
    
            while( !futureTask.isDone() ){
               System.out.println("--- 任务还没有执行完 ---");
               Thread.sleep(1000);
            }
    
            String result = futureTask.get();
    
            System.out.println(result);
    
            exector.shutdown();
        }
    }

     从这里我们可以看出来,FutureTask 这个类只关注它自身,它内部解决了返回值和异常问题,你只需要用线程来执行这个类提交的任务,就可以轻松的获取你需要的信息。

    通过上面的学习,我们知道大概有两种方法使用 Future 获得返回值:

    1、Future + ExecutorService。

    2、FutureTask + ExecutorService / Thread

    submit

    在线程池那节我们就说过 ExecutorService 扩展了 Executor 的功能,提供了 submit() 来获取线程的返回值。在上面我们知道了 Future + ExecutorService 可以获取线程的返回值,而这种方式就是通过 submit() 返回的 Future 对象来获取的返回值的。那么 submit() 是如何获取返回值的呢?

    ExecutorService 的 submit() 是在抽象类 AbstractExecutorService 中实现的。

    submit() 既可以接收 Runnable 类型的参数,也可以接收 Callable 类型的参数。下面是 AbstractExecutorService 中的部分源码。

    public abstract class AbstractExecutorService implements ExecutorService {
    
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
    
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
    
        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
    
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
    }

    从上面的源码中我们就可以看出来,不管是 Runnable 还是 Callable 类型的任务,submit() 都会将其封装到一个 FutureTask 对象中,因为 FutureTask  间接实现了 Runnable 接口,所以在 submit() 中可以直接使用 execute() 来执行这个任务并且通过返回 FutureTask  对象来获取任务的返回值以及其他信息。

    总结

    本文主要讲了 Callable、Future 和 FutrureTask,然后还涉及 Runnable 和 submit() 的一些知识点。

    Callable 和 Runnable 的区别

    对比RunnableCallable
    方法返回值 run()没有返回值 call()有返回值
    异常 异常必须手动捕获,不能抛出 可以抛出异常
    在Thread中使用 new Thread(Runnable).start() new Thread(FutureTask(Callable)).start()
    在ExecutorService中使用 ExecutorService.submit(Runnable) ExecutorService.submit(Callable)

    Callable的异常抛出后,主线程调用 Future.get() 时会捕获任务中的异常。在 Runnable 中,主线程无法捕获任务中的异常。

    获取任务返回值的方式

    获取任务返回值有两种方式:Callbable 任务的 call() 可以返回任务的结果,我们只需要调用 submit(Callable) 就可以将 返回值封装到

    1、Future + ExecutorService

    Future future = ExecutorService.submit( Callable / Runnable )   //submit()可以执行两种任务

    不管是 Callable 还是 Runnable,submit() 会把其封装成 FutureTask 对象,然后使用 execute() 执行任务,并把执行结果封装到 FutureTask 对象中。最后返回 FutureTask 对象,我们就可以通过这个对象来异步的获取返回值或者其他信息。

    2、FutureTask + ExecutorService / Thread

    FutureTask futureTask = new FutureTask( Callable / Runnable, result);   //提交Runnable任务需要显示指定返回值,可以为null
    
    ExecutorService.submit(futureTask);     //提交任务到线程池,不需要使用返回的Future对象
    
    new Thread(futureTask).start();     //也可以直接新建一个线程来执行这个任务,因为FutureTask间接实现了Runnable
    
    futureTask.get();   //直接使用FutureTask对象就可以获取返回值,因为futureTask间接实现了Future

    FutureTask执行过程

    1、FutureTask 类中有一个 Callable 类型的成员变量 callable,通过构造方法创建一个对象,如果是 Callable 类型的任务,就直接将该 Callable 任务的实例赋值给成员变量 callable。

    2、如果是 Runnable 类型的任务,在构造方法中需要显示的设置返回值 result 对象,通过构造方法调用 Executors.callable(), 而 callable() 中是通过 RunnableAdapter  类的构造方法将 Runnable 任务和 result 返回值封装成 Callable 对象。

    3、因为 FutureTask 间接实现了 Runnable 接口,所以我们既可以在线程池中提交 FutureTask 中的任务,也可以创建一个新的线程来执行这个任务。

    4、FutureTask  对象可以直接获取任务的返回值,所以在线程池中使用时,只是将 FutureTask 当做 Runnable 类型的任务在使用,不需要再通过 submit() 返回的 FutureTask 对象来获取返回值。比如下面的例子

    public class TestFutureTask {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            ExecutorService exector = Executors.newFixedThreadPool(3);  //通过线程池来使用FutureTask
    
    
            FutureTask<String> futureTask = new FutureTask<>(()->{
                System.out.println("正在执行任务");
                System.out.println(Thread.currentThread().getName());
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "我是返回值");        //这是一个Runnable类型的任务
    
            //只是将futureTask当做 Runnable 类型的任务
            Future<String> future1 = exector.submit(futureTask, "我不是任务的返回值");
    
            while( !future1.isDone() ){
               System.out.println("--- 任务还没有执行完 ---");
               Thread.sleep(1000);
            }
    
            String result = futureTask.get();
    
            String result1 = future1.get();
    
            System.out.println("futureTask.get() = " + result);
            System.out.println("future1.get(); " + result1);
    
            exector.shutdown();
        }
    }

     从这里我们就可以看到之前所说的 FutureTak 这个类只关注它本身。因为它已经将返回值进行了封装,线程只需要执行任务就行了。

    参考资料

    Java Callable接口

    Callable接口详解

    Java Future

    不会用Java Future,我怀疑你泡茶没我快, 又是超长图文!!

    【Java并发编程】Callable、Future和FutureTask的实现

  • 相关阅读:
    POJ
    POJ-2253 Frogger(最短路)
    背包问题(转自背包九讲+对应题目)
    POJ-1860 Currency Exchange (最短路)
    Bellman-Ford 最短路径算法
    POJ-3295 Tautology (构造)
    POJ-2586 Y2K Accounting Bug 贪心
    POJ 2965 The Pilots Brothers' refrigerator (暴力枚举)
    python(pymysql操作数据库)
    python复习概念__oop中
  • 原文地址:https://www.cnblogs.com/Zz-feng/p/13367612.html
Copyright © 2011-2022 走看看