zoukankan      html  css  js  c++  java
  • 多线程批量执行等待全部结果

    来自:http://blog.csdn.net/wxwzy738/article/details/8497853

           http://blog.csdn.net/cutesource/article/details/6061229

    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class Test17 {
        public static void main(String[] args) throws Exception {
            Test17 t = new Test17();
            t.count1();
            t.count2();
        }
    //使用阻塞容器保存每次Executor处理的结果,在后面进行统一处理
        public void count1() throws Exception{
            ExecutorService exec = Executors.newCachedThreadPool();
            BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();
            for(int i=0; i<10; i++){
                Future<Integer> future =exec.submit(getTask());
                queue.add(future);
            }
            int sum = 0;
            int queueSize = queue.size();
            for(int i=0; i<queueSize; i++){
                sum += queue.take().get();
            }
            System.out.println("总数为:"+sum);
            exec.shutdown();
        }
    //使用CompletionService(完成服务)保持Executor处理的结果
        public void count2() throws InterruptedException, ExecutionException{
            ExecutorService exec = Executors.newCachedThreadPool();
            CompletionService<Integer> execcomp = new ExecutorCompletionService<Integer>(exec);
            for(int i=0; i<10; i++){
                execcomp.submit(getTask());
            }
            int sum = 0;
            for(int i=0; i<10; i++){
    //检索并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。
                Future<Integer> future = execcomp.take();
                sum += future.get();
            }
            System.out.println("总数为:"+sum);
            exec.shutdown();
        }
        //得到一个任务
        public Callable<Integer> getTask(){
            final Random rand = new Random();
            Callable<Integer> task = new Callable<Integer>(){
                @Override
                public Integer call() throws Exception {
                    int i = rand.nextInt(10);
                    int j = rand.nextInt(10);
                    int sum = i*j;
                    System.out.print(sum+"	");
                    return sum;
                }
            };
            return task;
        }
        /**
         * 执行结果:
            6    6    14    40    40    0    4    7    0    0    总数为:106
            12    6    12    54    81    18    14    35    45    35    总数为:312
         */
    }

    先看一下新建一个ThreadPoolExecutor的构建参数:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

    看这个参数很容易让人以为是线程池里保持corePoolSize个线程,如果不够用,就加线程入池直至maximumPoolSize大小,如果 还不够就往workQueue里加,如果workQueue也不够就用RejectedExecutionHandler来做拒绝处理。

    但实际情况不是这样,具体流程如下:

    1)当池子大小小于corePoolSize就新建线程,并处理请求

    2)当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去从workQueue中取任务并处理

    3)当workQueue放不下新入的任务时,新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize就用RejectedExecutionHandler来做拒绝处理

    4)另外,当池子的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁

    内部结构如下所示:

    从中可以发现ThreadPoolExecutor就是依靠BlockingQueue的阻塞机制来维持线程池,当池子里的线程无事可干的时候就通过workQueue.take()阻塞住。

    其实可以通过Executes来学学几种特殊的ThreadPoolExecutor是如何构建的。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    newFixedThreadPool就是一个固定大小的ThreadPool

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    newCachedThreadPool比较适合没有固定大小并且比较快速就能完成的小任务,没必要维持一个Pool,这比直接new Thread来处理的好处是能在60秒内重用已创建的线程。

    其他类型的ThreadPool看看构建参数再结合上面所说的特性就大致知道它的特性

  • 相关阅读:
    管理之道
    Swagger-editor安装启动及错误处理,注意事项
    装箱 拆箱 枚举 注解 多态
    Spring Security 内置过滤器表
    Spring Boot入门 and Spring Boot与ActiveMQ整合
    消息中间件解决方案JMS
    网页静态化解决方案-Freemarker demo+语法
    spring-data-radis错误
    java基础总结
    swift oc 混编的头文件
  • 原文地址:https://www.cnblogs.com/sunxucool/p/4562842.html
Copyright © 2011-2022 走看看