zoukankan      html  css  js  c++  java
  • node 通过 Promise Pool 实现多进程限制

    Promise Pool

    在自己工作生活中,有时候会自己写一下小工具,来代替人工去操作一些事情,比如从网页上下载图片时,需要限制同时下载的数量,防止被网站拉入黑名单,就需要使用一定的方法来控制同时进行的任务数了

    之前自己写过python脚本来爬虫下载图片,但还是对python还是不太熟,对js却熟悉点,有了node 后,就改用node进行相关批量操作了。本工具类也就是基于这个需求编写的。

    /**
     * Promise 线程池工具类
     * 主要用于使用node进行一些批量操作时,限制同时进行的任务数
     * 
     * 接收参数
     * 1. max : 同时进行的最大任务数量
     * 2. callback : 所有任务完成后执行的方法
     * 使用时,需先创建线程池,然后使用addTask来添加任务,最后调用addTaskEnd 来标记任务添加结束
     * 具体使用方法demo:
     *  const { PromisePool }  = require('./PromisePool')
        let sleep = (millsSeconds) => new Promise(resolve => { setTimeout(resolve, millsSeconds); })
        let pool = new PromisePool(3, ()=>{
            console.log('finish');
        })
        let taskParamArray = [1,2,3,4,5,6];
        for(let i = 0; i < taskParamArray.length; i++){
            pool.addTask(null, async (param) => {
                console.log(`${param} start`);
                await sleep(param*1000);
                console.log(`${param} end`);
            }, taskParamArray[i]);
        }
        pool.addTaskEnd();
     * 
     */
    class PromisePool{
        
        constructor(max=10, callback=()=>{}){
            this._max = max;
            this._count = 0;
            this._taskQueue = [];
            this._addEnd = false;
            this._callback = callback;
        }
        /**
         * 添加任务到线程池,最终执行会是
         * target.caller(args) 这样的方式执行
         * 
         * @param  target 方法执行的对象
         * @param {function} caller 方法体
         * @param  {...any} args 方法参数
         */
        addTask(target, caller, ...args){
            return new Promise((resolve, reject) => {
                const _task = this._addTask(target, caller, args, resolve, reject);
                if(this._count >= this._max){
                    this._taskQueue.push(_task);
                }else{
                    _task();
                }
            });
        }
        addTaskEnd(){
            this._addEnd = true;
        }
    
        /**
         * 返回一个匿名方法,方法内异步执行函数
         * 
         * @param {*} caller 执行的方法
         * @param {*} args 执行的方法参数
         * @param {*} resolve 
         * @param {*} reject 
         */
        _addTask(target, caller, args, resolve, reject ){
            return () => {
                Promise.resolve(caller.call(target, ...args))
                .then(resolve)
                .catch(reject)
                .finally(() => {
                    this._count--;
                    if(this._taskQueue.length > 0){
                        let _task = this._taskQueue.shift();
                        _task();
                    }
                    if(this._addEnd && this._count <= 0 && this._taskQueue.length === 0){
                        this._callback();
                    }
                });
                this._count++;
            }
        }
    }
    
    /**
     * 将数据集合 添加到PromisePool 执行
     * 其中 dataHanle是处理集合中的数据,接收参数为单个数据
     * finalFunc 为全部完成后执行的方法
     * 
     * @param {数据集合} dataList 
     * @param {对每个数据进行的处理} dataHandle 
     * @param {最终的数据处理} finalFunc 
     */
    function addTaskToPromisePool(dataList, dataHandle, finalFunc = () => { }) {
        let pool = new PromisePool(4, finalFunc)
        for (const data of dataList) {
            pool.addTask(null, dataHandle, data);
        }
        pool.addTaskEnd();
    }
    module.exports = {
        PromisePool,
        addTaskToPromisePool
    }
    

    代码参考自其他人的博客,然后我稍做了一些优化和整理,具体参考博客已经忘了。

  • 相关阅读:
    ssh速度慢
    ps -ef和ps aux的区别
    docker国内镜像加速
    pptpd的log整理
    docker入门2--生命周期
    docker入门1--简介、安装
    Cent7.2单用户模式
    shell中得到当下路径所有文件夹名称
    在centos 7下升级内核
    Mysql如何将一张表重复数据删除
  • 原文地址:https://www.cnblogs.com/feiyu127/p/13614716.html
Copyright © 2011-2022 走看看