zoukankan      html  css  js  c++  java
  • CompletionService 和ExecutorService的区别和用法

    Java SE5的Java.util.concurrent包中的执行器(Executor)将为你管理Thread对象,从而简化了并发编程。Executor在客户端和执行任务之间提供了一个间接层,Executor代替客户端执行任务。Executor允许你管理异步任务的执行,而无须显式地管理线程的生命周期。Executor在Java SE5/6中时启动任务的优选方法。Executor引入了一些功能类来管理和使用线程Thread,其中包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等


    创建线程池

    Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。

    public static ExecutorService newFixedThreadPool(int nThreads)

    创建固定数目线程的线程池。

    public static ExecutorService newCachedThreadPool()

    创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。

    public static ExecutorService newSingleThreadExecutor()

    创建一个单线程化的Executor。

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

    创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

    见类图,接口Executor只有一个方法execute,接口ExecutorService扩展了Executor并添加了一些生命周期管理的方法,如shutdown、submit等。一个Executor的生命周期有三种状态,运行 ,关闭 ,终止。

    Callable,Future用于返回结果

    Future<V>代表一个异步执行的操作,通过get()方法可以获得操作的结果,如果异步操作还没有完成,则,get()会使当前线程阻塞。FutureTask<V>实现了Future<V>和Runable<V>。Callable代表一个有返回值得操作。

    实例:用ExecutorService  实现对一个大数组并行求和

    1. package executor;  
    2.   
    3. import java.util.*;  
    4. import java.util.concurrent.*;  
    5.   
    6. /* 
    7.  * 并行计算求和. 
    8.  * 本例中,把一个整数数组的求和分解到每个线程中,每个线程求该数值的部分和, 
    9.  * 然后主程序把各个和再次求和就能得到最后的数字。从这个架构上跟mapreduce有点神似。 
    10.  *  
    11.  */  
    12.   
    13. public class ExecutorServiceParalelSumdemo {  
    14.       
    15.     private int coreCpuNum;     
    16.     private ExecutorService  executor;     
    17.       
    18.     /*  
    19.      * save the result of each thread's sum calculation 
    20.      *  
    21.      */  
    22.     private List<FutureTask<Long>> tasks = new ArrayList<FutureTask<Long>>();    
    23.       
    24.     public ExecutorServiceParalelSumdemo(){     
    25.         coreCpuNum = Runtime.getRuntime().availableProcessors();     
    26.         System.out.println("this host has "+coreCpuNum+ " CPU(s)");  
    27.           
    28.         //for before Java 8.0  
    29.         //executor = Executors.newFixedThreadPool(coreCpuNum);     
    30.           
    31.         //this CPU parallelism API is Java8 or later ONLY   
    32.         executor = Executors.newWorkStealingPool(coreCpuNum);     
    33.     }   
    34.       
    35.     /* 
    36.      * thread main body 
    37.      */  
    38.     class CalculatorTask implements Callable<Long>{     
    39.         int nums[];     
    40.         int start;     
    41.         int end;     
    42.         public CalculatorTask(final int nums[],int start,int end){     
    43.             this.nums = nums;     
    44.             this.start = start;     
    45.             this.end = end;     
    46.         }  
    47.           
    48.         @Override    
    49.         public Long call() throws Exception {     
    50.             long sum =0;     
    51.             for(int i=start;i<end;i++){     
    52.                 sum += nums[i];     
    53.             }     
    54.             
    55.             return sum;     
    56.         }     
    57.     }    
    58.       
    59.     private long getFinalSum(){     
    60.         long sum = 0;     
    61.         System.out.println(tasks.size() + " future tasks in pool");  
    62.         for(int i=0;i<tasks.size();i++){     
    63.             try {     
    64.                 /* 
    65.                  * If this future's thread not return its result, 
    66.                  * get() will block here. So perf issue introduced. 
    67.                  * we can use CompletionService to solve this potential issue. 
    68.                 */   
    69.                 sum += tasks.get(i).get();     
    70.             } catch (InterruptedException e) {     
    71.                 e.printStackTrace();     
    72.             } catch (ExecutionException e) {     
    73.                 e.printStackTrace();     
    74.             }     
    75.         }     
    76.         return sum;     
    77.     }  
    78.       
    79.     public long ParallelSum(int[] nums){     
    80.         int start,end,increment;     
    81.           
    82.         // 根据CPU核心个数拆分任务,创建每个thread和对应的 FutureTask,并提交到ExecutorService中。      
    83.         for(int i=0;i<coreCpuNum;i++) {   
    84.             increment = (nums.length/coreCpuNum)+1;     
    85.             start = i*increment;     
    86.             end = start+increment;     
    87.             if(end > nums.length){     
    88.                 end = nums.length;      
    89.             }     
    90.        
    91.             //create thread tasks  
    92.             CalculatorTask calculator = new CalculatorTask(nums, start, end);    
    93.             //create each future result per thread task  
    94.             FutureTask<Long> task = new FutureTask<Long>(calculator);     
    95.             tasks.add(task);    
    96.               
    97.             if(!executor.isShutdown()){  
    98.                 //execute() can't return result  
    99.                 executor.submit(task);     
    100.             }     
    101.         }    
    102.           
    103.         return getFinalSum();     
    104.     }     
    105.       
    106.     public void close(){     
    107.         executor.shutdown();     
    108.     }     
    109. }  
    CompletionService

    在上述例子中,getResult()方法的实现过程中,迭代了FutureTask的数组,如果任务还没有完成则当前线程会阻塞,如果我们希望任意任务完成后就把其结果加到result中,而不用依次等待每个任务完成,可以使用CompletionService。

    它与ExecutorService最主要的区别在于submit的task不一定是按照加入时的顺序完成的。CompletionService对ExecutorService进行了包装,内部维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。所以,先完成的必定先被取出。这样就减少了不必要的等待时间。

    CompletionService版本的求和例子

    1. package executor;  
    2.   
    3. import java.util.*;  
    4. import java.util.concurrent.*;  
    5.   
    6. public class CompletionServiceDemo {  
    7.       
    8.     /* 
    9.      * 并行计算求和. 
    10.      * 本例中,把一个整数数组的求和分解到每个线程中,每个线程求该数值的部分和, 
    11.      * 然后主程序把各个和再次求和就能得到最后的数字。从这个架构上跟mapreduce有点神似。 
    12.      *  
    13.      */  
    14.       
    15.         private int coreCpuNum;     
    16.         private ExecutorService  executor;  
    17.         /* 
    18.          * CompletionService与ExecutorService最主要的区别在于 
    19.          *前者submit的task不一定是按照加入时的顺序完成的。CompletionService对ExecutorService进行了包装, 
    20.          *内部维护一个保存Future对象的BlockingQueue。 
    21.          *只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。 
    22.          *它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。 
    23.          *所以,先完成的必定先被取出。这样就减少了不必要的等待时间。 
    24.          *  
    25.          */  
    26.         /*  
    27.          * CompletionService has a internal bloking queue to save the result of each  
    28.          * thread's sum calculation. so List<FutureTask<Long>> tasks appears unnecessary now 
    29.          *  
    30.          */  
    31.         private CompletionService<Long> mcs;  
    32.         /*  
    33.          * save the result of each thread's sum calculation 
    34.          *  
    35.          */  
    36.          
    37.         public CompletionServiceDemo(){     
    38.             coreCpuNum = Runtime.getRuntime().availableProcessors();     
    39.               
    40.             System.out.println("this host has "+coreCpuNum+ " CPU(s)");  
    41.               
    42.             //for before Java 8.0  
    43.             //executor = Executors.newFixedThreadPool(coreCpuNum);     
    44.               
    45.             //this CPU parallelism API is Java8 or later ONLY   
    46.             executor = Executors.newWorkStealingPool(coreCpuNum);     
    47.             mcs=new ExecutorCompletionService<>(executor);    
    48.         }   
    49.           
    50.         /* 
    51.          * thread main body 
    52.          */  
    53.         class CalculatorTask implements Callable<Long>{     
    54.             int nums[];     
    55.             int start;     
    56.             int end;     
    57.             public CalculatorTask(final int nums[],int start,int end){     
    58.                 this.nums = nums;     
    59.                 this.start = start;     
    60.                 this.end = end;     
    61.             }  
    62.               
    63.             @Override    
    64.             public Long call() throws Exception {     
    65.                 long sum =0;     
    66.                 for(int i=start;i<end;i++){     
    67.                     sum += nums[i];     
    68.                 }     
    69.                  
    70.                 return sum;     
    71.             }     
    72.         }    
    73.           
    74.         private long getFinalSum(){     
    75.             long sum = 0;             
    76.             for(int i=0;i<coreCpuNum;i++){     
    77.                 try {     
    78.                 /* 
    79.                  * get one complete result from CompletionServer internal  
    80.                  * blocking queue 
    81.                  */  
    82.                 sum += mcs.take().get();     
    83.                 } catch (InterruptedException e) {     
    84.                     e.printStackTrace();     
    85.                 } catch (ExecutionException e) {     
    86.                     e.printStackTrace();     
    87.                 }     
    88.             }     
    89.             return sum;      
    90.         }  
    91.           
    92.         public long ParallelSum(int[] nums){     
    93.             int start,end,increment;     
    94.               
    95.             // 根据CPU核心个数拆分任务,创建每个thread和对应的 FutureTask,并提交到ExecutorService中。      
    96.             for(int i=0;i<coreCpuNum;i++) {   
    97.                 increment = (nums.length/coreCpuNum)+1;     
    98.                 start = i*increment;     
    99.                 end = start+increment;     
    100.                 if(end > nums.length){     
    101.                     end = nums.length;      
    102.                 }     
    103.                 //create thread tasks  
    104.                 CalculatorTask mthread = new CalculatorTask(nums, start, end);                        
    105.                 if(!executor.isShutdown()){  
    106.                     mcs.submit(mthread);     
    107.                 }     
    108.             }   
    109.               
    110.             return getFinalSum();     
    111.         }     
    112.           
    113.         public void close(){     
    114.             executor.shutdown();     
    115.         }     
    116. }  
    测试main方法:
    1. package executor;  
    2.   
    3. public class MainTest {  
    4.     public static void main(String[] args) {          
    5.         System.out.println("ExcutorServer thread pool demo show");  
    6.         int[] nums={12890,345678,2345,5678,865,234,3434,454,4656,67678,678,1234,6789};  
    7.         ExecutorServiceParalelSumdemo mysum=new ExecutorServiceParalelSumdemo();  
    8.           
    9.         System.out.println("result per ExecutorServiceParalelSumdemo = "  
    10.                           +mysum.ParallelSum(nums));  
    11.           
    12.         System.out.println("CompletionServiceDemo thread pool demo show");  
    13.         CompletionServiceDemo mcom=new CompletionServiceDemo();  
    14.         System.out.println("result per CompletionServiceDemo = "  
    15.                          +mcom.ParallelSum(nums));  
    16.     }  
    17. }  

    输出:

    ExcutorServer thread pool demo show
    this host has 4 CPU(s)
    4 future tasks in pool
    result per ExecutorServiceParalelSumdemo = 452613
    CompletionServiceDemo thread pool demo show
    this host has 4 CPU(s)
    result per CompletionServiceDemo = 452613

  • 相关阅读:
    pytest实际编码中特殊问题的解决
    python+locust性能测试实例
    python使用eval动态调用函数及其在测试用例中断言的应用
    python中列表生成式的两种用法
    pycharm中的Terminal无法使用git命令
    ruamel.yaml的使用
    pip安装模块失败如何解决
    locust的setup等相关函数的使用
    python模块-optparse(解析命令行参数)
    (转)locust源码目录结构及模块作用
  • 原文地址:https://www.cnblogs.com/Free-Thinker/p/6231671.html
Copyright © 2011-2022 走看看