zoukankan      html  css  js  c++  java
  • CompletionService 和ExecutorService的区别和用法

    Java SE5的Java.util.concurrent包中的执行器(Executor)将为你管理Thread对象,从而简化了并发编程。Executor在客户端和执行任务之间提供了一个间接层,Executor代替客户端执行任务。Executor允许你管理异步任务的执行,而无须显式地管理线程的生命周期。Executor在Java SE5/6中时启动任务的优选方法。Executor引入了一些功能类来管理和使用线程Thread,其中包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等


    创建线程池

    Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。

    public static ExecutorService newFixedThreadPool(int nThreads)

    创建固定数目线程的线程池。

    public static ExecutorService newCachedThreadPool()

    创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。

    public static ExecutorService newSingleThreadExecutor()

    创建一个单线程化的Executor。

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

    创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

    见类图,接口Executor只有一个方法execute,接口ExecutorService扩展了Executor并添加了一些生命周期管理的方法,如shutdown、submit等。一个Executor的生命周期有三种状态,运行 ,关闭 ,终止。

    Callable,Future用于返回结果

    Future<V>代表一个异步执行的操作,通过get()方法可以获得操作的结果,如果异步操作还没有完成,则,get()会使当前线程阻塞。FutureTask<V>实现了Future<V>和Runable<V>。Callable代表一个有返回值得操作。

    实例:用ExecutorService  实现对一个大数组并行求和

    1. package executor;  
    2.   
    3. import java.util.*;  
    4. import java.util.concurrent.*;  
    5.   
    6. /* 
    7.  * 并行计算求和. 
    8.  * 本例中,把一个整数数组的求和分解到每个线程中,每个线程求该数值的部分和, 
    9.  * 然后主程序把各个和再次求和就能得到最后的数字。从这个架构上跟mapreduce有点神似。 
    10.  *  
    11.  */  
    12.   
    13. public class ExecutorServiceParalelSumdemo {  
    14.       
    15.     private int coreCpuNum;     
    16.     private ExecutorService  executor;     
    17.       
    18.     /*  
    19.      * save the result of each thread's sum calculation 
    20.      *  
    21.      */  
    22.     private List<FutureTask<Long>> tasks = new ArrayList<FutureTask<Long>>();    
    23.       
    24.     public ExecutorServiceParalelSumdemo(){     
    25.         coreCpuNum = Runtime.getRuntime().availableProcessors();     
    26.         System.out.println("this host has "+coreCpuNum+ " CPU(s)");  
    27.           
    28.         //for before Java 8.0  
    29.         //executor = Executors.newFixedThreadPool(coreCpuNum);     
    30.           
    31.         //this CPU parallelism API is Java8 or later ONLY   
    32.         executor = Executors.newWorkStealingPool(coreCpuNum);     
    33.     }   
    34.       
    35.     /* 
    36.      * thread main body 
    37.      */  
    38.     class CalculatorTask implements Callable<Long>{     
    39.         int nums[];     
    40.         int start;     
    41.         int end;     
    42.         public CalculatorTask(final int nums[],int start,int end){     
    43.             this.nums = nums;     
    44.             this.start = start;     
    45.             this.end = end;     
    46.         }  
    47.           
    48.         @Override    
    49.         public Long call() throws Exception {     
    50.             long sum =0;     
    51.             for(int i=start;i<end;i++){     
    52.                 sum += nums[i];     
    53.             }     
    54.             
    55.             return sum;     
    56.         }     
    57.     }    
    58.       
    59.     private long getFinalSum(){     
    60.         long sum = 0;     
    61.         System.out.println(tasks.size() + " future tasks in pool");  
    62.         for(int i=0;i<tasks.size();i++){     
    63.             try {     
    64.                 /* 
    65.                  * If this future's thread not return its result, 
    66.                  * get() will block here. So perf issue introduced. 
    67.                  * we can use CompletionService to solve this potential issue. 
    68.                 */   
    69.                 sum += tasks.get(i).get();     
    70.             } catch (InterruptedException e) {     
    71.                 e.printStackTrace();     
    72.             } catch (ExecutionException e) {     
    73.                 e.printStackTrace();     
    74.             }     
    75.         }     
    76.         return sum;     
    77.     }  
    78.       
    79.     public long ParallelSum(int[] nums){     
    80.         int start,end,increment;     
    81.           
    82.         // 根据CPU核心个数拆分任务,创建每个thread和对应的 FutureTask,并提交到ExecutorService中。      
    83.         for(int i=0;i<coreCpuNum;i++) {   
    84.             increment = (nums.length/coreCpuNum)+1;     
    85.             start = i*increment;     
    86.             end = start+increment;     
    87.             if(end > nums.length){     
    88.                 end = nums.length;      
    89.             }     
    90.        
    91.             //create thread tasks  
    92.             CalculatorTask calculator = new CalculatorTask(nums, start, end);    
    93.             //create each future result per thread task  
    94.             FutureTask<Long> task = new FutureTask<Long>(calculator);     
    95.             tasks.add(task);    
    96.               
    97.             if(!executor.isShutdown()){  
    98.                 //execute() can't return result  
    99.                 executor.submit(task);     
    100.             }     
    101.         }    
    102.           
    103.         return getFinalSum();     
    104.     }     
    105.       
    106.     public void close(){     
    107.         executor.shutdown();     
    108.     }     
    109. }  
    CompletionService

    在上述例子中,getResult()方法的实现过程中,迭代了FutureTask的数组,如果任务还没有完成则当前线程会阻塞,如果我们希望任意任务完成后就把其结果加到result中,而不用依次等待每个任务完成,可以使用CompletionService。

    它与ExecutorService最主要的区别在于submit的task不一定是按照加入时的顺序完成的。CompletionService对ExecutorService进行了包装,内部维护一个保存Future对象的BlockingQueue。只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。所以,先完成的必定先被取出。这样就减少了不必要的等待时间。

    CompletionService版本的求和例子

    1. package executor;  
    2.   
    3. import java.util.*;  
    4. import java.util.concurrent.*;  
    5.   
    6. public class CompletionServiceDemo {  
    7.       
    8.     /* 
    9.      * 并行计算求和. 
    10.      * 本例中,把一个整数数组的求和分解到每个线程中,每个线程求该数值的部分和, 
    11.      * 然后主程序把各个和再次求和就能得到最后的数字。从这个架构上跟mapreduce有点神似。 
    12.      *  
    13.      */  
    14.       
    15.         private int coreCpuNum;     
    16.         private ExecutorService  executor;  
    17.         /* 
    18.          * CompletionService与ExecutorService最主要的区别在于 
    19.          *前者submit的task不一定是按照加入时的顺序完成的。CompletionService对ExecutorService进行了包装, 
    20.          *内部维护一个保存Future对象的BlockingQueue。 
    21.          *只有当这个Future对象状态是结束的时候,才会加入到这个Queue中,take()方法其实就是Producer-Consumer中的Consumer。 
    22.          *它会从Queue中取出Future对象,如果Queue是空的,就会阻塞在那里,直到有完成的Future对象加入到Queue中。 
    23.          *所以,先完成的必定先被取出。这样就减少了不必要的等待时间。 
    24.          *  
    25.          */  
    26.         /*  
    27.          * CompletionService has a internal bloking queue to save the result of each  
    28.          * thread's sum calculation. so List<FutureTask<Long>> tasks appears unnecessary now 
    29.          *  
    30.          */  
    31.         private CompletionService<Long> mcs;  
    32.         /*  
    33.          * save the result of each thread's sum calculation 
    34.          *  
    35.          */  
    36.          
    37.         public CompletionServiceDemo(){     
    38.             coreCpuNum = Runtime.getRuntime().availableProcessors();     
    39.               
    40.             System.out.println("this host has "+coreCpuNum+ " CPU(s)");  
    41.               
    42.             //for before Java 8.0  
    43.             //executor = Executors.newFixedThreadPool(coreCpuNum);     
    44.               
    45.             //this CPU parallelism API is Java8 or later ONLY   
    46.             executor = Executors.newWorkStealingPool(coreCpuNum);     
    47.             mcs=new ExecutorCompletionService<>(executor);    
    48.         }   
    49.           
    50.         /* 
    51.          * thread main body 
    52.          */  
    53.         class CalculatorTask implements Callable<Long>{     
    54.             int nums[];     
    55.             int start;     
    56.             int end;     
    57.             public CalculatorTask(final int nums[],int start,int end){     
    58.                 this.nums = nums;     
    59.                 this.start = start;     
    60.                 this.end = end;     
    61.             }  
    62.               
    63.             @Override    
    64.             public Long call() throws Exception {     
    65.                 long sum =0;     
    66.                 for(int i=start;i<end;i++){     
    67.                     sum += nums[i];     
    68.                 }     
    69.                  
    70.                 return sum;     
    71.             }     
    72.         }    
    73.           
    74.         private long getFinalSum(){     
    75.             long sum = 0;             
    76.             for(int i=0;i<coreCpuNum;i++){     
    77.                 try {     
    78.                 /* 
    79.                  * get one complete result from CompletionServer internal  
    80.                  * blocking queue 
    81.                  */  
    82.                 sum += mcs.take().get();     
    83.                 } catch (InterruptedException e) {     
    84.                     e.printStackTrace();     
    85.                 } catch (ExecutionException e) {     
    86.                     e.printStackTrace();     
    87.                 }     
    88.             }     
    89.             return sum;      
    90.         }  
    91.           
    92.         public long ParallelSum(int[] nums){     
    93.             int start,end,increment;     
    94.               
    95.             // 根据CPU核心个数拆分任务,创建每个thread和对应的 FutureTask,并提交到ExecutorService中。      
    96.             for(int i=0;i<coreCpuNum;i++) {   
    97.                 increment = (nums.length/coreCpuNum)+1;     
    98.                 start = i*increment;     
    99.                 end = start+increment;     
    100.                 if(end > nums.length){     
    101.                     end = nums.length;      
    102.                 }     
    103.                 //create thread tasks  
    104.                 CalculatorTask mthread = new CalculatorTask(nums, start, end);                        
    105.                 if(!executor.isShutdown()){  
    106.                     mcs.submit(mthread);     
    107.                 }     
    108.             }   
    109.               
    110.             return getFinalSum();     
    111.         }     
    112.           
    113.         public void close(){     
    114.             executor.shutdown();     
    115.         }     
    116. }  
    测试main方法:
    1. package executor;  
    2.   
    3. public class MainTest {  
    4.     public static void main(String[] args) {          
    5.         System.out.println("ExcutorServer thread pool demo show");  
    6.         int[] nums={12890,345678,2345,5678,865,234,3434,454,4656,67678,678,1234,6789};  
    7.         ExecutorServiceParalelSumdemo mysum=new ExecutorServiceParalelSumdemo();  
    8.           
    9.         System.out.println("result per ExecutorServiceParalelSumdemo = "  
    10.                           +mysum.ParallelSum(nums));  
    11.           
    12.         System.out.println("CompletionServiceDemo thread pool demo show");  
    13.         CompletionServiceDemo mcom=new CompletionServiceDemo();  
    14.         System.out.println("result per CompletionServiceDemo = "  
    15.                          +mcom.ParallelSum(nums));  
    16.     }  
    17. }  

    输出:

    ExcutorServer thread pool demo show
    this host has 4 CPU(s)
    4 future tasks in pool
    result per ExecutorServiceParalelSumdemo = 452613
    CompletionServiceDemo thread pool demo show
    this host has 4 CPU(s)
    result per CompletionServiceDemo = 452613

  • 相关阅读:
    LVS基于DR模式负载均衡的配置
    Linux源码安装mysql 5.6.12 (cmake编译)
    HOSt ip is not allowed to connect to this MySql server
    zoj 3229 Shoot the Bullet(无源汇上下界最大流)
    hdu 3987 Harry Potter and the Forbidden Forest 求割边最少的最小割
    poj 2391 Ombrophobic Bovines(最大流+floyd+二分)
    URAL 1430 Crime and Punishment
    hdu 2048 神、上帝以及老天爷(错排)
    hdu 3367 Pseudoforest(最大生成树)
    FOJ 1683 纪念SlingShot(矩阵快速幂)
  • 原文地址:https://www.cnblogs.com/Free-Thinker/p/6231671.html
Copyright © 2011-2022 走看看