zoukankan      html  css  js  c++  java
  • 第二部分:并发工具类25->CompletionService,批量执行异步任务

    1.如何优化一个询价应用的核心代码呢?

    ThreadPoolExecutor+Future的方案,
    用3个线程异步执行,通过3次调用future的get方法后去查价结果,然后价格保存在数据库中

    
    // 创建线程池
    ExecutorService executor =
      Executors.newFixedThreadPool(3);
    // 异步向电商S1询价
    Future<Integer> f1 = 
      executor.submit(
        ()->getPriceByS1());
    // 异步向电商S2询价
    Future<Integer> f2 = 
      executor.submit(
        ()->getPriceByS2());
    // 异步向电商S3询价
    Future<Integer> f3 = 
      executor.submit(
        ()->getPriceByS3());
        
    // 获取电商S1报价并保存
    r=f1.get();
    executor.execute(()->save(r));
      
    // 获取电商S2报价并保存
    r=f2.get();
    executor.execute(()->save(r));
      
    // 获取电商S3报价并保存  
    r=f3.get();
    executor.execute(()->save(r));
    
    

    问题
    获取s1报价耗时很长,那么即使获取s2的报价耗时端,也无法保证s2的报价先执行,因为住线程都阻塞在了f1.get()上

    2.解决方案

    利用阻塞队列,将获取报价的结果放入阻塞队列,主线程消费阻塞队列,保证先获取的报价先保存到数据库中
    在执行f1.get方法,不是在主线程中执行,而是用线程池新开线程执行,防止阻塞

    
    // 创建阻塞队列
    BlockingQueue<Integer> bq =
      new LinkedBlockingQueue<>();
    //电商S1报价异步进入阻塞队列  
    executor.execute(()->
      bq.put(f1.get()));
    //电商S2报价异步进入阻塞队列  
    executor.execute(()->
      bq.put(f2.get()));
    //电商S3报价异步进入阻塞队列  
    executor.execute(()->
      bq.put(f3.get()));
    //异步保存所有报价  
    for (int i=0; i<3; i++) {
      Integer r = bq.take();
      executor.execute(()->save(r));
    }  
    

    3.使用CompletionService实现询价系统

    CompletionService,就是实现2中的类似功能
    sdk并发包中提供的
    内嵌一个阻塞队列,然后把执行结果的future对象加入到阻塞队列,也不是任务做种的结果

    4.创建CompletionService

    1. ExecutorCompletionService(Executor executor)
    2. ExecutorCompletionService(Executor executor,BlockQueue<Future> completionQueue)

    共同参数都有线程池,如果不指定队列,使用的是无界队列LinkedBlockQueue,任务执行结果的future对象加入到completionQueue中

    
    // 创建线程池
    ExecutorService executor = 
      Executors.newFixedThreadPool(3);
    // 创建CompletionService
    CompletionService<Integer> cs = new 
      ExecutorCompletionService<>(executor);
    // 异步向电商S1询价
    cs.submit(()->getPriceByS1());
    // 异步向电商S2询价
    cs.submit(()->getPriceByS2());
    // 异步向电商S3询价
    cs.submit(()->getPriceByS3());
    // 将询价结果异步保存到数据库
    for (int i=0; i<3; i++) {
      Integer r = cs.take().get();
      executor.execute(()->save(r));
    }
    

    5.CompletionService 接口说明

    submit()方法,参数Callable task
    submit()方法,参数Runnable task和V result,类似于ThreadPoolExecutor的 Future submit(Runnable task,T result)

    task(),从阻塞队列中获取并移除一个元素,阻塞队列是空,take会被阻塞
    poll(),从阻塞对垒中获取并移除一个元素,阻塞队列是空,poll会返回null值
    poll(long timeout,Timeunit unit)支持以超时的方式获取并移除阻塞队列头部的一个元素,等待了timeout unit时间,阻塞队列还是空,该方法会返回null值

    6.Dubbo中的Forking cluster集群模式的使用

    dubblo中的集群模式,支持并行调用多个查询服务,只要有一个成功返回结果,整个服务就可以返回了
    注意:是查询服务,而不能是修改服务。

    例如提供地址转换坐标的服务,保证高可用和高性能,并行调用3个地图服务商的API,然后只要有1个正确返回了结果r,那么地址坐标整个服务就可以直接返回r了,这种集群模式可以容忍2个地图服务商异常,但缺点是消耗资源偏多

    
    geocoder(addr) {
      //并行执行以下3个查询服务, 
      r1=geocoderByS1(addr);
      r2=geocoderByS2(addr);
      r3=geocoderByS3(addr);
      //只要r1,r2,r3有一个返回
      //则返回
      return r1|r2|r3;
    }
    

    7.用CompletionService实现Forking集群模式

    
    // 创建线程池
    ExecutorService executor =
      Executors.newFixedThreadPool(3);
    // 创建CompletionService
    CompletionService<Integer> cs =
      new ExecutorCompletionService<>(executor);
    // 用于保存Future对象
    List<Future<Integer>> futures =
      new ArrayList<>(3);
    //提交异步任务,并保存future到futures 
    futures.add(
      cs.submit(()->geocoderByS1()));
    futures.add(
      cs.submit(()->geocoderByS2()));
    futures.add(
      cs.submit(()->geocoderByS3()));
    // 获取最快返回的任务执行结果
    Integer r = 0;
    try {
      // 只要有一个成功返回,则break
      for (int i = 0; i < 3; ++i) {
        r = cs.take().get();
        //简单地通过判空来检查是否成功返回
        if (r != null) {
          break;
        }
      }
    } finally {
      //取消所有任务
      for(Future<Integer> f : futures)
        f.cancel(true);
    }
    // 返回结果
    return r;
    

    8.总结

    批量提交异步任务时,建议使用CompletionService,将线程池executor和blockingQueue功能融合一起
    CompletionService能够让异步任务执行结果有序化,先执行 完的先进入阻塞队列

    后续处理有序性,避免无谓等待

    原创:做时间的朋友
  • 相关阅读:
    NC20565 生日礼物(双指针)
    NC20566 游戏(二分图)
    NC19833 地斗主(dp+矩阵快速幂)
    CF505C Mr. Kitayuta, the Treasure Hunter(dp)
    HDU5493 Queue(线段树)
    HDU5489 Removed Interval (LIS+分治)
    CF1158C Permutation recovery(线段树优化建图)
    NC20811 蓝魔法师(树形dp)
    NC20857 Xor Path(dfs)
    chrony同步时间
  • 原文地址:https://www.cnblogs.com/PythonOrg/p/15010734.html
Copyright © 2011-2022 走看看