zoukankan      html  css  js  c++  java
  • AbstractExecutorService源码

    public class RunnableFutureTask {
        static FinalizableDelegatedExecutorService executorService = (FinalizableDelegatedExecutorService) Executors1.newSingleThreadExecutor();    //创建一个单线程执行器
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            futureDemo();
        }
        
        static void futureDemo() throws InterruptedException, ExecutionException {
                /*调用FinalizableDelegatedExecutorService的submit,转调ThreadPoolExecutor1的submit,
                转调父类AbstractExecutorService1的submit,提交给线程池的是一个FutureTask,线程池里面调用runWorker(),
                然后Runnable的run方法,就是FutureTask的run方法(开了线程池的一个线程去执行),executorService.submit线程退出到外层。
                FutureTask的run方法没有返回值,结果是在FutureTask的outcome属性里面的。result2.get()就是获取FutureTask的outcome属性。*/
                Future<String> result2 = executorService.submit(new Callable<String>()     { 
                    public String call() throws Exception {
                        return "result2";    
                    }
                });
                System.out.println("future result from callable:"+result2.get());    
                //executorService.submit会把Callable封装成FutureTask然后丢到线程池执行,结果在FutureTask自身中。
            
                
                FutureTask1<String> result3 = new FutureTask1<String>(new Callable<String>() {
                    public String call() throws Exception {
                        return "result3";
                    }
                });
                executorService.submit(result3);
                /*submit会把result3再封装为FutureTask,丢到线程池执行,线程池最后执行runWorker(),然后Runnable的run方法,
                就是外层FutureTask的run方法(开了线程池的一个线程去执行),外层FutureTask run()时候调用里面result3的call方法,
                result3作为一个Runnable已经被封装为了一个Callable,就嗲用封装的call(),转为调用里面真正result3的run方法,
                result3的run方法执行时候,调用里面匿名内部类的call(),设置结果给result3。*/
                System.out.println("future result from FutureTask:" + result3.get());     
                
        }
    }
    public abstract class AbstractExecutorService1 implements ExecutorService {
    
        // 对Runnable的封装,T是期望值
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new FutureTask1<T>(runnable, value);
        }
    
        // 对Callable的封装
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return new FutureTask1<T>(callable);// FutureTask1是一个Runnable,也就是通过execute提交给ThreadPoolExecutor线程池的一个任务
        }
    
        // Runnable封装为FutureTask,把FutureTask(是一个Runnable)作为任务丢到线程池执行,并且返回这个FutureTask。从FutureTask得到执行结果。
        public Future<?> submit(Runnable task) {
            if (task == null)
                throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);// FutureTask1
            execute(ftask);
            return ftask;// ftask是一个Runnable,也就是通过execute提交给ThreadPoolExecutor线程池的一个任务
            // 从ftask FutureTask获取结果
        }
    
        // Runnable封装为FutureTask,把FutureTask(是一个Runnable)作为任务丢到线程池执行,并且返回这个FutureTask。。从FutureTask得到执行结果。
        // T是期望值
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null)
                throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);// FutureTask1
            execute(ftask);
            return ftask;// 从ftask FutureTask获取结果
        }
    
        // Callable封装为FutureTask,把FutureTask(是一个Runnable)作为任务丢到线程池执行,并且返回这个FutureTask。。从FutureTask得到执行结果。
        public <T> Future<T> submit(Callable<T> task) {
            if (task == null)
                throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task);// FutureTask1
            execute(ftask);
            return ftask;// 从ftask FutureTask获取结果
        }
    
        // invokeAny()和invokeAll()方法是以批量的形式执行一组任务,然后等待至少一个或者全部的任务完成。
        
        //一批任务中的某个任务完成了,就返回他的结果,所以他返回的是最快执行完成的那个任务的结果,他的重载方法,加入了超时机制,如果超过了限制的时间也没有一个任务完成,那么就会抛出超时异常了。
        private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
                throws InterruptedException, ExecutionException, TimeoutException {
            if (tasks == null)
                throw new NullPointerException();
            int ntasks = tasks.size();
            if (ntasks == 0)
                throw new IllegalArgumentException();
            ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
            ExecutorCompletionService1<T> ecs = new ExecutorCompletionService1<T>(this);
    
            // 为了提高效率,特别是在并行性有限的执行器中,请在提交更多任务之前检查以前提交的任务是否已完成。这种交织加上异常机制解释了主循环的混乱。
    
            try {
                // 记录异常,以便如果我们无法获得任何结果,我们可以抛出最后一个异常。
                ExecutionException ee = null;
                final long deadline = timed ? System.nanoTime() + nanos : 0L;
                Iterator<? extends Callable<T>> it = tasks.iterator();
    
                // Start one task for sure; the rest incrementally
                futures.add(ecs.submit(it.next()));
                --ntasks;
                int active = 1;
    
                for (;;) {
                    Future<T> f = ecs.poll();
                    if (f == null) {
                        if (ntasks > 0) {
                            --ntasks;
                            futures.add(ecs.submit(it.next()));
                            ++active;
                        } else if (active == 0)
                            break;
                        else if (timed) {
                            f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                            if (f == null)
                                throw new TimeoutException();
                            nanos = deadline - System.nanoTime();
                        } else
                            f = ecs.take();
                    }
                    if (f != null) {
                        --active;
                        try {
                            return f.get();
                        } catch (ExecutionException eex) {
                            ee = eex;
                        } catch (RuntimeException rex) {
                            ee = new ExecutionException(rex);
                        }
                    }
                }
    
                if (ee == null)
                    ee = (ExecutionException) new Exception();
                throw ee;
    
            } finally {
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
            }
        }
    
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
            try {
                return doInvokeAny(tasks, false, 0);
            } catch (TimeoutException cannotHappen) {
                assert false;
                return null;
            }
        }
    
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            return doInvokeAny(tasks, true, unit.toNanos(timeout));
        }
    
        //批量提交并执行任务,当所有任务都执行完成时,返回一个保存任务状态和执行结果的Future列表。
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
            if (tasks == null)// 如果任务集后为空,则抛出一个NullPointerException
                throw new NullPointerException();
            // 创建一个和任务数量等大的ArrayList.
            ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
            boolean done = false;//标志任务是否完成
            try {
                //将tasks里面的每个Callable转化成Future,添加到futures里面,并交给Executor#execute()方法执行
                for (Callable<T> t : tasks) {
                    RunnableFuture<T> f = newTaskFor(t);// FutureTask1
                    futures.add(f);
                    execute(f);
                }
                //判断futures里面的Future是否执行结束,如果还没有完成,通过get()阻塞直到任务完成
                //也是就说执行完这一段代码,futures里面的每一个任务都是执行完成的情况 
                for (int i = 0, size = futures.size(); i < size; i++) {
                    Future<T> f = futures.get(i);
                    if (!f.isDone()) {
                        try {
                            f.get();
                        } catch (CancellationException ignore) {
                        } catch (ExecutionException ignore) {
                        }
                    }
                }
                done = true;
                return futures;
            } finally {
                if (!done)//如果任务没有完成的,就全部都取消,并释放内存
                    for (int i = 0, size = futures.size(); i < size; i++)
                        futures.get(i).cancel(true);//采用中断线程的方式取消任务
            }
        }
    
        //重载方法也是类似的,就是加入了一个超时时间,不管是所有的任务都执行完,还是已经到达超时的时间,只有两个有中满足其中一个,就会返回一个保存任务状态和执行结果的Future列表。
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
                throws InterruptedException {
            if (tasks == null)
                throw new NullPointerException();//如果任务集后为空,则抛出一个NullPointerException
            long nanos = unit.toNanos(timeout);//将超时时间转化成纳秒
            ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());//创建一个和任务数量等大的ArrayList.
            boolean done = false;
            try {
                for (Callable<T> t : tasks)
                    futures.add(newTaskFor(t));//将tasks里面的每个Callable转化成Future,添加到futures里面
    
                final long deadline = System.nanoTime() + nanos;//超时时间点
                final int size = futures.size();//记录一下futures的大小,
    
                //执行任务,如果超时就直接返回futures
                for (int i = 0; i < size; i++) {
                    execute((Runnable) futures.get(i));
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L)
                        return futures;
                }
                //判断futures里面的Future是否执行结束,如果还没有完成,通过get()阻塞直到任务完成
                //也是就说执行完这一段代码,要么futures里面的每一个任务都是执行完成了,要么就是超时了 
                for (int i = 0; i < size; i++) {
                    Future<T> f = futures.get(i);
                    if (!f.isDone()) {
                        if (nanos <= 0L)
                            return futures;
                        try {
                            f.get(nanos, TimeUnit.NANOSECONDS);//这里捕捉了get()方法可能出现了所有的异常
                        } catch (CancellationException ignore) {
                        } catch (ExecutionException ignore) {
                        } catch (TimeoutException toe) {
                            return futures;
                        }
                        nanos = deadline - System.nanoTime();
                    }
                }
                done = true;//标记任务完成
                return futures;
            } finally {
                if (!done)//如果任务没有完成的,就全部都取消,并释放内存
                    for (int i = 0, size = futures.size(); i < size; i++)
                        futures.get(i).cancel(true);//采用中断线程的方式取消任务
            }
        }
    
    }
  • 相关阅读:
    Reactive(1) 从响应式编程到"好莱坞"
    [动图演示]Redis 持久化 RDB/AOF 详解与实践
    补习系列(22)-全面解读 Spring Profile 的用法
    Android手机打造你的Python&Java开发工具!
    人工神经网络模型种类
    最小二乘拟合
    LDA主体模型
    Logistic Regression求解classification问题
    batch gradient descent(批量梯度下降) 和 stochastic gradient descent(随机梯度下降)
    SVM实验
  • 原文地址:https://www.cnblogs.com/yaowen/p/11388331.html
Copyright © 2011-2022 走看看