zoukankan      html  css  js  c++  java
  • 异步任务执行器Executor简介

    以前线程Thread既表示执行的任务,又表示执行的机制。在JDK1.5中,java并发框架提供了一种“执行服务”的相关API,它将"任务的执行"和"任务的提交“相分离,”执行服务“封装了任务执行的细节,对于任务提交者来说,它可进一步聚焦于任务本身,如任务提交、获取任务执行后的结果、取消任务而不需要关注任务执行的细节,如线程的创建、任务的调试、线程的复用或关闭等。

    1.基本接口

    任务执行服务主要涉及4个接口

    • Runnable和Callable: 表示要执行的异步任务

    • Executor和ExecutorService:表示执行任务的服务

    • Future : 表示任务执行后的结果

    Runnable是我们最常用的异步任务接口,这个接口的方法没有返回值,不能抛出异常。而Callable接口就是为了解决Runnable的不足而在JDK1.5引入的接口,此接口的方法有返回值,且可抛出异常。

    @FunctionalInterface
    public interface Callable<V> {
        V call() throws Exception;
    }

    1)Executor接口

    Executor接口也只有一个方法,这个方法接受一个Runnable类型的参数,这是个抽象方法,它无法指定任务该如何执行。它可能是新建一个线程执行任务,也可能是利用线程池中的一个线程,还可能是在调用者线程中执行。

    public interface Executor {
        void execute(Runnable command);
    }

    ExecutorService扩展了Executor接口,它添加了一些新功能,如支持有返回结果的任务、支持超时任务、支持取消任务、支持批量提交任务。这里的submit方法,返回类型是Future,返回后,只表示任务已提交,不代表已经执行,具体执行与否要看”执行服务“如何调度,通过Future可以查询异步任务的状态、获取最终的结果、取消任务等。

    public interface ExecutorService extends Executor {
        void shutdown();
        List<Runnable> shutdownNow();
        boolean isShutdown();
        boolean isTerminated();
        boolean awaitTermination(long timeout, TimeUnit unit)  throws InterruptedException;
        <T> Future<T> submit(Callable<T> task);
        <T> Future<T> submit(Runnable task, T result);
        Future<?> submit(Runnable task);
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)  throws InterruptedException, ExecutionException;
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    }

    2)Future接口

    Future接口是对任务的结果做了进一步封装,字面上”future"是未来的意思,这里确实是表示“未来(或最终)的结果”,“结果”需要等待。

    public interface Future<V> {
        boolean cancel(boolean mayInterruptIfRunning);
        boolean isCancelled();
        boolean isDone();
        V get() throws InterruptedException, ExecutionException;
        V get(long timeout, TimeUnit unit)  throws InterruptedException, ExecutionException, TimeoutException;
    }

    cancel()方法用于取消异步任务,若任务已完成、已取消或因其他原因不能取消等原因而导致任务取消失败就会返回false,反之返回true. 如果任务还未开始,则不再运行,但若任务已经在运行,则不一定能取消这个任务。参数mayInterruptIfRunning表示,如果任务正在执行,是否调用Thread.interrupt()方法中断线程,而interrupt()方法只是设置线程中断标志,它不一定能真的中断线程。

    isCancelled() 和isDone()分别返回任务是否被取消、任务是否已完成的布尔值。只要cancel方法返回true,那么即使执行任务的线程还未结束,isCancelled方法也一定会返回true。不管什么原因,无论是任务正常结束、任务抛出异常或任务被取消,只要任务结束了,isDone都会返回true.

    get()方法用于返回异步任务的结果,若任务未完成,当前线程则会阻塞待。get(long,TimeUnit)方法需要设定等待时长,若在给定的时间内还未完成任务,则会抛出TimeoutException异常。

    get方法的最终结果大致有3种: ①任务正常完成,get方法返回任务的执行结果,若任务是Runnable且入参未提供结果,最终返回null ②任务被取消了,get方法会抛出CancellationException. ③任务执行过程中抛出了异常,get方法会将异常包装为ExecutionException重新抛出,此异常的getCause方法可获得原始异常。

    Future是实现”任务的提交“与”任务的执行“相分离的关键,它是两者的桥梁,它使任务的提交者和任务的执行器的关注点相隔离,同时又让两者彼此联系。

    2.用法示例

    下面的例子中,使用异步任务,计算0-300的累加结果,在计算出结果前Future的get()方法将阻塞等待。

    这里使用了Executors工具类的newSingleThreadeExecutor方法创建一了个执行服务。Executors有很多静态方法,分别创建各种线程池执行服务。

    class SimpleTaskTest {
        static class Accumlation implements Callable<Integer> {
            @Override
            public Integer call() throws Exception {
                int sum =0;
                for (int i = 0; i < 300; i++) {
                    sum += i;
                    Thread.sleep(10);
                }
                return sum;
            }
        }
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            Future<Integer> future = executor.submit(new Accumlation());
            long start = System.currentTimeMillis();
            int result = future.get();
            System.out.println("Future.get方法等待了" + (System.currentTimeMillis() - start) + "毫秒才返回,计算结果:" + result);
        }
    }

    打印的结果

    Future.get方法等待了3147毫秒才返回,计算结果:44850

    3.基本原理

    1)ExecutorService实现

    ExecutorService是Executor的子接口,它添加了一些新功能,如支持有返回结果的任务、支持超时任务、支持取消任务、支持批量提交任务。

    public interface ExecutorService extends Executor {
        void shutdown();
        List<Runnable> shutdownNow();
        boolean isShutdown();
        boolean isTerminated();
        boolean awaitTermination(long timeout, TimeUnit unit)  throws InterruptedException;
        <T> Future<T> submit(Callable<T> task);
        <T> Future<T> submit(Runnable task, T result);
        Future<?> submit(Runnable task);
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)  throws InterruptedException, ExecutionException;
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    }

    ①ExecutorService有两个关闭执行服务的方法,分别是shutdown()和shutdownNow. 两者关闭执行服务的方式有所差别,shutdown()方法调用后不会接受新的任务,但已提交的任务将继续执行(即使任务还未真正开始执行);而shutdownNow()方法不仅不会接受新任务,而且还会终止已经提交但未执行的任务,对于正在执行的任务,一般调用Thread.interrupt()方法设置中断标志,不过线程可能不响应中断,shutdownNow会返回已提交但未执行的任务列表。shutdown和shutdownNow不会阻塞等待,它们返回后不代表所有任务都结束。

    ②isShutdown返回执行服务是否被关闭的布尔值(不会等待),只要shutdown或shutdownNow任意一方法被调用后,isShutdwon都将返回true.

    ③awaitTermination方法用于等待执行服务中的所有任务完成,此方法需要设置超时时间,如果在限时间内所有任务都结束了(允许非正常结束),

    ④isTerminated 返回在执行服务关闭后所有任务是否已完成的布尔值,如果在此之前shutdownNow或shutdown没有被调用,这里永不可能返回true.

    ⑤三个submit方法都用于提交单任务,submit(Callable<T> )方法中入参Callable本身有返回结果 ;submit(Runnable,T)方法在设定任务的同时可以提供一个结果,在任务结束时将返回这个结果;submit(Runnable )方法入参没有提供结果,最终返回的结果是null 。

    ⑥ExecutorService有两类批量提交任务的方法,invokeAll和invokeAny,它们都有两个版本,一个不限时版本、一个超时版本。

    invokeAll等待(给定的)所有任务完成,返回的Future集合中,每个Future的isDone方法都返回true,但isDone是true并不代表任务完成了,也有可能是因任务被取消而导致任务非正常结束。invokeAll的超时版本方法,需要指定等待时间的时间,若超时后还有任务还未完成,这些任务就会被取消。而invokeAny,只要有一个任务正常完成(没抛出异常)后,它就返回此任务的结果;在正常返回或异常抛出返回后,其他任务则会被取消。对于invokeAny的超时版本,如果在限时内有一任务正常(没抛出异常)完成,就返回此任务的结果 ,其他将任务会被取消;如果没有任务能在限时内成功完成返回,就抛出TimeoutException; 没有任务正常成功返回(可能是因发生某种异常而返回),将抛出ExecutionException.

    在了解ExecutorService接口的相关抽象方法定义后,我们来进一步分析它的实现类和实现原理。

    ExecutorService的主要实现类是ThreadPoolExecutor,它是基于线程池实现的,ExecutorService有一个很重要的抽象类AbstractExecutorService, 而且ThreadPoolExecutor就是直接继承于AbstractExecutorService。

     

    我们可以基于此抽象类实现一个简易的ExecutorService。AbstractExecutorService提供了submit 、invokeAll和invokeAny的默认实现,子类只需要实现其他方法就行了。shutdown与isShutdown 等方法与生命周期管理有关,我暂时可以不用去管它,其实它的子类最关键在于实现execute方法,因为submit、invokeAll、invokeAny等方法底层主要还是调用execute方法。

    import java.util.List;
    import java.util.concurrent.*;
    
    public class CustomizeExecutorService extends AbstractExecutorService {
        @Override
        public void shutdown() {
            System.out.println("=====shutdown=====");
        }
        @Override
        public List<Runnable> shutdownNow() {
            System.out.println("=====shutdownNow=====");
            return null;
        }
        @Override
        public boolean isShutdown() {
            return false;
        }
        @Override
        public boolean isTerminated() {
            return false;
        }
        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            return false;
        }
        @Override
        public void execute(Runnable command) {
            new Thread(command).start();
        }
    }
    class Accumlation implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            int sum = 0;
            for (int i = 0; i < 300; i++) {
                sum += i;
                Thread.sleep(10);
            }
            return sum;
        }
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService executor = new CustomizeExecutorService();
            Future<Integer> future = executor.submit(new Accumlation());
            long start = System.currentTimeMillis();
            int result = future.get();
            System.out.println("Future.get方法等待了" + (System.currentTimeMillis() - start) + "毫秒,计算结果:" + result);
        }
    }
    CustomizeExecutorService

    打印结果

    Future.get方法等待了3142毫秒,计算结果:44850

    AbstractExecutorService的基本方法是submit,其他方法的实现可以此为参照。可以看下其submit系列方法的实现是怎样的。

    public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }

    不论入参是Runnable还是Callable,最终都会将其封装成RunnableFuture,而RunnableFuture又是Runnable的子接口,而后去调用execute(Runnable),最后返回RunnableFuture 。这里有点奇怪的地方在于对入参的封装,其实质相当于将Runnable或Callable任务封装成Runnable结果。这里也再次体现出了RunnableFuture的作用,它是连接“任务的提交”和“任务的执行”的桥梁。

    上面的submit方法体中调用newTaskFor,它实际上返回一个FutureTask类型的实例对象。

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    2)Future实现

    上面的提到的RunnableFuture同时继承了Runnable和Future接口,RunnableFuture本身没有增加任何抽象方法.

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        void run();//此方法是父接口Runnable的抽象方法
    }

    而FutureTask又是RunnableFutrue接口的主要实现类,我们可以来看看其的实现细节

    它有一个成员变量state表示状态

    private volatile int state;

    它有这些可能取值

    private static final int NEW          = 0;//刚开始的状态或任务在运行中
    private static final int COMPLETING   = 1;//临时状态,任务即将结束,正在设置结果
    private static final int NORMAL       = 2;//任务正常完成
    private static final int EXCEPTIONAL  = 3;//因抛出异常而结束任务
    private static final int CANCELLED    = 4;//任务被取消
    private static final int INTERRUPTING = 5;//任务正在被中断
    private static final int INTERRUPTED  = 6;//任务被中断(中断的最终状态)

    其他成员变量

        private Callable<V> callable;
        private Object outcome; // non-volatile, protected by state reads/writes
        private volatile Thread runner;
        private volatile WaitNode waiters;

    成员变量callable表示要执行的任务。

    成员变量outcome表示任务的结果或任务非正常结束的异常

    成员变量runner表示执行此任务的线程

    成员变量waiter表示等待任务执行结果的等待栈表(数据结构是单向链表,先进后出)。WaitNode是一个简单的静态内部,一个成员变量thread表示等待结果的线程,另一个成员变量next表示下一个等待节点(线程)。

        static final class WaitNode {
            volatile Thread thread;
            volatile WaitNode next;
            WaitNode() { thread = Thread.currentThread(); }
        }

    FutureTask的构造方法会初始化callable和state。FutureTask的构造方法可以接受Runnable类型参数,它会调用Executors.callable将Runnable转换为Callable类型实例,以便于统一处理。

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

    任务执行服务会使用一个线程执行FutureTask的run方法,run方法的实现

    public void run() {
            if (state != NEW ||
                    !UNSAFE.compareAndSwapObject(this, runnerOffset,
                            null, Thread.currentThread()))
                //将当前线程设置为执行任务的线程
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();//执行任务
                        ran = true;
                    } catch (Throwable ex) {
                        //运行时有异常,设置异常
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);//设置结果
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null; //state已是最终状态,不再变化,将runer设为null,防止run方法被并发调用
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state; //清空运行线程runner后再重新获取state,防止遗漏掉对中断的处理
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }

    其主要逻辑是:

    ①检查状态,设置运行任务的线程

    ②调用callable的call方法去执行任务,并捕获运行中可能出现的异常

    ③如果任务正常完成,调用set设置任务的结果,将state设为NORMAL, 将结果保存到outcome ,唤醒所有等待结果的线程

    ④若执行任务过程中发生了异常,调用setException设置异常,将state设为EXCEPTIONAL ,将此异常也保存到outcome ,唤醒所有等待结果的线程

    ⑤最后将运行线程runner清空,若状态可能是任务被取消的中断还要处理此中断。

    对于任务提交者,可通过get方法获取任务的最终结果,它的超时版本是

    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING && //还未完成
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) //等待完成
            //到了限定时间,任务仍未完成,抛出超时异常TimeoutException
            throw new TimeoutException();
        return report(s);//报告结果
    }

    其主要逻辑是:若任务未完成就等待任务完成,最后调用report报告结果,report会根据状态返回结果或抛出异常。

    而report方法的基本逻辑也很简单:若是任务正常结束就返回这个任务的结果,若是任务被取消,就抛出任务取消异常CancellationException,若是在执行任务过程中发生了异常就统一将其封装成ExecutionException并抛出。

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

    说到任务被取消,我们可以看看cancel(boolean)方法如何实现的

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
                UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                        mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            //①不是NEW状态,表示任务至少是COMPLETING(即将结束)状态,返回false
            //②CAS更新state为INTERRUPTING或CANCELLED失败,返回false
            //只有state状态更新成功,才能取消任务(防止被并发调用)
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {//允许中断就设置中断标志
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();//设置中断标志
                } finally { // final state 设置中断的最终状态
                    //INTERRUPTING -> INTERRUPTED ,将state由“正在中断”更新为”已经中断“
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            //从等待栈中唤醒并移除所有的线程(节点)
            finishCompletion();
        }
        return true;
    }

    其基本逻辑:

    ①任务已结束或被取消,返回false

    ②若mayInterruptIfRunning为true,调用interrupt设置中断标志,将state设置为INTERRUPTED,若mayInterruptIfRunning为false,将state设为CANCELLED.

    ③调用finishCompletion唤醒并移除等待栈中的所有线程

  • 相关阅读:
    第二十课字符串
    数学归纳法:搞定循环与递归的钥匙
    11预处理命令下
    Xshell6无法连接上虚拟机的解决方法
    redis数据库常用命令
    redis使用get命令,中文乱码问题
    Ubuntu下redis的安装和简单操作
    启动hbase后,使用指令进入shell命令行模式时报错"wrong number of arguments (0 for 1)"
    启动hbase报错:“SLF4J: Class path contains multiple SLF4J bindings.”解决方法
    ./bin/hadoop 提示“没有那个文件或目录”解决方法
  • 原文地址:https://www.cnblogs.com/gocode/p/introduction-of-asyn-task-executor.html
Copyright © 2011-2022 走看看