zoukankan      html  css  js  c++  java
  • async源码学习

    waterfall函数会连续执行数组中的函数,每次通过数组下一个函数的结果。
    然而,数组任务中的任意一个函数结果传递失败,那么该函数的下一个函数将不会执行,
    并且主回调函数立马把错误作为参数执行。

    以上是我翻译的,我都不知道翻译的什么鬼。

    其实该函数的作用就是: 上一个异步函数返回结果可以传给下一个异步函数,如果传递过程中,第一个参数出错了也就是真值的话,
    下一个回调函数将会停止调用,并且直接调用waterfall函数的第二个参数,其实也是个回调函数。并且把错误的参数传过去。

    先看看官网的demo:
            async.waterfall([
                fn1,
                fn2,
                fn3
            ], function(err, arg1) {
                console.log(err);
                console.log(arg1);
            });
    
            function fn1(next) {
                next(null, '11');
            };
    
            function fn2(arg1, next) {
                console.log(arg1);
                next(null, '22', '33');
            };
    
            function fn3(arg1, arg2, next) {
                console.log(arg1);
                console.log(arg2);
                next(null, 'done');
            };
            async.waterfall([function(aaa) {
                console.log(11);
                aaa(null, 'one');
            }, function(arg1, bbb) {
                console.log(arg1);
                bbb(null, 'two', 'three');
            }, function(arg1, arg2, ccc) {
                console.log(arg1);
                console.log(arg2);
                ccc(null, 'down', 'down2');
            }], function(err, result, res2) {
                console.log(err);
                console.log(result);
                console.log(res2);
            });

    自己搞个创建文件的实例,看看

    class File {
                constructor() {}
    
                // 创建文件
                createFile(callback) {
                    setTimeout(() => {
                        if (0) {
                            console.log('创建文件失败');
                            callback('err');
                        } else {
                            console.log('创建文件成功');
                            callback(null);
                        };
                    }, 3000);
                }
    
                // 写文件
                writeFile(callback) {
                    setTimeout(() => {
                        if (1) {
                            console.log('写文件失败');
                            callback('err');
                        } else {
                            console.log('写文件成功');
                            callback(null);
                        };
                    }, 2000);
                }
    
                // 读文件
                readFile(callback) {
                    setTimeout(() => {
                        if (0) {
                            console.log('读文件失败');
                            callback('err');
                        } else {
                            console.log('读文件成功');
                            callback(null, 'I love async!');
                        };
                    }, 4000);
                }
            };
            let file = new File();
    
            async.waterfall([function(callback) {
                file.createFile(function(err) {
                    if (!err) {
                        callback(null, 'createFile Ok');
                    } else {
                        callback('createFileFail');
                    };
                });
            }, function(err, callback) {
                file.writeFile(function(err) {
                    if (!err) {
                        callback(null, 'writeFile Ok');
                    } else {
                        callback('writeFileFail');
                    };
                });
            }, function(err, callback) {
                file.readFile(function(err) {
                    if (!err) {
                        callback(null, 'readFile Ok');
                    } else {
                        callback('readFileFail');
                    };
                });
            }], function(err, result) {
                console.log(err);
                console.log(result);
            });

    我一直纳闷,他怎么做到,上一个异步什么时候做完后,通知下一个异步开始执行,并且把参数传给下一个异步函数的。看看源码实现:

    /**
     * Created by Sorrow.X on 2017/5/28.
     */
    
    var waterfall = (function() {
    
        var isArray = Array.isArray;    // 把数组的isArray赋给isArray变量
    
        // 是否支持Symbol
        var supportsSymbol = typeof Symbol === 'function';
    
        var setImmediate$1 = wrap(_defer);
    
        function wrap(defer) {
            return function (fn/*, ...args*/) {
                var args = slice(arguments, 1);
                defer(function () {
                    fn.apply(null, args);
                });
            };
        };
    
        // 是否是异步
        function isAsync(fn) {
            return supportsSymbol && fn[Symbol.toStringTag] === 'AsyncFunction';
        };
    
        // 空函数
        function noop() {
            // No operation performed.
        };
    
        // 一次(偏函数)
        function once(fn) {    // fn: waterfall的第二个参数(回调函数)
            return function () {
                if (fn === null) return;
                var callFn = fn;
                fn = null;    // 把上级函数作用域中的fn置空
                callFn.apply(this, arguments);    // 调用回调函数
            };
        };
    
        // 包装成异步
        function wrapAsync(asyncFn) {
            return isAsync(asyncFn) ? asyncify(asyncFn) : asyncFn;
        };
    
        function asyncify(func) {
            return initialParams(function (args, callback) {
                var result;
                try {
                    result = func.apply(this, args);
                } catch (e) {
                    return callback(e);
                }
                // if result is Promise object
                if (isObject(result) && typeof result.then === 'function') {
                    result.then(function(value) {
                        invokeCallback(callback, null, value);
                    }, function(err) {
                        invokeCallback(callback, err.message ? err : new Error(err));
                    });
                } else {
                    callback(null, result);
                }
            });
        };
    
        function isObject(value) {
            var type = typeof value;
            return value != null && (type == 'object' || type == 'function');
        };
    
        function invokeCallback(callback, error, value) {
            try {
                callback(error, value);
            } catch (e) {
                setImmediate$1(rethrow, e);
            }
        };
    
        // 重写数组中的slice方法
        function slice(arrayLike, start) {    // arrayLike: 类数组对象  start: 开始位置
            start = start|0;
            var newLen = Math.max(arrayLike.length - start, 0);    // 长度
            var newArr = Array(newLen);    // 创建一个长度为newLen的数组
            for(var idx = 0; idx < newLen; idx++)  {
                newArr[idx] = arrayLike[start + idx];
            };
            return newArr;    // 返回数组
        };
    
        // 执行一次
        function onlyOnce(fn) {
            return function() {
                if (fn === null) throw new Error("Callback was already called.");    // 回调已被调用
                var callFn = fn;
                fn = null;
                callFn.apply(this, arguments);    //调用callFn 参数就是用户回调函数中的参数
            };
        };
    
        var waterfall = function(tasks, callback) {    // tasks: 异步函数数组容器, callback: 回调
            callback = once(callback || noop);    // 回调函数
            if (!isArray(tasks)) return callback(new Error('First argument to waterfall must be an array of functions'));    // 第一个参数必须是数组(函数数组)!
            if (!tasks.length) return callback();    // 空数组的话直接调用回调函数(无参数)
            var taskIndex = 0;    // 任务索引
    
            function nextTask(args) {    // 参数数组
                var task = wrapAsync(tasks[taskIndex++]);    // 数组中的任务
                args.push(onlyOnce(next));    // 把next方法添加到args数组中去
                task.apply(null, args);    // 调用数组中task函数(参数是数组)
            };
    
            function next(err/*, ...args*/) {    // 其实就是函数参数中的回调函数callback
                if (err || taskIndex === tasks.length) {    // 只要有错误或者函数数组任务都完成了
                    return callback.apply(null, arguments);    // 就执行回调
                };
                nextTask(slice(arguments, 1));    // 数组中的函数没循环完且没出错,那就继续调用
            };
    
            nextTask([]);    // 调用
        };
    }());
    waterfall函数中有个next方法,其实我们写的回调就是next方法。

    好吧,以上代码直接抽取async中的代码,可以直接使用。如果只想要这一个功能的话。
  • 相关阅读:
    ubuntu 15.04默认root用户登陆
    hive的not in
    Spark 1.4.1中Beeline使用的gc overhead limit exceeded
    Sequoiadb该如何选择合适的SQL引擎
    scala的object知识点
    scala中同步块
    英语口语练习系列-C36-城市-谈论活动-登高
    英语口语练习系列-C35-马戏-谈论语言-己亥杂诗
    英语口语练习系列-C34-儿童-谈论物品和人-武陵春
    英语口语练习系列-C33-露营-谈论日期-离思
  • 原文地址:https://www.cnblogs.com/sorrowx/p/6918417.html
Copyright © 2011-2022 走看看