zoukankan      html  css  js  c++  java
  • Java 并发编程中的 Executor 框架与线程池

    Java 5 开始引入 Conccurent 软件包,提供完备的并发能力,对线程池有了更好的支持。其中,Executor 框架是最值得称道的。

    Executor框架是指java 5中引入的一系列并发库中与executor相关的一些功能类,其中包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。并发编程的一种编程方式是把任务拆分为一些列的小任务,即Runnable,然后在提交给一个Executor执行,Executor.execute(Runnalbe) 。Executor在执行时使用内部的线程池完成操作。

    一、创建线程池

    Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。

    public static ExecutorService newFixedThreadPool(int nThreads)

    创建固定数目线程的线程池。

    public static ExecutorService newCachedThreadPool()

    创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。

    public static ExecutorService newSingleThreadExecutor()

    创建一个单线程化的Executor。

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)

    创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

     1 Executor executor = Executors.newFixedThreadPool(10);  
     2 Runnable task = new Runnable() {  
     3 
     4     @Override  
     5 
     6     public void run() {  
     7 
     8         System.out.println("task over");  
     9 
    10     }  
    11 
    12 };  
    13 
    14 executor.execute(task);  

    或者

    1 executor = Executors.newScheduledThreadPool(10);  
    2 ScheduledExecutorService scheduler = (ScheduledExecutorService) executor;  
    3 scheduler.scheduleAtFixedRate(task, 10, 10, TimeUnit.SECONDS);  

     

    二、ExecutorService与生命周期

    ExecutorService扩展了Executor并添加了一些生命周期管理的方法。一个Executor的生命周期有三种状态,运行 ,关闭 ,终止 。Executor创建时处于运行状态。当调用ExecutorService.shutdown()后,处于关闭状态,isShutdown()方法返回true。这时,不应该再想Executor中添加任务,所有已添加的任务执行完毕后,Executor处于终止状态,isTerminated()返回true。

    如果Executor处于关闭状态,往Executor提交任务会抛出unchecked exception RejectedExecutionException。

     1 ExecutorService executorService = (ExecutorService) executor;  
     2 
     3 while (!executorService.isShutdown()) {  
     4 
     5     try {  
     6 
     7         executorService.execute(task);  
     8 
     9     } catch (RejectedExecutionException ignored) {  
    10 
    11           
    12 
    13     }  
    14 
    15 }  
    16 
    17 executorService.shutdown();  

     三、使用Callable,Future返回结果

    Future代表一个异步执行的操作,通过get()方法可以获得操作的结果,如果异步操作还没有完成,则,get()会使当前线程阻塞。FutureTask实现了Future和Runable。Callable代表一个有返回值得操作。

     1 Callable func = new Callable(){  
     2 
     3     public Integer call() throws Exception {  
     4 
     5         System.out.println("inside callable");  
     6 
     7         Thread.sleep(1000);  
     8 
     9         return new Integer(8);  
    10 
    11     }         
    12 
    13 };        
    14 
    15 FutureTask futureTask  = new FutureTask(func);  
    16 
    17 Thread newThread = new Thread(futureTask);  
    18 
    19 newThread.start();  
    20 
    21   
    22 
    23 try {  
    24 
    25     System.out.println("blocking here");  
    26 
    27     Integer result = futureTask.get();  
    28 
    29     System.out.println(result);  
    30 
    31 } catch (InterruptedException ignored) {  
    32 
    33 } catch (ExecutionException ignored) {  
    34 
    35 }  

     ExecutoreService提供了submit()方法,传递一个Callable,或Runnable,返回Future。如果Executor后台线程池还没有完成Callable的计算,这调用返回Future对象的get()方法,会阻塞直到计算完成。

    例子:并行计算数组的和。

      1 package executorservice;  
      2 
      3   
      4 
      5 import java.util.ArrayList;  
      6 
      7 import java.util.List;  
      8 
      9 import java.util.concurrent.Callable;  
     10 
     11 import java.util.concurrent.ExecutionException;  
     12 
     13 import java.util.concurrent.ExecutorService;  
     14 
     15 import java.util.concurrent.Executors;  
     16 
     17 import java.util.concurrent.Future;  
     18 
     19 import java.util.concurrent.FutureTask;  
     20 
     21   
     22 
     23 public class ConcurrentCalculator {  
     24 
     25   
     26 
     27     private ExecutorService exec;  
     28 
     29     private int cpuCoreNumber;  
     30 
     31     private List> tasks = new ArrayList>();  
     32 
     33   
     34 
     35     // 内部类  
     36 
     37     class SumCalculator implements Callable {  
     38 
     39         private int[] numbers;  
     40 
     41         private int start;  
     42 
     43         private int end;  
     44 
     45   
     46 
     47         public SumCalculator(final int[] numbers, int start, int end) {  
     48 
     49             this.numbers = numbers;  
     50 
     51             this.start = start;  
     52 
     53             this.end = end;  
     54 
     55         }  
     56 
     57   
     58 
     59         public Long call() throws Exception {  
     60 
     61             Long sum = 0l;  
     62 
     63             for (int i = start; i < end; i++) {  
     64 
     65                 sum += numbers[i];  
     66 
     67             }  
     68 
     69             return sum;  
     70 
     71         }  
     72 
     73     }  
     74 
     75   
     76 
     77     public ConcurrentCalculator() {  
     78 
     79         cpuCoreNumber = Runtime.getRuntime().availableProcessors();  
     80 
     81         exec = Executors.newFixedThreadPool(cpuCoreNumber);  
     82 
     83     }  
     84 
     85   
     86 
     87     public Long sum(final int[] numbers) {  
     88 
     89         // 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor  
     90 
     91         for (int i = 0; i < cpuCoreNumber; i++) {  
     92 
     93             int increment = numbers.length / cpuCoreNumber + 1;  
     94 
     95             int start = increment * i;  
     96 
     97             int end = increment * i + increment;  
     98 
     99             if (end > numbers.length)  
    100 
    101                 end = numbers.length;  
    102 
    103             SumCalculator subCalc = new SumCalculator(numbers, start, end);  
    104 
    105             FutureTask task = new FutureTask(subCalc);  
    106 
    107             tasks.add(task);  
    108 
    109             if (!exec.isShutdown()) {  
    110 
    111                 exec.submit(task);  
    112 
    113             }  
    114 
    115         }  
    116 
    117         return getResult();  
    118 
    119     }  
    120 
    121   
    122 
    123       
    124 
    125     public Long getResult() {  
    126 
    127         Long result = 0l;  
    128 
    129         for (Future task : tasks) {  
    130 
    131             try {  
    132 
    133                 // 如果计算未完成则阻塞  
    134 
    135                 Long subSum = task.get();  
    136 
    137                 result += subSum;  
    138 
    139             } catch (InterruptedException e) {  
    140 
    141                 e.printStackTrace();  
    142 
    143             } catch (ExecutionException e) {  
    144 
    145                 e.printStackTrace();  
    146 
    147             }  
    148 
    149         }  
    150 
    151         return result;  
    152 
    153     }  
    154 
    155   
    156 
    157     public void close() {  
    158 
    159         exec.shutdown();  
    160 
    161     }  
    162 
    163 }  

     Main

    1 int[] numbers = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 10, 11 };  
    2 
    3 ConcurrentCalculator calc = new ConcurrentCalculator();  
    4 
    5 Long sum = calc.sum(numbers);  
    6 
    7 System.out.println(sum);  
    8 
    9 calc.close();  
     四、CompletionService

    在刚在的例子中,getResult()方法的实现过程中,迭代了FutureTask的数组,如果任务还没有完成则当前线程会阻塞,如果我们希望任意字任务完成后就把其结果加到result中,而不用依次等待每个任务完成,可以使CompletionService。生产者submit()执行的任务。使用者take()已完成的任务,并按照完成这些任务的顺序处理它们的结果 。也就是调用CompletionService的take方法是,会返回按完成顺序放回任务的结果,CompletionService内部维护了一个阻塞队列BlockingQueue,如果没有任务完成,take()方法也会阻塞。修改刚才的例子使用CompletionService:

      1 public class ConcurrentCalculator2 {  
      2 
      3   
      4 
      5     private ExecutorService exec;  
      6 
      7     private CompletionService completionService;  
      8 
      9   
     10 
     11   
     12 
     13     private int cpuCoreNumber;  
     14 
     15   
     16 
     17     // 内部类  
     18 
     19     class SumCalculator implements Callable {  
     20 
     21         ......  
     22 
     23     }  
     24 
     25   
     26 
     27     public ConcurrentCalculator2() {  
     28 
     29         cpuCoreNumber = Runtime.getRuntime().availableProcessors();  
     30 
     31         exec = Executors.newFixedThreadPool(cpuCoreNumber);  
     32 
     33         completionService = new ExecutorCompletionService(exec);  
     34 
     35   
     36 
     37   
     38 
     39     }  
     40 
     41   
     42 
     43     public Long sum(final int[] numbers) {  
     44 
     45         // 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor  
     46 
     47         for (int i = 0; i < cpuCoreNumber; i++) {  
     48 
     49             int increment = numbers.length / cpuCoreNumber + 1;  
     50 
     51             int start = increment * i;  
     52 
     53             int end = increment * i + increment;  
     54 
     55             if (end > numbers.length)  
     56 
     57                 end = numbers.length;  
     58 
     59             SumCalculator subCalc = new SumCalculator(numbers, start, end);   
     60 
     61             if (!exec.isShutdown()) {  
     62 
     63                 completionService.submit(subCalc);  
     64 
     65   
     66 
     67   
     68 
     69             }  
     70 
     71               
     72 
     73         }  
     74 
     75         return getResult();  
     76 
     77     }  
     78 
     79   
     80 
     81       
     82 
     83     public Long getResult() {  
     84 
     85         Long result = 0l;  
     86 
     87         for (int i = 0; i < cpuCoreNumber; i++) {              
     88 
     89             try {  
     90 
     91                 Long subSum = completionService.take().get();  
     92 
     93                 result += subSum;             
     94 
     95             } catch (InterruptedException e) {  
     96 
     97                 e.printStackTrace();  
     98 
     99             } catch (ExecutionException e) {  
    100 
    101                 e.printStackTrace();  
    102 
    103             }  
    104 
    105         }  
    106 
    107         return result;  
    108 
    109     }  
    110 
    111   
    112 
    113     public void close() {  
    114 
    115         exec.shutdown();  
    116 
    117     }  
    118 
    119 }  
  • 相关阅读:
    [Step By Step]使用SAP Business Objects Data Services将文本文件数据导入到SAP HANA中 沧海
    [Step By Step]如何在SAP Business Object Data Services中连接到SAP HANA 数据库 沧海
    [Step By Step]使用SAP Business Objects Data Services将XML数据导入到SAP HANA中(XML DTD) 沧海
    [Step By Step]使用SAP Business Objects Data Services将Excel数据导入到SAP HANA中 沧海
    [Step By Step]SAP HANA PAL KNN 近邻预测分析K Nearest Neighbor编程实例KNN 沧海
    [Step By Step]在SAP Business Objects Data Services中使用Table Comparison Transform表比较功能并导入到SAP HANA中(Table Comparison Transform) 沧海
    [Step By Step]SAP HANA PAL逻辑回归预测分析Logistic Regression编程实例LOGISTICREGRESSION(模型) 沧海
    Media Convert媒体转换工具发布
    WPF中template的区别
    Bambook大赛程序终于发布了
  • 原文地址:https://www.cnblogs.com/cyberniuniu/p/5021944.html
Copyright © 2011-2022 走看看