zoukankan      html  css  js  c++  java
  • 使用CompletionService批处理任务(线程池阻塞线程)

    CompletionService ExecutorService BlockingQueueFuture[若要实际应用可参照本文后面的工具类]

    如果你向Executor提交了一个批处理任务,并且希望在它们完成后获得结果。为此你可以保存与每个任务相关联的Future,然后不断地调用timeout为零的get,来检验Future是否完成。这样做固然可以,但却相当乏味。幸运的是,还有一个更好的方法:完成服务  (Completion service)。

    CompletionService整合了Executor和BlockingQueue的功能。你可以将Callable任务提交给它去执行,然后使用类似于队列中的take和poll方法,在结果完整可用时获得这个结果,像一个打包的Future。ExecutorCompletionService是实现CompletionService接口的一个类,并将计算任务委托给一个Executor。

    ExecutorCompletionService的实现相当直观。它在构造函数中创建一个BlockingQueue,用它去保持完成的结果。计算完成时会调用FutureTask中的done方法。当提交一个任务后,首先把这个任务包装为一个QueueingFuture,它是FutureTask的一个子类,然后覆写done方法,将结果置入BlockingQueue中,take和poll方法委托给了BlockingQueue,它会在结果不可用时阻塞。

     1 import java.util.Random;  
     2 import java.util.concurrent.BlockingQueue;  
     3 import java.util.concurrent.Callable;  
     4 import java.util.concurrent.CompletionService;  
     5 import java.util.concurrent.ExecutionException;  
     6 import java.util.concurrent.ExecutorCompletionService;  
     7 import java.util.concurrent.ExecutorService;  
     8 import java.util.concurrent.Executors;  
     9 import java.util.concurrent.Future;  
    10 import java.util.concurrent.LinkedBlockingQueue;  
    11   
    12 public class Test17 {  
    13     public static void main(String[] args) throws Exception {  
    14         Test17 t = new Test17();  
    15         t.count1();  
    16         t.count2();  
    17     }  
    18     //使用阻塞容器保存每次Executor处理的结果,在后面进行统一处理  
    19     public void count1() throws Exception{  
    20         ExecutorService exec = Executors.newCachedThreadPool();  
    21         BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<Future<Integer>>();  
    22         for(int i=0; i<10; i++){  
    23             Future<Integer> future =exec.submit(getTask());  
    24             queue.add(future);  
    25         }  
    26         int sum = 0;  
    27         int queueSize = queue.size();  
    28         for(int i=0; i<queueSize; i++){  
    29             sum += queue.take().get();  
    30         }  
    31         System.out.println("总数为:"+sum);  
    32         exec.shutdown();  
    33     }  
    34     //使用CompletionService(完成服务)保持Executor处理的结果  
    35     public void count2() throws InterruptedException, ExecutionException{  
    36         ExecutorService exec = Executors.newCachedThreadPool();  
    37         CompletionService<Integer> execcomp = new ExecutorCompletionService<Integer>(exec);  
    38         for(int i=0; i<10; i++){  
    39             execcomp.submit(getTask());  
    40         }  
    41         int sum = 0;  
    42         for(int i=0; i<10; i++){  
    43             //检索并移除表示下一个已完成任务的 Future,如果目前不存在这样的任务,则等待。  
    44             Future<Integer> future = execcomp.take();  
    45             sum += future.get();  
    46         }  
    47         System.out.println("总数为:"+sum);  
    48         exec.shutdown();  
    49     }  
    50     //得到一个任务  
    51     public Callable<Integer> getTask(){  
    52         final Random rand = new Random();  
    53         Callable<Integer> task = new Callable<Integer>(){  
    54             @Override  
    55             public Integer call() throws Exception {  
    56                 int i = rand.nextInt(10);  
    57                 int j = rand.nextInt(10);  
    58                 int sum = i*j;  
    59                 System.out.print(sum+"	");  
    60                 return sum;  
    61             }  
    62         };  
    63         return task;  
    64     }  
    65     /** 
    66      * 执行结果: 
    67      *   6   6   14  40  40  0   4   7   0   0   总数为:106 
    68      *  12  6   12  54  81  18  14  35  45  35  总数为:312 
    69      */  
    70 }  

    ExecutorCompletionService统一了ExecutorService和BlockingQueue,既有线程池功能,能提交任务,又有阻塞队列功能,能判断所有线程的执行结果。

    对比:Callable

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.FutureTask;
     
    /*
     * 一、创建执行线程的方式三:实现 Callable 接口。 相较于实现 Runnable 接口的方式,方法可以有返回值,并且可以抛出异常。
     *
     * 二、执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。  FutureTask 是  Future 接口的实现类
     */
    public class TestCallable {
     
        public static void main(String[] args) {
            ThreadDemo td = new ThreadDemo();
     
            //1.执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收运算结果。
            FutureTask<Integer> result = new FutureTask<>(td);
     
            new Thread(result).start();
     
            //2.接收线程运算后的结果
            try {
                Integer sum = result.get();  //FutureTask 可用于 闭锁 类似于CountDownLatch的作用,在所有的线程没有执行完成之后这里是不会执行的
                System.out.println(sum);
                System.out.println("------------------------------------");
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
     
    }
     
    class ThreadDemo implements Callable<Integer> {
     
        @Override
        public Integer call() throws Exception {
            int sum = 0;
     
            for (int i = 0; i <= 100000; i++) {
                sum += i;
            }
     
            return sum;
        }
     
    }
    
    
    
    
    综上例子可以看到: Callable 和 Future接口的区别
    
      (1)Callable规定的方法是call(),而Runnable规定的方法是run(). 
      (2)Callable的任务执行后可返回值,而Runnable的任务是不能返回值的。  
      (3)call()方法可抛出异常,而run()方法是不能抛出异常的。 
      (4)运行Callable任务可拿到一个Future对象, Future表示异步计算的结果。 
           它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。 
           通过Future对象可了解任务执行情况,可取消任务的执行,还可获取任务执行的结果。 
           Callable是类似于Runnable的接口,实现Callable接口的类和实现Runnable的类都是可被其它线程执行的任务。 

     最后来一个终极封装版, 方便以后使用:

    package cn.com.util;
    
    import java.util.*;
    import java.util.concurrent.*;
    
    public class CompletionServiceUtils {
    
        private static final int nThreads = 10;
        private static final ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
        private static final CompletionService completionService = new ExecutorCompletionService(executorService);
        private static final ThreadLocal<Integer> tLocal = new ThreadLocal<>();
        public static final ThreadLocal<Map<Object, Object>> tLocalMemory = ThreadLocal.withInitial(() -> new HashMap<>());
    
    
        /**
         * 适用于循环每次提交一个,每次传递参数不一样的多个相同逻辑的任务
         * 主线程出使用 ThreadLocalUtils.initInhert(), 子线程方可访问主线程公共参数, 每个线程的独有参数,需要在Callable里面设定
         * @param batchNum 任务个数
         * @param call 任务逻辑方法
         * @param <T>
         */
        public static <T> void submitOne(int batchNum, Callable<T> call) {
            if(Objects.isNull(tLocal.get())) {
                tLocal.set(batchNum);
            }
            completionService.submit(call);
        }
    
        /**
         * 适用于处理多个相同逻辑的任务
         * @param batchNum 任务个数
         * @param call 任务逻辑方法
         * @param <T>
         */
        public static <T> void submitBatch(int batchNum, Callable<T> call) {
            tLocal.set(batchNum);
            for (int i = 0; i < batchNum; i++) {
                completionService.submit(call);
            }
        }
    
        /**
         * 适用于处理一个或多个不同相同逻辑的任务
         * @param call 任务逻辑方法
         * @param <T>
         */
        public static <T> void submitBatch(Callable<T>...call) {
            tLocal.set(call.length);
            for (int i = 0; i < call.length; i++) {
                completionService.submit(call[i]);
            }
        }
    
        /**
         * 所有任务运行完毕结果归集
         * @param <T>
         * @return
         */
        public static <T> List<T> take() {
            try {
                List<T> tList = new ArrayList<>();
                Integer taskSize = tLocal.get();
                for (int i = 0; i < taskSize; i++) {
                    Future<T> future = completionService.take();
                    T t = future.get(5, TimeUnit.SECONDS);
                    tList.add(t);
                }
                return tList;
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                clear();
            }
        }
    
        public static void clear() {
            tLocal.remove();
            tLocalMemory.remove();
        }
    
        /**
         * 关闭线程池(实际使用时不必每次shutdown, 否则后续使用会报错)
         */
        public static void shutdown() {
            executorService.shutdown();
        }
    
    }

    使用示例:

    // 示例一
        public static void runTask0() {
            // 记录main运行开始时间
            long start = System.currentTimeMillis();
            // 累加多线程每个任务的时间
            long timeCount = 0;
            // 提交20个相同的任务
            CompletionServiceUtils.submit(20, () -> {
                long begin = System.currentTimeMillis();
                System.out.println("begin");
                Thread.sleep(2000);
                System.out.println("end");
                // 返回运行时间
                return (System.currentTimeMillis() - begin) / 1000;
            });
            // 等待全部任务执行完毕(不需要等待可省略),并返回所有任务的结果(不需要接收返回值可省略)
            List<Long> take = CompletionServiceUtils.take();
            // 自定义结果处理(不需要处理可省略)
            for (Long t : take) {
                timeCount += t;
            }
            // 打印多线程每个任务的时间的累加值
            System.out.println("done   timeCount = " + timeCount);
            // 打印主线程运行时间
            System.out.println("main" + "-[ Thread ID: " + Thread.currentThread().getId() + "]-" + "runTime = " + (System.currentTimeMillis() - start) / 1000);
            // 实际使用时不必shutdown, 否则后续使用会报错
            CompletionServiceUtils.shutdown();
        }
    
        // 示例二
        public static void runTask1() throws Exception {
            // 记录main运行开始时间
            long start = System.currentTimeMillis();
            // 提交2个不同的任务
            CompletionServiceUtils.submit(() -> {
                return 28;
            },() -> {
                return "你好!";
            });
            // 等待全部任务执行完毕(不需要等待可省略),并返回所有任务的结果(不需要接收返回值可省略)
            List<Object> take = CompletionServiceUtils.take();
            // 自定义结果处理(不需要处理可省略)
            for (Object t : take) {
                // 打印每次任务的结果
                System.out.println(t);
            }
            // 打印主线程运行时间
            System.out.println("main" + "-[ Thread ID: " + Thread.currentThread().getId() + "]-" + "runTime = " + (System.currentTimeMillis() - start) / 1000);
            // 实际使用时不必shutdown, 否则后续使用会报错
            CompletionServiceUtils.shutdown();
        }
  • 相关阅读:
    这篇文章把短线操作技巧以及交易原则讲透了!
    炒股老手的妙招——卖出在周K线的顶部
    连续多阳低吸买入法需要哪些条件?
    2个实盘案例带你了解南玻量态选庄股法!
    如何通过筹码分布判断一个股票要不要做?
    股票指标公式高手是怎么通过指标变现的?
    股票回踩均线是什么意思?
    定宽不定宽盒子的居中解决办法和外边距塌陷问题
    JS 数据类型
    JS 事件总结
  • 原文地址:https://www.cnblogs.com/bevis-byf/p/11658643.html
Copyright © 2011-2022 走看看