zoukankan      html  css  js  c++  java
  • CompletionService/ExecutorCompletionService/线程池/concurrent包

    线程池

    线程池的基本思想:线程频繁的创建、销毁会极大地占用系统资源,为了减少系统在创建销毁线程时的开销,线程池应运而生。线程池包括多个已创建的线程,当有任务要在新线程中执行时,将任务提交给线程池,线程池选取空闲线程或新开线程执行该任务,可见线程池应维护一个任务队列和线程队列。此外还要对线程最大数、最小数目、空闲等待时间等进行管理。

    Executor、ExecutorService接口(线程池)

    Executor提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。
    方法execute()向线程池提交任务
    ExecutorService接口 一个ExecutorService包括运行、关闭、终止三个生命阶段,当一个ExecutorService处于关闭状态时,不能再向线程池提交任务。线程池管理,提交任务功能扩展submit
    AbstractExecutorService实现了ExecutorService接口 ,ThreadPoolExecutor、ScheduledThreadPoolExecutor类是具体的线程池实现类。

    CompletionService接口、ExecutorCompletionService类

    能返回已完成任务,维护一个已完成任务队列(Future),通过take() 函数可以获得任务。
    CompletionService接口将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者 submit 执行的任务。使用者 take 已完成的任务,并按照完成这些任务的顺序处理它们的结果。
    ExecutorCompletionService类 依赖于一个线程池对象Excutor,在一个Excutor基础上维护一个已完成任务队列。

    Callable/Runnable、Future(任务接口),

    (1)Callable是能够返回结果的可执行任务,Runnable不返回执行结果。文档中说Future 表示异步计算的结果,从CompletionService接口来理解,感觉不如说Future是执行完之后,包含返回结果且可以取消的任务。
    (2)execute方法只能提交Runnable任务,ExecutorService接口扩展的submit方法还可以提交Callable任务,ExecutorCompletionService类的submit也支持两种任务
    (3)FutrueTask实现了RunnableFuture接口,且可以由一个Callable对象来构造,基本包括了三个接口的功能

    Executors(创建线程池)

    类提供了用于此包中所提供的执行程序服务的工厂方法。

    引用这里

    ExecutorCompletionService详解

    ExecutorCompletionService源码
        //成员变量,一个执行器,一个存放执行结果的阻塞队列
        //一个ExecutorService抽象类(转化submit的Runnable或Callable为RunnableFuture<V>)
        private final Executor executor;
        private final AbstractExecutorService aes;
        private final BlockingQueue<Future<V>> completionQueue;
    
        //提交待执行的任务,任务均会转化为RunnableFuture<V>接口,然后放入执行器执行。
       public Future<V> submit(Callable<V> task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<V> f = newTaskFor(task);
            executor.execute(new QueueingFuture(f));
            return f;
        }
    
        public Future<V> submit(Runnable task, V result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<V> f = newTaskFor(task, result);
            executor.execute(new QueueingFuture(f));
            return f;
        }
    
        //内部类QueueingFuture,将转化后的RunnableFuture<V>接口对象,装配,重写done方法,使得任务执行结束后,放入完成队列
        private class QueueingFuture extends FutureTask<Void> {
            QueueingFuture(RunnableFuture<V> task) {
                super(task, null);
                this.task = task;
            }
            protected void done() { completionQueue.add(task); }
            private final Future<V> task;
        }
    
        //阻塞方法,直至队列中有任务完成
        public Future<V> take() throws InterruptedException {
            return completionQueue.take();
        }
    
        public Future<V> poll() {
            return completionQueue.poll();
        }
    
        public Future<V> poll(long timeout, TimeUnit unit)
                throws InterruptedException {
            return completionQueue.poll(timeout, unit);
        }
    
    阻塞队列take方法源码
        //用到了ReentrantLock,队列为空时await,队列使用put加元素时signal.....好一个PV操作...
        public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();
            try {
                while (count.get() == 0) {
                    notEmpty.await();
                }
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
            return x;
        }
    

    总结

    ExecutorCompletionService初始化时放入了一个线程池,并且每个submit的任务的执行结果,都会记录到本身的阻塞队列中,重点是,
    如果使用take()阻塞方法取出结果的数量等于submit的任务数量,那么所有任务一定是complete(完成)。

    代码示例

    CompletionService<Integer> completionExecutor=new ExecutorCompletionService<>(Executors.newFixedThreadPool());
    
    
    List<Task> subTasks=multiTask.getSubTasks();//假设list中有5个任务
    
    //全部submit到ExecutorCompletionService
    for (Task task : subTasks) {
        		completionExecutor.submit( new Runnable() {
    				public void run() {
    					task.runTask();
    				}
    			}, Integer.valueOf(TASKFINISHED));
    		}
    
    
    //如果取任务的数量恰好是submit的数量,那么下面这段一定阻塞,直至submit的任务全部完成
    for (int i = 0; i < subTasks.size(); i++) {
        		try {
    				//阻塞,直至有一个任务完成,并取回结果
    				Future<Integer> future=completionExecutor.take();
    				future.get()
    			} catch (InterruptedException) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			}
    		}
    
    
    


    I am a slow walker, but I never walk backwards.



  • 相关阅读:
    selenium iframe切换
    roboframework环境搭建与使用
    ServHa双机热备简单配置
    linux搭建java ee开发环境
    FPGA学习笔记(五)—— 组合逻辑电路设计
    FPGA学习笔记(三)—— 数字逻辑设计基础(抽象的艺术)
    FPGA学习笔记(一)——初识FPGA
    FPGA学习笔记(二)——FPGA学习路线及开发流程
    【js重学系列】异步编程
    【js重学系列】作用域
  • 原文地址:https://www.cnblogs.com/lknny/p/5837048.html
Copyright © 2011-2022 走看看