zoukankan      html  css  js  c++  java
  • Promise.all并发限制

    Promise.all概念

    首先了解一下Promise.all

    Promise.all可以将多个Promise实例包装成一个新的Promise实例。同时,成功和失败的返回值是不同的,成功的时候返回的是一个结果数组,而失败的时候则返回最先被reject失败状态的值(第一次失败就返回了,而不等待后续promise的状态)。

    Promise.all 可以保证,promises 数组中所有promise对象都达到 resolve 状态,才执行 then 回调,而如果是有一个reject了,那就直接执行catch的代码。

    需要特别注意的是,Promise.all获得的成功结果的数组里面的数据顺序和Promise.all接收到的数组顺序是一致的。这带来了一个绝大的好处:在前端开发请求数据的过程中,偶尔会遇到发送多个请求并根据请求顺序获取和使用数据的场景,使用Promise.all毫无疑问可以解决这个问题。

    注意Promises数组的执行并不是通过Promise.all而是在我们初始化Promise的时候就执行了。可以看一下下面这个情况:

    let a = new Promise((resolve,eject)=>{
        console.log(123);
        setTimeout(()=>resolve("Promise a"),1000)
    })
    let b = new Promise((resolve,eject)=>{
        console.log(222);
        setTimeout(()=>resolve("Promise b"),5000)
    })
    Promise.all([a,b]).then((res)=>console.log(res));
    Promise.all([a,b]).then((res)=>console.log(res));

    输出结果为

     相当于,a和b两个promise在创建的时候执行,我们只是通过Promise.all拿到结果而已,多次执行Promise.all只是多次获取到结果,并不会执行promise数组。

    Promise.all并发限制实现

    ⛲️ 场景:如果你都的promises 数组中每个对象都是http请求,你在瞬间发出几十万http请求(tcp连接数不足可能造成等待),或者堆积了无数调用栈导致内存溢出。这时,就需要考虑对Promise.all做并发限制

    Promise.all并发限制指的是,每个时刻并发执行的promise数量是固定的,最终的执行结果还是保持与原来的Promise.all一致。

    首先先看一下要实现的效果,有一个定时器,在1000,5000,3000,2000s之后输出时间,每次并发为2。也就是先执行1000和5000,等到1000完成后,开始执行3000,以此类推直到所有任务完成(先实现这样一个简单的并发控制,并没有对promise数组拿到结果,执行then)

     const timeout = i => new Promise(resolve => {
        console.log(i);
        setTimeout(() => resolve(i), i)
      });
     asyncPools(2, [1000, 5000, 3000, 2000], timeout)

    1. asyncPools 接受三个参数(poolLimit, array, iteratorFn

    poolLimit:并发限制

    array:需要执行的并发数组

    iteratorFn:具体执行的promise函数

     function asyncPools(poolLimit,array,fnInter) {
          let doing = [];
          let i =0;
          function pp(){
              if(i>=array.length) {
                  return;
              }
              let e = fnInter(array[i++]); //初始化promise
              e.then(()=>doing.splice(doing.indexOf(e),1))  //完成之后从doing删除
              doing.push(e); //放进doing列表
              if(doing.length>=poolLimit) {  //超出限制的话
                Promise.race(doing).then(pp); //监听每完成一个就再放一个
              }
              else {  //否则直接放进去
                pp();
              }
          }
          pp();
      }
    

      思路:首先需要一个doing数组记录正在执行的promise,因为需要不停的初始化并且放入doing数组,所以采用递归的形式。有一个变量i记录初始化到array的第几个了。---> 开始从array中初始化promise并且放入doing数组,因为promise完成之后需要从doing中删除,所以注册这个操作到初始化的promise中。--->那么下一次再初始化放入的时机就由poolLimit来决定,超出限制的话,使用promise.race来监听doing列表有一个promise完成,就放入;没有超出限制,直接放入。--->当array列表执行完毕也就是i>=array.length的时候结束操作。

    2. 接下来是保存每一个promise状态,在使用了并发asyncPools函数可以通过他的then方法拿到结果。所以我们对上面代码进行了改进,增加一个ret来保存每一个初始化的promise,最后返回Promise.all(ret)拿到promise数组的结果。

    使用示例:

    asyncPools(2, [1000, 5000, 3000, 2000], timeout).then(res=>console.log(res));

    第一步:存储Promise数组,返回Promise.all(ret);

     function asyncPools(poolLimit,array,fnInter) {
        let doing = [];
        let i =0;
        let ret = [];  //结果数组
        function pp(){
            if(i>=array.length) {
                return Promise.all(ret); //返回结果
            }
            let e = fnInter(array[i++]); 
            e.then(()=>doing.splice(doing.indexOf(e),1))  
            doing.push(e); 
            ret.push(e); //放入ret数组
            if(doing.length>=poolLimit) {  
              Promise.race(doing).then(pp); 
            }
            else {  
              pp();
            }
        }
        pp();
    }
    

     但是直接使用会报错,因为内部是异步的,执行完之后拿到的结果其实是undefined,没有then方法。

    第二步:改写函数的return,使得返回的是promise对象可以实现then的链式调用

    function asyncPools(poolLimit,array,fnInter) {
        let doing = [];
        let i =0;
        let ret = [];
        function pp(){
            if(i>=array.length) {
                return Promise.resolve(); //最后一个resolve状态,会进入外层返回Promise.then
            }
            let e = fnInter(array[i++]); 
            e.then(()=>doing.splice(doing.indexOf(e),1))  
            doing.push(e); 
            ret.push(e); 
            if(doing.length>=poolLimit) {  
              return Promise.race(doing).then(pp); //return返回
            }
            else { 
              return Promise.resolve().then(pp); //改写一下保证then链式调用
            }
        }
        return pp().then(()=>Promise.all(ret)); //只有当array结束,最后一个resolve才会进入then
    }
    

     至此我们的并发限制代码就完成了!


    下面的内容就是介绍一下和Promise.all经常一起出现的概念Promise.race了!

    Promise.race(iterable) 方法返回一个 promise,一旦迭代器中的某个promise解决或拒绝,返回的 promise就会解决或拒绝。

  • 相关阅读:
    我的架构经验系列文章 前端架构
    我的架构经验系列文章 后端架构 安全层面
    Adhesive框架系列文章报警服务模块使用和实现
    Adhesive框架系列文章Mongodb数据服务模块实现(上)
    在Silverlight程序中使用Thread一个很容易被忽略的问题
    .net(偏web) vs j2ee的一些框架选型
    Wcf扩展
    Adhesive框架系列文章内存队列服务模块使用和实现
    【翻译】C#编程语言和JAVA编程语言的比较(下)
    在测试Adhesive的时候发现一个Mongodb官方驱动1.1.0.4184比较严重的BUG
  • 原文地址:https://www.cnblogs.com/longlongdan/p/13962460.html
Copyright © 2011-2022 走看看