zoukankan      html  css  js  c++  java
  • JAVA的Executor框架

      Executor框架分离了任务的创建和执行。JAVA SE5的java.util.concurrent包中的执行器(Executor)管理Thread对象,从而简化了并发编程。Executor引入了一些功能类来管理和使用线程Thread,其中包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等.

    1.Executor接口

    public interface Executor{
       void execute(Runnable task);
    }
    

      Executor接口是Executor框架中最基础的部分,定义了一个用于执行Runnable的execute方法。它没有直接的实现类,有一个重要的子接口ExecutorService.执行已提交的Runnable任务的对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor而不是显式地创建线程。例如,可能会使用以下方法,而不是为一组任务中的每个任务调用 new Thread(new(RunnableTask())).start():

     Executor executor = anExecutor;
     executor.execute(new RunnableTask1());
     executor.execute(new RunnableTask2());
     ...
    

      内存一致性效果:线程中将Runnable 对象提交到 Executor之前的操作 happen-before其执行开始(可能在另一个线程中)。

    2.ExecutorService

    public interface ExecutorService extends Executor {
        void shutdown();//关闭方法,调用后执行之前提交的任务,不再接受新的任务
        List<Runnable> shutdownNow();//从语义上可以看出是立即停止的意思,将暂停所有等待处理的任务并返回这些任务的列表
        boolean isShutdown();// 判断执行器是否已经关闭
        boolean isTerminated();//关闭后所有任务是否都已完成
        boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;
        <T> Future<T> submit(Callable<T> task);//提交一个Callable任务
        <T> Future<T> submit(Runnable task, T result);//提交一个Runable任务,result要返回的结果
        Future<?> submit(Runnable task);//提交一个任务
        //执行所有给定的任务,当所有任务完成,返回保持任务状态和结果的Future列表
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;
        //执行给定的任务,当所有任务完成或超时期满时(无论哪个首先发生),返回保持任务状态和结果的 Future 列表。
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
         throws InterruptedException; 执行给定的任务,如果某个任务已成功完成(也就是未抛出异常),则返回其结果。 <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException; //执行给定的任务,如果在给定的超时期满前某个任务已成功完成(也就是未抛出异常),则返回其结果. <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
         throws InterruptedException,ExecutionException,TimeoutException; }

      ExecutorService接口继承自Executor接口,定义了终止、提交任务、跟踪任务返回结果等方法。ExecutorService涉及到Runnable、Callable、Future接口,这些接口的具体内容如下。

      // 实现Runnable接口的类将被Thread执行,表示一个基本的任务
      public interface Runnable {
          public abstract void run();// run方法就是它所有的内容,就是实际执行的任务
      }
      // Callable同样是任务,与Runnable接口的区别在于它接收泛型,同时它执行任务后带有返回内容
      public interface Callable<V> {
          V call() throws Exception;// 相对于run方法的带有返回值的call方法
      }

    3.Future:代表异步任务执行的结果

    public interface Future<V> {
        //尝试取消一个任务,如果这个任务不能被取消(通常是因为已经执行完了),返回false,否则返回true。
        boolean cancel(boolean mayInterruptIfRunning);
        boolean isCancelled();// 返回代表的任务是否在完成之前被取消了 
        boolean isDone(); //如果任务已经完成,返回true
        //获取异步任务的执行结果,如果任务没执行完将等待
        V get() throws InterruptedException, ExecutionException;
        //获取异步任务的执行结果(有最长等待时间的限制)。timeout表示等待的时间,unit是它时间单位 
        V get(long timeout, TimeUnit unit) throws InterruptedException,ExecutionException, TimeoutException;
    }

    4.ScheduledExecutorService

    //可以安排指定时间或周期性的执行任务的ExecutorService
    public interface ScheduledExecutorService extends ExecutorService {
        //在指定延迟后执行一个任务,只执行一次
        public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit);
        //与上面的方法相同,只是接受的是Callable任务 
        public ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit);
        //创建并执行一个周期性的任务,在initialDelay延迟后每间隔period个单位执行一次,时间单位都是unit
        // 每次执行任务的时间点是initialDelay, initialDelay+period, initialDelay + 2 * period
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay, long period, TimeUnit unit);
        // 创建并执行一个周期性的任务,在initialDelay延迟后开始执行,在执行结束后再延迟delay个单位开始执行下一次任务,
        // 时间单位都是unit.每次执行任务的时间点是initialDelay, initialDelay+(任务运行时间+delay),
        // initialDelay + 2 * (任务运行时间+delay)...
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay, long delay, TimeUnit unit);
    }

     5.AbstractExecutorService

    //提供ExecutorService的默认实现
    public abstract class AbstractExecutorService implements ExecutorService { 
        //为指定的Runnable和value构造一个FutureTask, value表示默认被返回的Future
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask<T>(runnable, value);
        }
        //为指定的Callable创建一个FutureTask
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask<T>(callable);
        }
        // 提交Runnable任务
        public Future<?> submit(Runnable task) {
            if (task == null)
                throw new NullPointerException();
            // 通过newTaskFor方法构造RunnableFuture,默认的返回值是null
            RunnableFuture<Object> ftask = newTaskFor(task, null);
            // 调用具体实现的execute方法
            execute(ftask);
            return ftask;
        }
        // 提交Runnable任务
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null)
                throw new NullPointerException();
            //通过newTaskFor方法构造RunnableFuture,默认的返回值是result
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
        //提交Callable任务
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null)
                throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);
            execute(ftask);
            return ftask;
        }
        // doInvokeAny的具体实现(核心内容),其它几个方法都是重载方法,都对这个方法进行调用
        // tasks是被执行的任务集,timed标志是否定时的,nanos表示定时的情况下执行任务的限制时间
        private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed,long nanos)
             throws InterruptedException, ExecutionException, TimeoutException {
             if (tasks==null)// tasks空判断
                 throw new NullPointerException();
             int ntasks=tasks.size();// 任务数量
             if (ntasks == 0)
                 throw new IllegalArgumentException();  
             List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);// 创建对应数量的Future返回集
             ExecutorCompletionService<T> ecs =new ExecutorCompletionService<T>(this);
             try {
                 ExecutionException ee = null; //执行异常
                 long lastTime = (timed)? System.nanoTime(): 0; //System.nanoTime()根据系统计时器当回当前的纳秒值
                 Iterator<? extends Callable<T>> it = tasks.iterator();//获取任务集的遍历器
                 futures.add(ecs.submit(it.next()));//向执行器ExecutorCompletionService提交一个任务,并将结果加入futures中
                 --ntasks;//修改任务计数器
                 int active =1;//活跃任务计数器
                 for (;;) {
                     Future<T> f = ecs.poll();//获取并移除代表已完成任务的Future,如果不存在,返回null
                     if (f== null) {
                         //没有任务完成,且任务集中还有未提交的任务
                         if (ntasks > 0) {
                             --ntasks;//剩余任务计数器减   
                             futures.add(ecs.submit(it.next()));//提交任务并添加结果
                             ++active; //活跃任务计数器加
                         }
                         //没有剩余任务,且没有活跃任务(所有任务可能都会取消),跳过这一次循环
                         else if (active==0)
                             break;
                         else if (timed) {
                             //获取并移除代表已完成任务的Future,如果不存在,会等待nanos指定的纳秒数
                             f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                             if (f == null)
                                 throw new TimeoutException();
                             // 计算剩余可用时间
                             long now = System.nanoTime();
                             nanos -= now - lastTime;
                             lastTime = now;
                         }
                         else
                             // 获取并移除表示下一个已完成任务的未来,等待,如果目前不存在。
                             // 执行到这一步说明已经没有任务任务可以提交,只能等待某一个任务的返回
                             f = ecs.take();
                     }
                     // f不为空说明有一个任务完成了
                     if (f!=null) {
                         --active;//已完成一个任务,所以活跃任务计数减
                         try {
                             return f.get();//返回该任务的结果
                         } catch (InterruptedException ie) {
                             throw ie;
                         } catch (ExecutionException eex) {
                             ee = eex;
                         } catch (RuntimeException rex) {
                             ee = new ExecutionException(rex);
                         }
                     }
                 }
                 // 如果没有成功返回结果则抛出异常
                 if (ee == null)
                     ee = new ExecutionException();
                 throw ee;
             } finally {
                 // 无论执行中发生异常还是顺利结束,都将取消剩余未执行的任务
                 for (Future<T> f : futures)
                     f.cancel(true);
             }
         }
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
             throws InterruptedException, ExecutionException {
             try {
                 return doInvokeAny(tasks, false,0);//非定时任务的doInvokeAny调用
             } catch (TimeoutException cannotHappen) {
                 assert false;
                 return null;
             }
         }
        // 定时任务的invokeAny调用,timeout表示超时时间,unit表示时间单位
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout,TimeUnit unit) 
                        throws InterruptedException,
                ExecutionException, TimeoutException {
            return doInvokeAny(tasks, true, unit.toNanos(timeout));
        }
        // 无超时设置的invokeAll方法
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
                throws InterruptedException {
            // 空任务判断
            if (tasks == null)
                throw new NullPointerException();
            // 创建大小为任务数量的结果集
            List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
            //是否完成所有任务的标记
            boolean done=false;
            try {
                // 遍历并执行任务
                for (Callable<T> t : tasks) {
                    RunnableFuture<T> f = newTaskFor(t);
                    futures.add(f);
                    execute(f);
                }
                // 遍历结果集
                for (Future<T> f : futures) {
                    // 如果某个任务没完成,通过f调用get()方法
                    if (!f.isDone()) {
                        try {
                            //get方法等待计算完成,然后获取结果(会等待)。所以调用get后任务就会完成计算,否则会等待
                            f.get();
                        } catch (CancellationException ignore) {
                        } catch (ExecutionException ignore) {
                        }
                    }
                } 
                done =true;//标志所有任务执行完成
                return futures;// 返回结果
            } finally {
                // 假如没有完成所有任务(可能是发生异常等情况),将任务取消
                if (!done)
                    for (Future<T> f : futures)
                        f.cancel(true);
            }
        }
        // 超时设置的invokeAll方法
        public <T> List<Future<T>>invokeAll(Collection<? extends Callable<T>> tasks,long timeout,TimeUnit unit)throws InterruptedException {
             if (tasks == null||unit == null)//需要执行的任务集为空或时间单位为空,抛出异常
                 throw new NullPointerException();
             long nanos = unit.toNanos(timeout);//将超时时间转为纳秒单位
             List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());//创建任务结果集
             boolean done = false;//是否全部完成的标志
             try {
                 //遍历tasks,将任务转为RunnableFuture
                 for (Callable<T> t : tasks)
                     futures.add(newTaskFor(t));
                 long lastTime = System.nanoTime(); //记录当前时间(单位是纳秒)
                 Iterator<Future<T>> it = futures.iterator();//获取迭代器
                 // 遍历
                 while (it.hasNext()) {
                     execute((Runnable)(it.next()));//执行任务
                     long now = System.nanoTime();//记录当前时间 
                     nanos -= now - lastTime;//计算剩余可用时间
                     lastTime = now;//更新上一次执行时间
                     //超时,返回保存任务状态的结果集
                     if (nanos <= 0)
                         return futures;
                 }
                 for (Future<T> f : futures) {
                     //如果有任务没完成
                     if (!f.isDone()) {
                         //时间已经用完,返回保存任务状态的结果集
                         if (nanos <= 0)
                             return futures;
                         try {
                             // 获取计算结果,最多等待给定的时间nanos,单位是纳秒
                             f.get(nanos, TimeUnit.NANOSECONDS);
                         } catch (CancellationException ignore) {
                         } catch (ExecutionException ignore) {
                         } catch (TimeoutException toe) {
                             return futures;
                         }
                         // 计算可用时间
                         long now = System.nanoTime();
                         nanos -= now - lastTime;
                         lastTime = now;
                     }
                 }
                 done = true;//修改是否全部完成的标记          
                 return futures;//返回结果集
             } finally {
                 //假如没有完成所有任务(可能是时间已经用完、发生异常等情况),将任务取消
                 if (!done)
                     for (Future<T> f : futures)
                         f.cancel(true);
             }
         }
    }
  • 相关阅读:
    Hive的安装和使用
    Redis 慢查询日志
    GO语言-数组
    ZooKeeper-3.3.4集群安装配置
    GO语言-基础语法:循环
    GO语言-基础语法:条件判断
    GO语言-基础语法:变量定义
    nginx限制下载速度
    Centos7下Etcd集群搭建
    浅谈spj
  • 原文地址:https://www.cnblogs.com/wxgblogs/p/5435646.html
Copyright © 2011-2022 走看看