异步计算也可以实现闭锁、栅栏的功能。它使用Callable来代表一个任务,提交到Executor框架后,可以异步的通过Future来获取任务执行结果。也可以通过CompletionService来提交一组任务到一个阻塞队列,再异步获取到这组任务的执行结果。下面举例:
1、一个任务:
package com.wulinfeng.concurrent; import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class FutureTaskHarness { private final ExecutorService es = Executors.newCachedThreadPool(); public void futureTest(int nThreads) { final int count = nThreads; // 定义任务内容 Callable<List<String>> task = new Callable<List<String>>() { @Override public List<String> call() throws Exception { List<String> results = new ArrayList<String>(); for (int i = 0; i < count; i++) { // 休息一秒 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } results.add(String.format("战士(%d)号进入真正的战斗线程中,获取战斗任务编号:%d ", i, new Random().nextInt())); } return results; } }; // 提交任务,开始执行 Future<List<String>> future = es.submit(task); es.shutdown(); // 干点别的啥事 System.out.println("开始准备战斗..."); long start = System.currentTimeMillis(); // 获取任务执行结果,一次性得到所有結果 try { List<String> results = future.get(); for (String soldier : results) { System.out.println(soldier); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); future.cancel(true); // TODO: handle exception } catch (ExecutionException e) { Throwable t = e.getCause(); if (t instanceof RuntimeException) { throw (RuntimeException) t; } else if (t instanceof Error) { throw (Error) t; } else { throw new IllegalStateException("Not unchecked", t); } } System.out.printf("战斗结束, 耗时:%d 秒 ", (System.currentTimeMillis() - start) / 1000); } public static void main(String[] args) { new FutureTaskHarness().futureTest(7); } }
运行结果:
开始准备战斗... 战士(0)号进入真正的战斗线程中,获取战斗任务编号:-955993222 战士(1)号进入真正的战斗线程中,获取战斗任务编号:1354359084 战士(2)号进入真正的战斗线程中,获取战斗任务编号:1216592938 战士(3)号进入真正的战斗线程中,获取战斗任务编号:1961530027 战士(4)号进入真正的战斗线程中,获取战斗任务编号:-1066960200 战士(5)号进入真正的战斗线程中,获取战斗任务编号:1117184762 战士(6)号进入真正的战斗线程中,获取战斗任务编号:1369217956 战斗结束, 耗时:7 秒
2、一组任务:
package com.wulinfeng.concurrent; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class CompletionServiceHarness { private final ExecutorService es = Executors.newCachedThreadPool(); public void completionTest(int nThreads) { final int count = nThreads; // 定义任务内容 Callable<String> task = new Callable<String>() { @Override public String call() throws Exception { // 休息一秒 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return String.format("进入真正的战斗线程中,获取战斗任务编号:%d ", new Random().nextInt()); } }; // 提交任务,一个一个的提交,而不是一次提交所有任务 CompletionService<String> soldier = new ExecutorCompletionService<String>(es); for (int i = 0; i < count; i++) { soldier.submit(task); } es.shutdown(); // 干点别的啥 long start = System.currentTimeMillis(); System.out.println("开始准备战斗..."); // 获取任务执行结果,一个一个的取 try { for (int i = 0; i < count; i++) { Future<String> future = soldier.take(); String result = future.get(); System.out.printf("战士(%d)号%s ", i, result); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); // TODO: handle exception } catch (ExecutionException e) { Throwable t = e.getCause(); if (t instanceof RuntimeException) { throw (RuntimeException) t; } else if (t instanceof Error) { throw (Error) t; } else { throw new IllegalStateException("Not unchecked", t); } } System.out.printf("战斗结束, 耗时:%d 秒 ", (System.currentTimeMillis() - start) / 1000); } public static void main(String[] args) { new CompletionServiceHarness().completionTest(7); } }
运行结果:
开始准备战斗... 战士(0)号进入真正的战斗线程中,获取战斗任务编号:420312313 战士(1)号进入真正的战斗线程中,获取战斗任务编号:346919435 战士(2)号进入真正的战斗线程中,获取战斗任务编号:-435118897 战士(3)号进入真正的战斗线程中,获取战斗任务编号:-89852175 战士(4)号进入真正的战斗线程中,获取战斗任务编号:-249934891 战士(5)号进入真正的战斗线程中,获取战斗任务编号:-321581365 战士(6)号进入真正的战斗线程中,获取战斗任务编号:1525687729 战斗结束, 耗时:1 秒