zoukankan      html  css  js  c++  java
  • ExecutorCompletionService分析及使用

    当我们通过Executor提交一组并发执行的任务,并且希望在每一个任务完成后能立即得到结果,有两种方式可以采取:

     

    方式一:

    通过一个list来保存一组future,然后在循环中轮训这组future,直到每个future都已完成。如果我们不希望出现因为排在前面的任务阻塞导致后面先完成的任务的结果没有及时获取的情况,那么在调用get方式时,需要将超时时间设置为0 


    1. public class CompletionServiceTest {
    2. static class Task implements Callable<String>{
    3. private int i;
    4. public Task(int i){
    5. this.i = i;
    6. }
    7. @Override
    8. public String call() throws Exception {
    9. Thread.sleep(10000);
    10. return Thread.currentThread().getName() + "执行完任务:" + i;
    11. }
    12. }
    13. public static void main(String[] args){
    14. testUseFuture();
    15. }
    16. private static void testUseFuture(){
    17. int numThread = 5;
    18. ExecutorService executor = Executors.newFixedThreadPool(numThread);
    19. List<Future<String>> futureList = new ArrayList<Future<String>>();
    20. for(int i = 0;i<numThread;i++ ){
    21. Future<String> future = executor.submit(new CompletionServiceTest.Task(i));
    22. futureList.add(future);
    23. }
    24. while(numThread > 0){
    25. for(Future<String> future : futureList){
    26. String result = null;
    27. try {
    28. result = future.get(0, TimeUnit.SECONDS);
    29. } catch (InterruptedException e) {
    30. e.printStackTrace();
    31. } catch (ExecutionException e) {
    32. e.printStackTrace();
    33. } catch (TimeoutException e) {
    34. //超时异常直接忽略
    35. }
    36. if(null != result){
    37. futureList.remove(future);
    38. numThread--;
    39. System.out.println(result);
    40. //此处必须break,否则会抛出并发修改异常。(也可以通过将futureList声明为CopyOnWriteArrayList类型解决)
    41. break;
    42. }
    43. }
    44. }
    45. }
    46. }

    方式二:

    第一种方式显得比较繁琐,通过使用ExecutorCompletionService,则可以达到代码最简化的效果。

    1. public class CompletionServiceTest {
    2. static class Task implements Callable<String>{
    3. private int i;
    4. public Task(int i){
    5. this.i = i;
    6. }
    7. @Override
    8. public String call() throws Exception {
    9. Thread.sleep(10000);
    10. return Thread.currentThread().getName() + "执行完任务:" + i;
    11. }
    12. }
    13. public static void main(String[] args) throws InterruptedException, ExecutionException{
    14. testExecutorCompletionService();
    15. }
    16. private static void testExecutorCompletionService() throws InterruptedException, ExecutionException{
    17. int numThread = 5;
    18. ExecutorService executor = Executors.newFixedThreadPool(numThread);
    19. CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);
    20. for(int i = 0;i<numThread;i++ ){
    21. completionService.submit(new CompletionServiceTest.Task(i));
    22. }
    23. }
    24. for(int i = 0;i<numThread;i++ ){
    25. System.out.println(completionService.take().get());
    26. }
    27. }

    其实completionService 中维持了一个blockingqueue,一旦有任务完成就放入,如果没有则阻塞。








  • 相关阅读:
    k8s 重要概念
    k8s 核心功能
    5 秒创建 k8s 集群
    学习 Kubernetes 的 Why 和 How
    Yeelink初步体验
    为Qemu aarch32添加BeautifulSoup4模块
    实现Qemu aarch32虚拟开发板ping www.baidu.com
    利用/proc/pid/pagemap将虚拟地址转换为物理地址
    加快Qemu Aarch32虚拟开发板的启动速度
    为Qemu aarch32开发板添加sd卡
  • 原文地址:https://www.cnblogs.com/You0/p/6670203.html
Copyright © 2011-2022 走看看