zoukankan      html  css  js  c++  java
  • mafintosh/end-of-stream

    https://github.com/mafintosh/end-of-stream

    Call a callback when a readable/writable/duplex stream has completed or failed.

    end-of-stream

    A node module that calls a callback when a readable/writable/duplex stream has completed or failed.

    就是当一个可读/可写。双工流完成或失败时可以回调的模块,如eos(readableStream,cb)意思就是当将这个回调函数cb与流封装到一起啦,readableStream之后可以进行各种操作,pipe/destroy等,然后相应地触发cb回调函数

    npm install end-of-stream

    Usage

    Simply pass a stream and a callback to the eos. Both legacy streams, streams2 and stream3 are supported.

    var eos = require('end-of-stream');
    
    eos(readableStream, function(err) {
      // this will be set to the stream instance
        if (err) return console.log('stream had an error or closed early');
        console.log('stream has ended', this === readableStream);//readableStream关闭可读是end,上下文环境this即readableStream
    });
    
    eos(writableStream, function(err) {
        if (err) return console.log('stream had an error or closed early');
        console.log('stream has finished', this === writableStream);//writableStream关闭可写是finish
    });
    
    eos(duplexStream, function(err) {
        if (err) return console.log('stream had an error or closed early');
        console.log('stream has ended and finished', this === duplexStream);//duplexStream关闭要end和finish
    });
    
    eos(duplexStream, {readable:false}, function(err) {
        if (err) return console.log('stream had an error or closed early');
        console.log('stream has finished but might still be readable');//{readable:false}即不end,该duplexStream可能还能读
    });
    
    eos(duplexStream, {writable:false}, function(err) {
        if (err) return console.log('stream had an error or closed early');
        console.log('stream has ended but might still be writable');//{writable:false}即不finish,该duplexStream可能还能写
    });
    
    eos(readableStream, {error:false}, function(err) {
        // do not treat emit('error', err) as a end-of-stream
    });

    end-of-stream/index.js

    在这里设置是finish后是不可写,end后是不可读

    var once = require('once');
    
    var noop = function() {};
    
    var isRequest = function(stream) {
        return stream.setHeader && typeof stream.abort === 'function';
    };
    
    var isChildProcess = function(stream) {
        return stream.stdio && Array.isArray(stream.stdio) && stream.stdio.length === 3
    };
    
    var eos = function(stream, opts, callback) {
        if (typeof opts === 'function') return eos(stream, null, opts);
        if (!opts) opts = {};
    
        callback = once(callback || noop);
    
        var ws = stream._writableState;
        var rs = stream._readableState;
        var readable = opts.readable || (opts.readable !== false && stream.readable);
        var writable = opts.writable || (opts.writable !== false && stream.writable);
    
        var onlegacyfinish = function() {
            if (!stream.writable) onfinish();
        };
    
        var onfinish = function() {
            writable = false;
            if (!readable) callback.call(stream);
        };
    
        var onend = function() {
            readable = false;
            if (!writable) callback.call(stream);
        };
    
        var onexit = function(exitCode) {
            callback.call(stream, exitCode ? new Error('exited with error code: ' + exitCode) : null);
        };
    
        var onerror = function(err) {
            callback.call(stream, err);
        };
    
        var onclose = function() {
            if (readable && !(rs && rs.ended)) return callback.call(stream, new Error('premature close'));
            if (writable && !(ws && ws.ended)) return callback.call(stream, new Error('premature close'));
        };
    
        var onrequest = function() {
            stream.req.on('finish', onfinish);
        };
    
        if (isRequest(stream)) {
            stream.on('complete', onfinish);
            stream.on('abort', onclose);
            if (stream.req) onrequest();
            else stream.on('request', onrequest);
        } else if (writable && !ws) { // legacy streams
            stream.on('end', onlegacyfinish);
            stream.on('close', onlegacyfinish);
        }
    
        if (isChildProcess(stream)) stream.on('exit', onexit);
        
    //三类事件的监听 stream.on(
    'end', onend); stream.on('finish', onfinish); if (opts.error !== false) stream.on('error', onerror); stream.on('close', onclose); return function() { stream.removeListener('complete', onfinish);//移除改事件并调用onfinish函数 stream.removeListener('abort', onclose); stream.removeListener('request', onrequest); if (stream.req) stream.req.removeListener('finish', onfinish); stream.removeListener('end', onlegacyfinish); stream.removeListener('close', onlegacyfinish); stream.removeListener('finish', onfinish); stream.removeListener('exit', onexit); stream.removeListener('end', onend); stream.removeListener('error', onerror); stream.removeListener('close', onclose); }; }; module.exports = eos;

    所以当调用eos(...)时,基本上就是监听了各式各样的事件,然后返回一个函数。这个函数里面是各类的removeListener,就是将你之前的监听全部移除,并触发相应的事件,运行返回的函数的方法就是eos(...)(),如下面的第四个例子

    end-of-stream/test.js
    var assert = require('assert');
    var eos = require('./index');
    
    var expected = 10;//下面有十个例子
    var fs = require('fs');
    var cp = require('child_process');//nodejs子进程,看博客
    var net = require('net');
    var http = require('http');
    
    var ws = fs.createWriteStream('/dev/null');
    eos(ws, function(err) {
        expected--;
        assert(!!err);
        assert(this === ws);
        if (!expected) process.exit(0);
    });
    ws.destroy();//将会触发error和close事件
    
    var rs1 = fs.createReadStream('/dev/urandom');
    eos(rs1, function(err) {
        expected--;
        assert(!!err);
        assert(this === rs1);
        if (!expected) process.exit(0);//说明运行成功
    });
    rs1.destroy();//将会触发error和close事件
    
    var rs2 = fs.createReadStream(__filename);
    eos(rs2, function(err) {
        expected--;
        assert.ifError(err);
        assert(this === rs2);
        if (!expected) process.exit(0);
    });
    rs2.pipe(fs.createWriteStream('/dev/null'));
    
    var rs3 = fs.createReadStream(__filename);
    eos(rs3, function(err) {
        assert.ifError(err);
        assert(this === rs);
        throw new Error('no go');
    })();//运行eos的返回函数,即各个removeListener
    rs3.pipe(fs.createWriteStream('/dev/null'));
    
    var exec = cp.exec('echo hello world');
    eos(exec, function(err) {
        expected--;
        assert.ifError(err);
        assert(this === exec);
        if (!expected) process.exit(0);
    });
    
    var spawn = cp.spawn('echo', ['hello world']);
    eos(spawn, function(err) {
        expected--;
        assert.ifError(err);
        assert(this === spawn);
        if (!expected) process.exit(0);
    });
    
    var socket = net.connect(50000);
    eos(socket, function(err) {
        expected--;
        assert(!!err);
        assert(this === socket);
        if (!expected) process.exit(0);
    });
    
    var server = net.createServer(function(socket) {
        eos(socket, function(err) {
            expected--;
            assert(!!err);
            assert(this === socket);
            if (!expected) process.exit(0);
        });
        socket.destroy();
    }).listen(30000, function() {
        var socket = net.connect(30000);
        eos(socket, function(err) {
            expected--;
            assert.ifError(err);
            assert(this === socket);
            if (!expected) process.exit(0);
        });
    });
    
    var server2 = http.createServer(function(req, res) {
        eos(res, function(err) {
            expected--;
            assert.ifError(err);
        });
        res.end();
    }).listen(function() {
        var port = server2.address().port;
        http.get('http://localhost:' + port, function(res) {
            eos(res, function(err) {
                expected--;
                assert.ifError(err);
                server2.close();
            });
            res.resume();
        });
    });
    
    setTimeout(function() {
        assert(expected === 0);
        process.exit(0);
    }, 1000);
      • process.exit(0)表示成功完成,回调函数中,err将为null;

      • process.exit(非0)表示执行失败,回调函数中,err不为null,err.code就是我们传给exit的数字。

     
  • 相关阅读:
    Android之在linux终端执行shell脚本直接打印当前运行app的日志
    webpack打包vue项目之后生成的dist文件该怎么启动运行
    Android工程中javax annotation Nullable找不到的替代方案
    绝对良心提供百度网盘的jdk1.8源码下载包含sun包的
    上周热点回顾(12.23-12.29)团队
    上周热点回顾(12.16-12.22)团队
    k8s 开船记:升级为豪华邮轮(高可用集群)与遇到奇怪故障(dns解析异常)团队
    上周热点回顾(12.9-12.15)团队
    k8s 开船记-修船:改 readinessProbe ,去 DaemonSet ,上 Autoscaler团队
    k8s 开船记-触礁:四涡轮发动机撞坏3个引发502故障团队
  • 原文地址:https://www.cnblogs.com/wanghui-garcia/p/9889215.html
Copyright © 2011-2022 走看看