zoukankan      html  css  js  c++  java
  • 多线程批量执行等待全部结果

    来自:http://blog.csdn.net/wxwzy738/article/details/8497853

           http://blog.csdn.net/cutesource/article/details/6061229

    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class Test17 {
        public static void main(String[] args) throws Exception {
            Test17 t = new Test17();
            t.count1();
            t.count2();
        }
    //使用阻塞容器保存每次Executor处理的结果,在后面进行统一处理
        public void count1() throws Exception{
            ExecutorService exec = Executors.newCachedThreadPool();
            BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();
            for(int i=0; i<10; i++){
                Future<Integer> future =exec.submit(getTask());
                queue.add(future);
            }
            int sum = 0;
            int queueSize = queue.size();
            for(int i=0; i<queueSize; i++){
                sum += queue.take().get();
            }
            System.out.println("总数为:"+sum);
            exec.shutdown();
        }
    //使用CompletionService(完成服务)保持Executor处理的结果
        public void count2() throws InterruptedException, ExecutionException{
            ExecutorService exec = Executors.newCachedThreadPool();
            CompletionService<Integer> execcomp = new ExecutorCompletionService<Integer>(exec);
            for(int i=0; i<10; i++){
                execcomp.submit(getTask());
            }
            int sum = 0;
            for(int i=0; i<10; i++){
    //检索并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。
                Future<Integer> future = execcomp.take();
                sum += future.get();
            }
            System.out.println("总数为:"+sum);
            exec.shutdown();
        }
        //得到一个任务
        public Callable<Integer> getTask(){
            final Random rand = new Random();
            Callable<Integer> task = new Callable<Integer>(){
                @Override
                public Integer call() throws Exception {
                    int i = rand.nextInt(10);
                    int j = rand.nextInt(10);
                    int sum = i*j;
                    System.out.print(sum+"	");
                    return sum;
                }
            };
            return task;
        }
        /**
         * 执行结果:
            6    6    14    40    40    0    4    7    0    0    总数为:106
            12    6    12    54    81    18    14    35    45    35    总数为:312
         */
    }

    先看一下新建一个ThreadPoolExecutor的构建参数:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

    看这个参数很容易让人以为是线程池里保持corePoolSize个线程,如果不够用,就加线程入池直至maximumPoolSize大小,如果 还不够就往workQueue里加,如果workQueue也不够就用RejectedExecutionHandler来做拒绝处理。

    但实际情况不是这样,具体流程如下:

    1)当池子大小小于corePoolSize就新建线程,并处理请求

    2)当池子大小等于corePoolSize,把请求放入workQueue中,池子里的空闲线程就去从workQueue中取任务并处理

    3)当workQueue放不下新入的任务时,新建线程入池,并处理请求,如果池子大小撑到了maximumPoolSize就用RejectedExecutionHandler来做拒绝处理

    4)另外,当池子的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁

    内部结构如下所示:

    从中可以发现ThreadPoolExecutor就是依靠BlockingQueue的阻塞机制来维持线程池,当池子里的线程无事可干的时候就通过workQueue.take()阻塞住。

    其实可以通过Executes来学学几种特殊的ThreadPoolExecutor是如何构建的。

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    newFixedThreadPool就是一个固定大小的ThreadPool

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    newCachedThreadPool比较适合没有固定大小并且比较快速就能完成的小任务,没必要维持一个Pool,这比直接new Thread来处理的好处是能在60秒内重用已创建的线程。

    其他类型的ThreadPool看看构建参数再结合上面所说的特性就大致知道它的特性

  • 相关阅读:
    Java实现 LeetCode 455 分发饼干
    Java实现 LeetCode 455 分发饼干
    Java实现 LeetCode 455 分发饼干
    Java实现 LeetCode 454 四数相加 II
    Java实现 LeetCode 454 四数相加 II
    Java实现 LeetCode 454 四数相加 II
    FFmpeg解码H264及swscale缩放详解
    linux中cat more less head tail 命令区别
    C语言字符串操作总结大全(超详细)
    如何使用eclipse进行嵌入式Linux的开发
  • 原文地址:https://www.cnblogs.com/sunxucool/p/4562842.html
Copyright © 2011-2022 走看看