zoukankan      html  css  js  c++  java
  • CompletionService/ExecutorCompletionService/线程池/concurrent包

    线程池

    线程池的基本思想:线程频繁的创建、销毁会极大地占用系统资源,为了减少系统在创建销毁线程时的开销,线程池应运而生。线程池包括多个已创建的线程,当有任务要在新线程中执行时,将任务提交给线程池,线程池选取空闲线程或新开线程执行该任务,可见线程池应维护一个任务队列和线程队列。此外还要对线程最大数、最小数目、空闲等待时间等进行管理。

    Executor、ExecutorService接口(线程池)

    Executor提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。
    方法execute()向线程池提交任务
    ExecutorService接口 一个ExecutorService包括运行、关闭、终止三个生命阶段,当一个ExecutorService处于关闭状态时,不能再向线程池提交任务。线程池管理,提交任务功能扩展submit
    AbstractExecutorService实现了ExecutorService接口 ,ThreadPoolExecutor、ScheduledThreadPoolExecutor类是具体的线程池实现类。

    CompletionService接口、ExecutorCompletionService类

    能返回已完成任务,维护一个已完成任务队列(Future),通过take() 函数可以获得任务。
    CompletionService接口将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。
    ExecutorCompletionService类 依赖于一个线程池对象Excutor,在一个Excutor基础上维护一个已完成任务队列。

    Callable/Runnable、Future(任务接口),

    (1)Callable是能够返回结果的可执行任务,Runnable不返回执行结果。文档中说Future 表示异步计算的结果,从CompletionService接口来理解,感觉不如说Future是执行完之后,包含返回结果且可以取消的任务。
    (2)execute方法只能提交Runnable任务,ExecutorService接口扩展的submit方法还可以提交Callable任务,ExecutorCompletionService类的submit也支持两种任务
    (3)FutrueTask实现了RunnableFuture接口,且可以由一个Callable对象来构造,基本包括了三个接口的功能

    Executors(创建线程池)

    类提供了用于此包中所提供的执行程序服务的工厂方法。

    引用这里

    ExecutorCompletionService详解

    ExecutorCompletionService源码
        //成员变量,一个执行器,一个存放执行结果的阻塞队列
        //一个ExecutorService抽象类(转化submit的Runnable或Callable为RunnableFuture<V>)
        private final Executor executor;
        private final AbstractExecutorService aes;
        private final BlockingQueue<Future<V>> completionQueue;
    
        //提交待执行的任务,任务均会转化为RunnableFuture<V>接口,然后放入执行器执行。
       public Future<V> submit(Callable<V> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<V> f = newTaskFor(task);
            executor.execute(new QueueingFuture(f));
            return f;
        }
    
        public Future<V> submit(Runnable task, V result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<V> f = newTaskFor(task, result);
            executor.execute(new QueueingFuture(f));
            return f;
        }
    
        //内部类QueueingFuture,将转化后的RunnableFuture<V>接口对象,装配,重写done方法,使得任务执行结束后,放入完成队列
        private class QueueingFuture extends FutureTask<Void> {
            QueueingFuture(RunnableFuture<V> task) {
                super(task, null);
                this.task = task;
            }
            protected void done() { completionQueue.add(task); }
            private final Future<V> task;
        }
    
        //阻塞方法,直至队列中有任务完成
        public Future<V> take() throws InterruptedException {
            return completionQueue.take();
        }
    
        public Future<V> poll() {
            return completionQueue.poll();
        }
    
        public Future<V> poll(long timeout, TimeUnit unit)
                throws InterruptedException {
            return completionQueue.poll(timeout, unit);
        }
    
    阻塞队列take方法源码
        //用到了ReentrantLock,队列为空时await,队列使用put加元素时signal.....好一个PV操作...
        public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();
            try {
                while (count.get() == 0) {
                    notEmpty.await();
                }
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
            return x;
        }
    

    总结

    ExecutorCompletionService初始化时放入了一个线程池,并且每个submit的任务的执行结果,都会记录到本身的阻塞队列中,重点是,
    如果使用take()阻塞方法取出结果的数量等于submit的任务数量,那么所有任务一定是complete(完成)。

    代码示例

    CompletionService<Integer> completionExecutor=new ExecutorCompletionService<>(Executors.newFixedThreadPool());
    
    
    List<Task> subTasks=multiTask.getSubTasks();//假设list中有5个任务
    
    //全部submit到ExecutorCompletionService
    for (Task task : subTasks) {
        		completionExecutor.submit( new Runnable() {
    				public void run() {
    					task.runTask();
    				}
    			}, Integer.valueOf(TASKFINISHED));
    		}
    
    
    //如果取任务的数量恰好是submit的数量,那么下面这段一定阻塞,直至submit的任务全部完成
    for (int i = 0; i < subTasks.size(); i++) {
        		try {
    				//阻塞,直至有一个任务完成,并取回结果
    				Future<Integer> future=completionExecutor.take();
    				future.get()
    			} catch (InterruptedException) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			}
    		}
    
    
    


    I am a slow walker, but I never walk backwards.



  • 相关阅读:
    在IE和Firfox获取keycode
    using global variable in android extends application
    using Broadcast Receivers to listen outgoing call in android note
    help me!virtual keyboard issue
    using iscroll.js and iscroll jquery plugin in android webview to scroll div and ajax load data.
    javascript:jquery.history.js使用方法
    【CSS核心概念】弹性盒子布局
    【Canvas学习笔记】基础篇(二)
    【JS核心概念】数据类型以及判断方法
    【问题记录】ElementUI上传组件使用beforeupload钩子校验失败时的问题处理
  • 原文地址:https://www.cnblogs.com/lknny/p/5837048.html
Copyright © 2011-2022 走看看