newCachedThreadPool() 方法创建一个缓存线程池。当需要执行新的任务会创建新的线程,如果它们已经完成运行任务,变成可用状态,会重新使用这些线程。
缓存线程池的缺点:为新任务不断创建线程, 如果提交过多的任务给执行者,会使系统超载。
newFixedThreadPool() 创建一个有最大线程数的执行者。
public class CallableTest { public static void main(String args[]) { String[] array = new String[]{"fdasdf", "24234", "&^*", "MNJ&^", ")(UJH"}; ExecutorService pool = Executors.newFixedThreadPool(30); int sum = 0; List<Future<Integer>> list = new ArrayList<Future<Integer>>(); for (String word : array) { Callable<Integer> callable = new WordLength(word); Future<Integer> future = pool.submit(callable); list.add(future ); } for(Future<Integer> future :list){ try { sum += future.get(); } catch (InterruptedException | ExecutionException ex) { ex.printStackTrace(); } } System.out.println("The sum of lengths is " + sum); } } class WordLength implements Callable<Integer> { private String word; public WordLength(String word) { this.word = word; } @Override public Integer call() { return Integer.valueOf(word.length()); } }
ThreadPoolExecutor类中的invokeAny()方法接收任务数列,并启动它们,返回完成时没有抛出异常的第一个 任务的结果
TaskValidator ldapTask = new TaskValidator(ldapValidator, username, password); TaskValidator dbTask = new TaskValidator(dbValidator, username, password); List<TaskValidator> taskList = new ArrayList<>(); taskList.add(ldapTask); taskList.add(dbTask); ExecutorService executor = (ExecutorService) Executors .newCachedThreadPool(); String result; try { result = executor.invokeAny(taskList); System.out.printf("Main: Result: %s ", result); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } executor.shutdown();
List<Task> taskList = new ArrayList<>(); for (int i = 0; i < 3; i++) { Task task = new Task(i); taskList.add(task); } List<Future<Result>> resultList = null; try { resultList = executor.invokeAll(taskList); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Main: Printing the results"); for (int i = 0; i < resultList.size(); i++) { Future<Result> future = resultList.get(i); try { Result result = future.get(); System.out.println(result.getName() + ": " + result.getValue()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1); System.out.printf("Main: Starting at: %s ", new Date()); for (int i = 0; i < 5; i++) { Task task = new Task("Task " + i); executor.schedule(task, i + 1, TimeUnit.SECONDS); } executor.shutdown(); try { executor.awaitTermination(1, TimeUnit.DAYS); } catch (InterruptedException e) { e.printStackTrace(); }
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); System.out.printf("Main: Starting at: %s ", new Date()); Task task = new Task("Task"); ScheduledFuture<?> result = executor.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS); for (int i = 0; i < 10; i++) { System.out.printf("Main: Delay: %d ", result.getDelay(TimeUnit.MILLISECONDS)); try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } executor.shutdown(); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("Main: Finished at: %s ", new Date());
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool(); Task task = new Task(); System.out.printf("Main: Executing the Task "); Future<String> result = executor.submit(task); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("Main: Canceling the Task "); result.cancel(true); System.out.printf("Main: Canceled: %s ", result.isCanceled()); System.out.printf("Main: Done: %s ", result.isDone()); executor.shutdown(); System.out.printf("Main: The executor has finished ");
FutureTask则是一个RunnableFuture<V>,即实现了Runnbale又实现了Futrue<V>这两个接口,另外它还可以包装Runnable和Callable<V>,所以一般来讲是一个符合体了,它可以通过Thread包装来直接执行,也可以提交给ExecuteService来执行,并且还可以通过v get()返回执行结果,在线程体没有执行完成的时候,主线程一直阻塞等待,执行完则直接返回结果。
public class FutureTaskTest { public static void main(String[] args) { Callable<String> task = new Callable<String>() { public String call() { System.out.println("Sleep start."); try { Thread.sleep(1000 * 10); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println("Sleep end."); return "time=" + System.currentTimeMillis(); } }; //直接使用Thread的方式执行 FutureTask<String> ft = new FutureTask<String>(task); Thread t = new Thread(ft); t.start(); try { System.out.println("waiting execute result"); System.out.println("result = " + ft.get()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } //使用Executors来执行 System.out.println("========="); FutureTask<String> ft2 = new FutureTask<String>(task); Executors.newSingleThreadExecutor().submit(ft2); try { System.out.println("waiting execute result"); System.out.println("result = " + ft2.get()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
public class FutureTaskDone { public static void main(String[] args) { ExecutorService executor = (ExecutorService) Executors .newCachedThreadPool(); ResultTask resultTasks[] = new ResultTask[5]; for (int i = 0; i < 5; i++) { ExecutableTask executableTask = new ExecutableTask("Task " + i); resultTasks[i] = new ResultTask(executableTask); executor.submit(resultTasks[i]); } try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e1) { e1.printStackTrace(); } for (int i = 0; i < resultTasks.length; i++) { resultTasks[i].cancel(true); } for (int i = 0; i < resultTasks.length; i++) { try { if (!resultTasks[i].isCancelled()) { System.out.printf("%s ", resultTasks[i].get()); } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } executor.shutdown(); } } class ExecutableTask implements Callable<String> { private String name; public String getName() { return name; } public ExecutableTask(String name) { this.name = name; } @Override public String call() throws Exception { try { long duration = (long) (Math.random() * 10); System.out.printf("%s: Waiting %d seconds for results. ", this.name, duration); TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { } return "Hello, world. I'm " + name; } } class ResultTask extends FutureTask<String> { private String name; public ResultTask(Callable<String> callable) { super(callable); this.name = ((ExecutableTask) callable).getName(); } @Override protected void done() { if (isCancelled()) { System.out.printf("%s: Has been canceled ", name); } else { System.out.printf("%s: Has finished ", name); } } }
通常,当你使用执行者执行并发任务时,你将会提交 Runnable或Callable任务给这个执行者,并获取Future对象控制这个方法。你可以发现这种情况,你需要提交任务给执行者在一个对象中,而处理结果在另一个对象中。基于这种情况,Java并发API提供CompletionService类。
CompletionService 类有一个方法来提交任务给执行者和另一个方法来获取已完成执行的下个任务的Future对象。在内部实现中,它使用Executor对象执行任务。这种行为的优点是共享一个CompletionService对象,并提交任务给执行者,这样其他(对象)可以处理结果。其局限性是,第二个对象只能获取那些已经完成它们的执行的任务的Future对象,所以,这些Future对象只能获取任务的结果。
public class CompletionServiceTest { public static void main(String[] args) { ExecutorService executor = (ExecutorService) Executors .newCachedThreadPool(); CompletionService<String> service = new ExecutorCompletionService<>( executor); ReportRequest faceRequest = new ReportRequest("Face", service); ReportRequest onlineRequest = new ReportRequest("Online", service); Thread faceThread = new Thread(faceRequest); Thread onlineThread = new Thread(onlineRequest); ReportProcessor processor = new ReportProcessor(service); Thread senderThread = new Thread(processor); System.out.printf("Main: Starting the Threads "); faceThread.start(); onlineThread.start(); senderThread.start(); try { System.out.printf("Main: Waiting for the report generators. "); faceThread.join(); onlineThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("Main: Shutting down the executor. "); executor.shutdown(); try { executor.awaitTermination(1, TimeUnit.DAYS); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("Main: Shutting down the executor. "); executor.shutdown(); try { executor.awaitTermination(1, TimeUnit.DAYS); } catch (InterruptedException e) { e.printStackTrace(); } } } class ReportGenerator implements Callable<String> { private String sender; private String title; public ReportGenerator(String sender, String title) { this.sender = sender; this.title = title; } @Override public String call() throws Exception { try { Long duration = (long) (Math.random() * 10); System.out .printf("%s_%s: ReportGenerator: Generating a report during %d seconds ", this.sender, this.title, duration); TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } String ret = sender + ": " + title; return ret; } } class ReportRequest implements Runnable { private String name; private CompletionService<String> service; public ReportRequest(String name, CompletionService<String> service) { this.name = name; this.service = service; } @Override public void run() { ReportGenerator reportGenerator = new ReportGenerator(name, "Report"); service.submit(reportGenerator); } } class ReportProcessor implements Runnable { private CompletionService<String> service; private boolean end; public ReportProcessor(CompletionService<String> service) { this.service = service; end = false; } @Override public void run() { while (!end) { try { Future<String> result = service.poll(20, TimeUnit.SECONDS); if (result != null) { String report = result.get(); System.out.printf("ReportReceiver: Report Received:%s ", report); } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } System.out.printf("ReportSender: End "); } public void setEnd(boolean end) { this.end = end; } }
public class RejectedTaskController implements RejectedExecutionHandler { public static void main(String[] args) { RejectedTaskController controller = new RejectedTaskController(); ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors .newCachedThreadPool(); executor.setRejectedExecutionHandler(controller); System.out.printf("Main: Starting. "); for (int i = 0; i < 3; i++) { Task task = new Task("Task" + i); executor.submit(task); } System.out.printf("Main: Shutting down the Executor. "); executor.shutdown(); System.out.printf("Main: Shutting down the Executor. "); executor.shutdown(); System.out.printf("Main: Sending another Task. "); Task task = new Task("RejectedTask"); executor.submit(task); System.out.println("Main: End"); System.out.printf("Main: End. "); } @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.printf( "RejectedTaskController: The task %s has been rejected ", r.toString()); System.out.printf("RejectedTaskController: %s ", executor.toString()); System.out.printf("RejectedTaskController: Terminating:%s ", executor.isTerminating()); System.out.printf("RejectedTaksController: Terminated:%s ", executor.isTerminated()); } } class Task implements Runnable { private String name; public Task(String name) { this.name = name; } @Override public void run() { System.out.println("Task " + name + ": Starting"); try { long duration = (long) (Math.random() * 10); System.out .printf("Task %s: ReportGenerator: Generating a report during %d seconds ", name, duration); TimeUnit.SECONDS.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("Task %s: Ending ", name); } public String toString() { return name; } }