zoukankan      html  css  js  c++  java
  • java并发编程框架 Executor ExecutorService invokeall

    首先介绍两个重要的接口,Executor和ExecutorService,定义如下: 

    Java代码  收藏代码
    1. public interface Executor {  
    2.     void execute(Runnable command);  
    3. }  
    Java代码  收藏代码
    1. public interface ExecutorService extends Executor {  
    2.     //不再接受新任务,待所有任务执行完毕后关闭ExecutorService  
    3.     void shutdown();  
    4.     //不再接受新任务,直接关闭ExecutorService,返回没有执行的任务列表  
    5.     List<Runnable> shutdownNow();  
    6.     //判断ExecutorService是否关闭  
    7.     boolean isShutdown();  
    8.     //判断ExecutorService是否终止  
    9.     boolean isTerminated();  
    10.     //等待ExecutorService到达终止状态  
    11.     boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;  
    12.     <T> Future<T> submit(Callable<T> task);  
    13.     //当task执行成功的时候future.get()返回result  
    14.     <T> Future<T> submit(Runnable task, T result);  
    15.     //当task执行成功的时候future.get()返回null  
    16.     Future<?> submit(Runnable task);  
    17.     //批量提交任务并获得他们的future,Task列表与Future列表一一对应  
    18.     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)  
    19.         throws InterruptedException;  
    20.     //批量提交任务并获得他们的future,并限定处理所有任务的时间  
    21.     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,  
    22. long timeout, TimeUnit unit) throws InterruptedException;  
    23.     //批量提交任务并获得一个已经成功执行的任务的结果  
    24.     <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;  
    25.   
    26.     <T> T invokeAny(Collection<? extends Callable<T>> tasks,  
    27.                     long timeout, TimeUnit unit)  
    28.         throws InterruptedException, ExecutionException, TimeoutException;  
    29. }  



    为了配合使用上面的并发编程接口,有一个Executors工厂类,负责创建各类满足ExecutorService接口的线程池,具体如下: 
    newFixedThreadPool:创建一个固定长度的线程池,线程池中线程的数量从1增加到最大值后保持不变。如果某个线程坏死掉,将会补充一个新的线程。 
    newCachedThreadPool:创建长度不固定的线程池,线程池的规模不受限制,不常用。 
    newSingleThreadExecutor:创建一个单线程的Executor,他其中有一个线程来处理任务,如果这个线程坏死掉,将补充一个新线程。 
    newScheduledThreadPool:创建固定长度的线程池,以延时或定时的方式来执行任务。 

    下面是Executor和ExecutorService中常用方法的示例: 

    Java代码  收藏代码
    1. import java.util.ArrayList;  
    2. import java.util.Collection;  
    3. import java.util.Iterator;  
    4. import java.util.List;  
    5. import java.util.concurrent.Callable;  
    6. import java.util.concurrent.Executor;  
    7. import java.util.concurrent.ExecutorService;  
    8. import java.util.concurrent.Executors;  
    9. import java.util.concurrent.Future;  
    10. import java.util.concurrent.TimeUnit;  
    11.   
    12. public class Demo{  
    13.     public static void main(String [] args){  
    14.         //--------Executor示例------------//  
    15.         Executor s=Executors.newSingleThreadExecutor();  
    16.         s.execute(new MyRunnableTask("1"));  
    17.           
    18.         //--------ExecutorService示例------------//  
    19.         ExecutorService es=Executors.newFixedThreadPool(2);  
    20.           
    21.         //--------get()示例------------//  
    22.         Future<String> future=es.submit(new MyCallableTask("10"));  
    23.         try{  
    24.             System.out.println(future.get());             
    25.         }catch(Exception e){}  
    26.           
    27.         //--------get(timeout, timeunit)示例------------//  
    28.         future=es.submit(new MyCallableTask("11"));  
    29.         try{  
    30.             System.out.println(future.get(500,TimeUnit.MILLISECONDS));  
    31.         }catch(Exception e){  
    32.             System.out.println("cancle because timeout");  
    33.         }  
    34.           
    35.         //--------invokeAll(tasks)示例------------//  
    36.         List<MyCallableTask> myCallableTasks=new ArrayList<MyCallableTask>();  
    37.         for(int i=0;i<6;i++){  
    38.             myCallableTasks.add(new MyCallableTask(i+""));  
    39.         }  
    40.         try {  
    41.             List<Future<String>> results = es.invokeAll(myCallableTasks);  
    42.             Iterator<Future<String>> iterator=results.iterator();  
    43.             while(iterator.hasNext()){  
    44.                 future=iterator.next();  
    45.                 System.out.println(future.get());  
    46.             }  
    47.         } catch (Exception e) {}  
    48.   
    49.         //--------invokeAll(tasks,timeout,timeunit))示例------------//  
    50.         try {  
    51.             //限定执行时间为2100ms,每个任务需要1000ms,线程池的长度为2,因此最多只能处理4个任务。一共6个任务,有2个任务会被取消。  
    52.             List<Future<String>> results = es.invokeAll(myCallableTasks,2100,TimeUnit.MILLISECONDS);  
    53.             Iterator<Future<String>> iterator=results.iterator();  
    54.             while(iterator.hasNext()){  
    55.                 future=iterator.next();  
    56.                 if(!future.isCancelled())  
    57.                     System.out.println(future.get());  
    58.                 else  
    59.                     System.out.println("cancle because timeout");  
    60.             }  
    61.         } catch (Exception e) {}  
    62.         es.shutdown();  
    63.     }  
    64. }  
    65.   
    66. class MyRunnableTask implements Runnable{  
    67.     private String name;  
    68.     public MyRunnableTask(String name) {  
    69.         this.name=name;  
    70.     }  
    71.     @Override  
    72.     public void run() {  
    73.         try {  
    74.             Thread.sleep(1000);  
    75.         } catch (InterruptedException e) {  
    76.             e.printStackTrace();  
    77.         }  
    78.         System.out.println("runnable task--"+name);  
    79.     }  
    80. }  
    81. class MyCallableTask implements Callable<String>{  
    82.     private String name;  
    83.     public MyCallableTask(String name) {  
    84.         this.name=name;  
    85.     }  
    86.     @Override  
    87.     public String call() throws Exception {  
    88.         try {  
    89.             Thread.sleep(1000);  
    90.         } catch (InterruptedException e) {}  
    91.         StringBuilder sb=new StringBuilder("callable task--");  
    92.         return sb.append(name).toString();  
    93.     }  
    94. }  



    上面的ExecutorSerivce接口中的invokeAll(tasks)方法用于批量执行任务,并且将结果按照task列表中的顺序返回。此外,还存在一个批量执行任务的接口CompletionTask。ExecutorCompletionService是实现CompletionService接口的一个类,该类的实现原理很简单: 

    用Executor类来执行任务,同时把在执行任务的Future放到BlockingQueue<Future<V>>队列中。该类实现的关键就是重写FutureTask类的done()方法,FutureTask类的done()方法是一个钩子函数(关于钩子函数,请读者自行查询),done()方法在FutureTask任务被执行的时候被调用。 

    ExecutorCompletionService类的核心代码如下: 

    Java代码  收藏代码
    1. public Future<V> submit(Runnable task, V result) {  
    2.     if (task == null) throw new NullPointerException();  
    3.     RunnableFuture<V> f = newTaskFor(task, result);  
    4.     executor.execute(new QueueingFuture(f));  
    5.     return f;  
    6. }  
    7. private class QueueingFuture extends FutureTask<Void> {  
    8.     QueueingFuture(RunnableFuture<V> task) {  
    9.         super(task, null);  
    10.         this.task = task;  
    11.     }  
    12.     protected void done() { completionQueue.add(task); }  
    13.     private final Future<V> task;  
    14. }  


    其中的done()方法定义如下: 

    Java代码  收藏代码
    1. /** 
    2.     * Protected method invoked when this task transitions to state 
    3.     * <tt>isDone</tt> (whether normally or via cancellation). The 
    4.     * default implementation does nothing.  Subclasses may override 
    5.     * this method to invoke completion callbacks or perform 
    6.     * bookkeeping. Note that you can query status inside the 
    7.     * implementation of this method to determine whether this task 
    8.     * has been cancelled. 
    9.     */  
    10.    protected void done() { }  



    ExecutorCompletionService的使用示例如下: 

    Java代码  收藏代码
      1. import java.util.concurrent.Callable;  
      2. import java.util.concurrent.CompletionService;  
      3. import java.util.concurrent.ExecutionException;  
      4. import java.util.concurrent.ExecutorCompletionService;  
      5. import java.util.concurrent.Executors;  
      6. import java.util.concurrent.Future;  
      7.   
      8. public class Demo{  
      9.     public static void main(String [] args) throws InterruptedException, ExecutionException{  
      10.         CompletionService<String> cs=new ExecutorCompletionService<String>(  
      11.                 Executors.newFixedThreadPool(2));  
      12.         for(int i=0;i<6;i++){  
      13.             cs.submit(new MyCallableTask(i+""));  
      14.         }  
      15.         for(int i=0;i<6;i++){  
      16.             Future<String> future=cs.take();  
      17.             //Retrieves and removes the Future representing the next completed task,   
      18.             //waiting if none are yet present.  
      19.             System.out.println(future.get());  
      20.         }  
      21.     }  
      22. }  
      23.   
      24. class MyCallableTask implements Callable<String>{  
      25.     private String name;  
      26.     public MyCallableTask(String name) {  
      27.         this.name=name;  
      28.     }  
      29.     @Override  
      30.     public String call() throws Exception {  
      31.         try {  
      32.             Thread.sleep(1000);  
      33.         } catch (InterruptedException e) {}  
      34.         StringBuilder sb=new StringBuilder("callable task--");  
      35.         return sb.append(name).toString();  
      36.     }  
      37. }  
  • 相关阅读:
    微信小程序 'errcode' => 47003,'errmsg' => 'argument invalid! hint
    微信分账
    Acwing-----1016. 最大上升子序列和
    Acwing-----1012. 友好城市
    Acwing-----482. 合唱队形
    Acwing-----1014. 登山
    Acwing-----1017. 怪盗基德的滑翔翼
    Acwing-----275. 传纸条
    Acwing-----1027. 方格取数
    Acwing-----1018. 最低通行费
  • 原文地址:https://www.cnblogs.com/Evil-Rebe/p/5948427.html
Copyright © 2011-2022 走看看