zoukankan      html  css  js  c++  java
  • 异步任务执行器Executor简介

    以前线程Thread既表示执行的任务,又表示执行的机制。在JDK1.5中,java并发框架提供了一种“执行服务”的相关API,它将"任务的执行"和"任务的提交“相分离,”执行服务“封装了任务执行的细节,对于任务提交者来说,它可进一步聚焦于任务本身,如任务提交、获取任务执行后的结果、取消任务而不需要关注任务执行的细节,如线程的创建、任务的调试、线程的复用或关闭等。

    1.基本接口

    任务执行服务主要涉及4个接口

    • Runnable和Callable: 表示要执行的异步任务

    • Executor和ExecutorService:表示执行任务的服务

    • Future : 表示任务执行后的结果

    Runnable是我们最常用的异步任务接口,这个接口的方法没有返回值,不能抛出异常。而Callable接口就是为了解决Runnable的不足而在JDK1.5引入的接口,此接口的方法有返回值,且可抛出异常。

    @FunctionalInterface
    public interface Callable<V> {
        V call() throws Exception;
    }

    1)Executor接口

    Executor接口也只有一个方法,这个方法接受一个Runnable类型的参数,这是个抽象方法,它无法指定任务该如何执行。它可能是新建一个线程执行任务,也可能是利用线程池中的一个线程,还可能是在调用者线程中执行。

    public interface Executor {
        void execute(Runnable command);
    }

    ExecutorService扩展了Executor接口,它添加了一些新功能,如支持有返回结果的任务、支持超时任务、支持取消任务、支持批量提交任务。这里的submit方法,返回类型是Future,返回后,只表示任务已提交,不代表已经执行,具体执行与否要看”执行服务“如何调度,通过Future可以查询异步任务的状态、获取最终的结果、取消任务等。

    public interface ExecutorService extends Executor {
        void shutdown();
        List<Runnable> shutdownNow();
        boolean isShutdown();
        boolean isTerminated();
        boolean awaitTermination(long timeout, TimeUnit unit)  throws InterruptedException;
        <T> Future<T> submit(Callable<T> task);
        <T> Future<T> submit(Runnable task, T result);
        Future<?> submit(Runnable task);
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)  throws InterruptedException, ExecutionException;
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    }

    2)Future接口

    Future接口是对任务的结果做了进一步封装,字面上”future"是未来的意思,这里确实是表示“未来(或最终)的结果”,“结果”需要等待。

    public interface Future<V> {
        boolean cancel(boolean mayInterruptIfRunning);
        boolean isCancelled();
        boolean isDone();
        V get() throws InterruptedException, ExecutionException;
        V get(long timeout, TimeUnit unit)  throws InterruptedException, ExecutionException, TimeoutException;
    }

    cancel()方法用于取消异步任务,若任务已完成、已取消或因其他原因不能取消等原因而导致任务取消失败就会返回false,反之返回true. 如果任务还未开始,则不再运行,但若任务已经在运行,则不一定能取消这个任务。参数mayInterruptIfRunning表示,如果任务正在执行,是否调用Thread.interrupt()方法中断线程,而interrupt()方法只是设置线程中断标志,它不一定能真的中断线程。

    isCancelled() 和isDone()分别返回任务是否被取消、任务是否已完成的布尔值。只要cancel方法返回true,那么即使执行任务的线程还未结束,isCancelled方法也一定会返回true。不管什么原因,无论是任务正常结束、任务抛出异常或任务被取消,只要任务结束了,isDone都会返回true.

    get()方法用于返回异步任务的结果,若任务未完成,当前线程则会阻塞待。get(long,TimeUnit)方法需要设定等待时长,若在给定的时间内还未完成任务,则会抛出TimeoutException异常。

    get方法的最终结果大致有3种: ①任务正常完成,get方法返回任务的执行结果,若任务是Runnable且入参未提供结果,最终返回null ②任务被取消了,get方法会抛出CancellationException. ③任务执行过程中抛出了异常,get方法会将异常包装为ExecutionException重新抛出,此异常的getCause方法可获得原始异常。

    Future是实现”任务的提交“与”任务的执行“相分离的关键,它是两者的桥梁,它使任务的提交者和任务的执行器的关注点相隔离,同时又让两者彼此联系。

    2.用法示例

    下面的例子中,使用异步任务,计算0-300的累加结果,在计算出结果前Future的get()方法将阻塞等待。

    这里使用了Executors工具类的newSingleThreadeExecutor方法创建一了个执行服务。Executors有很多静态方法,分别创建各种线程池执行服务。

    class SimpleTaskTest {
        static class Accumlation implements Callable<Integer> {
            @Override
            public Integer call() throws Exception {
                int sum =0;
                for (int i = 0; i < 300; i++) {
                    sum += i;
                    Thread.sleep(10);
                }
                return sum;
            }
        }
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            Future<Integer> future = executor.submit(new Accumlation());
            long start = System.currentTimeMillis();
            int result = future.get();
            System.out.println("Future.get方法等待了" + (System.currentTimeMillis() - start) + "毫秒才返回,计算结果:" + result);
        }
    }

    打印的结果

    Future.get方法等待了3147毫秒才返回,计算结果:44850

    3.基本原理

    1)ExecutorService实现

    ExecutorService是Executor的子接口,它添加了一些新功能,如支持有返回结果的任务、支持超时任务、支持取消任务、支持批量提交任务。

    public interface ExecutorService extends Executor {
        void shutdown();
        List<Runnable> shutdownNow();
        boolean isShutdown();
        boolean isTerminated();
        boolean awaitTermination(long timeout, TimeUnit unit)  throws InterruptedException;
        <T> Future<T> submit(Callable<T> task);
        <T> Future<T> submit(Runnable task, T result);
        Future<?> submit(Runnable task);
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
        <T> T invokeAny(Collection<? extends Callable<T>> tasks)  throws InterruptedException, ExecutionException;
        <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
    }

    ①ExecutorService有两个关闭执行服务的方法,分别是shutdown()和shutdownNow. 两者关闭执行服务的方式有所差别,shutdown()方法调用后不会接受新的任务,但已提交的任务将继续执行(即使任务还未真正开始执行);而shutdownNow()方法不仅不会接受新任务,而且还会终止已经提交但未执行的任务,对于正在执行的任务,一般调用Thread.interrupt()方法设置中断标志,不过线程可能不响应中断,shutdownNow会返回已提交但未执行的任务列表。shutdown和shutdownNow不会阻塞等待,它们返回后不代表所有任务都结束。

    ②isShutdown返回执行服务是否被关闭的布尔值(不会等待),只要shutdown或shutdownNow任意一方法被调用后,isShutdwon都将返回true.

    ③awaitTermination方法用于等待执行服务中的所有任务完成,此方法需要设置超时时间,如果在限时间内所有任务都结束了(允许非正常结束),

    ④isTerminated 返回在执行服务关闭后所有任务是否已完成的布尔值,如果在此之前shutdownNow或shutdown没有被调用,这里永不可能返回true.

    ⑤三个submit方法都用于提交单任务,submit(Callable<T> )方法中入参Callable本身有返回结果 ;submit(Runnable,T)方法在设定任务的同时可以提供一个结果,在任务结束时将返回这个结果;submit(Runnable )方法入参没有提供结果,最终返回的结果是null 。

    ⑥ExecutorService有两类批量提交任务的方法,invokeAll和invokeAny,它们都有两个版本,一个不限时版本、一个超时版本。

    invokeAll等待(给定的)所有任务完成,返回的Future集合中,每个Future的isDone方法都返回true,但isDone是true并不代表任务完成了,也有可能是因任务被取消而导致任务非正常结束。invokeAll的超时版本方法,需要指定等待时间的时间,若超时后还有任务还未完成,这些任务就会被取消。而invokeAny,只要有一个任务正常完成(没抛出异常)后,它就返回此任务的结果;在正常返回或异常抛出返回后,其他任务则会被取消。对于invokeAny的超时版本,如果在限时内有一任务正常(没抛出异常)完成,就返回此任务的结果 ,其他将任务会被取消;如果没有任务能在限时内成功完成返回,就抛出TimeoutException; 没有任务正常成功返回(可能是因发生某种异常而返回),将抛出ExecutionException.

    在了解ExecutorService接口的相关抽象方法定义后,我们来进一步分析它的实现类和实现原理。

    ExecutorService的主要实现类是ThreadPoolExecutor,它是基于线程池实现的,ExecutorService有一个很重要的抽象类AbstractExecutorService, 而且ThreadPoolExecutor就是直接继承于AbstractExecutorService。

     

    我们可以基于此抽象类实现一个简易的ExecutorService。AbstractExecutorService提供了submit 、invokeAll和invokeAny的默认实现,子类只需要实现其他方法就行了。shutdown与isShutdown 等方法与生命周期管理有关,我暂时可以不用去管它,其实它的子类最关键在于实现execute方法,因为submit、invokeAll、invokeAny等方法底层主要还是调用execute方法。

    import java.util.List;
    import java.util.concurrent.*;
    
    public class CustomizeExecutorService extends AbstractExecutorService {
        @Override
        public void shutdown() {
            System.out.println("=====shutdown=====");
        }
        @Override
        public List<Runnable> shutdownNow() {
            System.out.println("=====shutdownNow=====");
            return null;
        }
        @Override
        public boolean isShutdown() {
            return false;
        }
        @Override
        public boolean isTerminated() {
            return false;
        }
        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            return false;
        }
        @Override
        public void execute(Runnable command) {
            new Thread(command).start();
        }
    }
    class Accumlation implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            int sum = 0;
            for (int i = 0; i < 300; i++) {
                sum += i;
                Thread.sleep(10);
            }
            return sum;
        }
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService executor = new CustomizeExecutorService();
            Future<Integer> future = executor.submit(new Accumlation());
            long start = System.currentTimeMillis();
            int result = future.get();
            System.out.println("Future.get方法等待了" + (System.currentTimeMillis() - start) + "毫秒,计算结果:" + result);
        }
    }
    CustomizeExecutorService

    打印结果

    Future.get方法等待了3142毫秒,计算结果:44850

    AbstractExecutorService的基本方法是submit,其他方法的实现可以此为参照。可以看下其submit系列方法的实现是怎样的。

    public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }

    不论入参是Runnable还是Callable,最终都会将其封装成RunnableFuture,而RunnableFuture又是Runnable的子接口,而后去调用execute(Runnable),最后返回RunnableFuture 。这里有点奇怪的地方在于对入参的封装,其实质相当于将Runnable或Callable任务封装成Runnable结果。这里也再次体现出了RunnableFuture的作用,它是连接“任务的提交”和“任务的执行”的桥梁。

    上面的submit方法体中调用newTaskFor,它实际上返回一个FutureTask类型的实例对象。

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    2)Future实现

    上面的提到的RunnableFuture同时继承了Runnable和Future接口,RunnableFuture本身没有增加任何抽象方法.

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        void run();//此方法是父接口Runnable的抽象方法
    }

    而FutureTask又是RunnableFutrue接口的主要实现类,我们可以来看看其的实现细节

    它有一个成员变量state表示状态

    private volatile int state;

    它有这些可能取值

    private static final int NEW          = 0;//刚开始的状态或任务在运行中
    private static final int COMPLETING   = 1;//临时状态,任务即将结束,正在设置结果
    private static final int NORMAL       = 2;//任务正常完成
    private static final int EXCEPTIONAL  = 3;//因抛出异常而结束任务
    private static final int CANCELLED    = 4;//任务被取消
    private static final int INTERRUPTING = 5;//任务正在被中断
    private static final int INTERRUPTED  = 6;//任务被中断(中断的最终状态)

    其他成员变量

        private Callable<V> callable;
        private Object outcome; // non-volatile, protected by state reads/writes
        private volatile Thread runner;
        private volatile WaitNode waiters;

    成员变量callable表示要执行的任务。

    成员变量outcome表示任务的结果或任务非正常结束的异常

    成员变量runner表示执行此任务的线程

    成员变量waiter表示等待任务执行结果的等待栈表(数据结构是单向链表,先进后出)。WaitNode是一个简单的静态内部,一个成员变量thread表示等待结果的线程,另一个成员变量next表示下一个等待节点(线程)。

        static final class WaitNode {
            volatile Thread thread;
            volatile WaitNode next;
            WaitNode() { thread = Thread.currentThread(); }
        }

    FutureTask的构造方法会初始化callable和state。FutureTask的构造方法可以接受Runnable类型参数,它会调用Executors.callable将Runnable转换为Callable类型实例,以便于统一处理。

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

    任务执行服务会使用一个线程执行FutureTask的run方法,run方法的实现

    public void run() {
            if (state != NEW ||
                    !UNSAFE.compareAndSwapObject(this, runnerOffset,
                            null, Thread.currentThread()))
                //将当前线程设置为执行任务的线程
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();//执行任务
                        ran = true;
                    } catch (Throwable ex) {
                        //运行时有异常,设置异常
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);//设置结果
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null; //state已是最终状态,不再变化,将runer设为null,防止run方法被并发调用
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state; //清空运行线程runner后再重新获取state,防止遗漏掉对中断的处理
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }

    其主要逻辑是:

    ①检查状态,设置运行任务的线程

    ②调用callable的call方法去执行任务,并捕获运行中可能出现的异常

    ③如果任务正常完成,调用set设置任务的结果,将state设为NORMAL, 将结果保存到outcome ,唤醒所有等待结果的线程

    ④若执行任务过程中发生了异常,调用setException设置异常,将state设为EXCEPTIONAL ,将此异常也保存到outcome ,唤醒所有等待结果的线程

    ⑤最后将运行线程runner清空,若状态可能是任务被取消的中断还要处理此中断。

    对于任务提交者,可通过get方法获取任务的最终结果,它的超时版本是

    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING && //还未完成
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) //等待完成
            //到了限定时间,任务仍未完成,抛出超时异常TimeoutException
            throw new TimeoutException();
        return report(s);//报告结果
    }

    其主要逻辑是:若任务未完成就等待任务完成,最后调用report报告结果,report会根据状态返回结果或抛出异常。

    而report方法的基本逻辑也很简单:若是任务正常结束就返回这个任务的结果,若是任务被取消,就抛出任务取消异常CancellationException,若是在执行任务过程中发生了异常就统一将其封装成ExecutionException并抛出。

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

    说到任务被取消,我们可以看看cancel(boolean)方法如何实现的

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
                UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                        mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            //①不是NEW状态,表示任务至少是COMPLETING(即将结束)状态,返回false
            //②CAS更新state为INTERRUPTING或CANCELLED失败,返回false
            //只有state状态更新成功,才能取消任务(防止被并发调用)
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {//允许中断就设置中断标志
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();//设置中断标志
                } finally { // final state 设置中断的最终状态
                    //INTERRUPTING -> INTERRUPTED ,将state由“正在中断”更新为”已经中断“
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            //从等待栈中唤醒并移除所有的线程(节点)
            finishCompletion();
        }
        return true;
    }

    其基本逻辑:

    ①任务已结束或被取消,返回false

    ②若mayInterruptIfRunning为true,调用interrupt设置中断标志,将state设置为INTERRUPTED,若mayInterruptIfRunning为false,将state设为CANCELLED.

    ③调用finishCompletion唤醒并移除等待栈中的所有线程

  • 相关阅读:
    16. 3Sum Closest
    17. Letter Combinations of a Phone Number
    20. Valid Parentheses
    77. Combinations
    80. Remove Duplicates from Sorted Array II
    82. Remove Duplicates from Sorted List II
    88. Merge Sorted Array
    257. Binary Tree Paths
    225. Implement Stack using Queues
    113. Path Sum II
  • 原文地址:https://www.cnblogs.com/gocode/p/introduction-of-asyn-task-executor.html
Copyright © 2011-2022 走看看