zoukankan      html  css  js  c++  java
  • Node.js:理解stream

    Stream在node.js中是一个抽象的接口,基于EventEmitter,也是一种Buffer的高级封装,用来处理流数据。流模块便是提供各种API让我们可以很简单的使用Stream。
    流分为四种类型,如下所示:

    • Readable,可读流
    • Writable,可写流
    • Duplex,读写流
    • Transform,扩展的Duplex,可修改写入的数据

    1、Readable可读流

    通过stream.Readable可创建一个可读流,它有两种模式:暂停和流动。
    在流动模式下,将自动从下游系统读取数据并使用data事件输出;暂停模式下,必须显示调用stream.read()方法读取数据,并触发data事件。
    所有的可读流最开始都是暂停模式,可以通过以下方法切换到流动模式:

    • 监听'data'事件
    • 调用stream.resume()方法
    • 调用stream.pipe()方法将数据输出到一个可写流Writable

    同样地,也可以切换到暂停模式,有两种方法:

    • 如果没有设置pipe目标,调用stream.pause()方法即可。
    • 如果设置了pipe目标,则需要移除所有的data监听和调用stream.unpipe()方法

    在Readable对象中有一个_readableSate的对象,通过该对象可以得知流当前处于什么模式,如下所示:

    • readable._readableState.flowing = null,没有数据消费者,流不产生数据
    • readable._readableState.flowing = true,处于流动模式
    • readable._readableState.flowing = false,处于暂停模式

    为什么使用流取数据
    对于小文件,使用fs.readFile()方法读取数据更方便,但需要读取大文件的时候,比如几G大小的文件,使用该方法将消耗大量的内存,甚至使程序崩溃。这种情况下,使用流来处理是更合适的,采用分段读取,便不会造成内存的'爆仓'问题。
    data事件
    在stream提供数据块给消费者时触发,有可能是切换到流动模式的时候,也有可能是调用readable.read() 方法且有有效数据块的时候,使用如下所示:

    const fs = require('fs');
    
    const rs = fs.createReadStream('./appbak.js');
    var chunkArr = [],
    	chunkLen = 0;
    rs.on('data',(chunk)=>{
    	chunkArr.push(chunk);
    	chunkLen+=chunk.length;
    });
    rs.on('end',(chunk)=>{
    	console.log(Buffer.concat(chunkArr,chunkLen).toString());
    });
    

    readable事件
    当流中有可用数据能被读取时触发,分为两种,新的可用的数据和到达流的末尾,前者stream.read()方法返回可用数据,后者返回null,如下所示:

    const rs = fs.createReadStream('./appbak.js');
    var chunkArr = [],
    	chunkLen = 0;
    
    rs.on('readable',()=>{
    	var chunk = null;
    	//这里需要判断是否到了流的末尾
    	if((chunk = rs.read()) !== null){
    		chunkArr.push(chunk);
    		chunkLen+=chunk.length;
    	}
    });
    rs.on('end',(chunk)=>{
    	console.log(Buffer.concat(chunkArr,chunkLen).toString());
    });
    

    pause和resume方法
    stream.pause()方法让流进入暂停模式,并停止'data'事件触发,stream.resume()方法使流进入流动模式,并恢复'data'事件触发,也可以用来消费所有数据,如下所示:

    const rs = fs.createReadStream('./下载.png');
    rs.on('data',(chunk)=>{
    	console.log(`接收到${chunk.length}字节数据...`);
    	rs.pause();
    	console.log(`数据接收将暂停1.5秒.`);
    	setTimeout(()=>{
    		rs.resume();
    	},1000);
    });
    rs.on('end',(chunk)=>{
    	console.log(`数据接收完毕`);
    });
    

    pipe(destination[, options])方法
    pipe()方法绑定一个可写流到可读流上,并自动切换到流动模式,将所有数据输出到可写流,以及做好了数据流的管理,不会发生数据丢失的问题,使用如下所示:

    const rs = fs.createReadStream('./app.js');
    rs.pipe(process.stdout);
    

    以上介绍了多种可读流的数据消费的方法,但对于一个可读流,最好只选择其中的一种,推荐使用pipe()方法。

    2、Writable可写流

    所有的可写流都是基于stream.Writable类创建的,创建之后便可将数据写入该流中。
    write(chunk[, encoding][, callback])方法
    write()方法向可写流中写入数据,参数含义:

    • chunk,字符串或buffer
    • encoding,若chunk为字符串,则是chunk的编码
    • callback,当前chunk数据写入磁盘时的回调函数

    该方法的返回值为布尔值,如果为false,则表示需要写入的数据块被缓存并且此时缓存的大小超出highWaterMark阀值,否则为true。
    使用如下所示:

    const ws = fs.createWriteStream('./test.txt');
    ws.write('nihao','utf8',()=>{process.stdout.write('this chunk is flushed.');});
    ws.end('done.')
    

    背压机制
    如果可写流的写入速度跟不上可读流的读取速度,write方法添加的数据将被缓存,逐渐增多,导致占用大量内存。我们希望的是消耗一个数据,再去读取一个数据,这样内存就维持在一个水平上。如何做到这一点?可以利用write方法的返回值来判断可写流的缓存状态和'drain'事件,及时切换可读流的模式,如下所示:

    function copy(src,dest){
    	src = path.resolve(src);
    	dest = path.resolve(dest);
    	const rs = fs.createReadStream(src);
    	const ws = fs.createWriteStream(dest);
    	console.log('正在复制中...');
    	const stime = +new Date();
    	rs.on('data',(chunk)=>{
    		if(null === ws.write(chunk)){
    			rs.pause();
    		}
    	});
    	ws.on('drain',()=>{
    		rs.resume();
    	});
    	rs.on('end',()=>{
    		const etime = +new Date();
    		console.log(`已完成,用时:${(etime-stime)/1000}秒`);
    		ws.end();
    	});
    	function calcProgress(){
    		
    	}
    }
    copy('./CSS权威指南 第3版.pdf','./javascript.pdf');
    

    drain事件
    如果Writable.write()方法返回false,则drain事件将会被触发,上面的背压机制已经使用了该事件。
    finish事件
    在调用stream.end()方法之后且所有缓存区的数据都被写入到下游系统,就会触发该事件,如下所示:

    const ws = fs.createWriteStream('./alphabet.txt');
    const alphabetStr = 'abcdefghijklmnopqrstuvwxyz';
    ws.on('finish',()=>{
    	console.log('done.');
    });
    for(let letter of alphabetStr.split()){
    	ws.write(letter);
    }
    ws.end();//必须调用
    

    end([chunk][, encoding][, callback])方法
    end()方法被调用之后,便不能再调用stream.write()方法写入数据,负责将抛出错误。

    3、Duplex读写流

    Duplex流同时实现了Readable与Writable类的接口,既是可读流,也是可写流。例如'zlib streams'、'crypto streams'、'TCP sockets'等都是Duplex流。

    4、Transform流

    Duplex流的扩展,区别在于,Transform流自动将写入端的数据变换后添加到可读端。例如:'zlib streams'、'crypto streams'等都是Transform流。

    5、四种流的实现

    stream模块提供的API可以让我们很简单的实现流,该模块使用require('stream')引用,我们只要继承四种流中的一个基类 (stream.Writable, stream.Readable, stream.Duplex, or stream.Transform),然后实现它的接口就可以了,需要实现的接口如下所示:

    Use-case Class Method(s) to implement
    Reading only Readable _read
    Writing only Writable _write, _writev
    Reading and writing Duplex _read, _write, _writev
    Operate on written data, then read the result Transform _transform, _flush

    Readable流实现
    如上所示,我们只要继承Readable类并实现_read接口即可,,如下所示:

    const Readable = require('stream').Readable;
    const util = require('util');
    const alphabetArr = 'abcdefghijklmnopqrstuvwxyz'.split();
    /*function AbReadable(){
    	if(!this instanceof AbReadable){
    		return new AbReadable();
    	}
    	Readable.call(this);
    }
    util.inherits(AbReadable,Readable);
    AbReadable.prototype._read = function(){
    	if(!alphabetArr.length){
    		this.push(null);
    	}else{
    		this.push(alphabetArr.shift());
    	}
    };
    
    const abReadable = new AbReadable();
    abReadable.pipe(process.stdout);*/
    
    /*class AbReadable extends Readable{
    	constructor(){
    		super();
    	}
    	_read(){
    		if(!alphabetArr.length){
    			this.push(null);
    		}else{
    			this.push(alphabetArr.shift());
    		}
    	}
    }
    const abReadable = new AbReadable();
    abReadable.pipe(process.stdout);*/
    
    /*const abReadable = new Readable({
    	read(){
    		if(!alphabetArr.length){
    			this.push(null);
    		}else{
    			this.push(alphabetArr.shift());
    		}
    	}
    });
    abReadable.pipe(process.stdout);*/
    
    const abReadable = Readable();
    abReadable._read = function(){
    	if (!alphabetArr.length) {
    		this.push(null);
    	} else {
    		this.push(alphabetArr.shift());
    	}
    }
    abReadable.pipe(process.stdout);
    

    以上代码使用了四种方法创建一个Readable可读流,必须实现_read()方法,以及用到了readable.push()方法,该方法的作用是将指定的数据添加到读取队列。
    Writable流实现
    我们只要继承Writable类并实现_write或_writev接口,如下所示(只使用两种方法):

    /*class MyWritable extends Writable{
    	constructor(){
    		super();
    	}
    	_write(chunk,encoding,callback){
    		process.stdout.write(chunk);
    		callback();
    	}
    }
    const myWritable = new MyWritable();*/
    const myWritable = new Writable({
    	write(chunk,encoding,callback){
    		process.stdout.write(chunk);
    		callback();
    	}
    });
    myWritable.on('finish',()=>{
    	process.stdout.write('done');
    })
    myWritable.write('a');
    myWritable.write('b');
    myWritable.write('c');
    myWritable.end();
    

    Duplex流实现
    实现Duplex流,需要继承Duplex类,并实现_read和_write接口,如下所示:

    class MyDuplex extends Duplex{
    	constructor(){
    		super();
    		this.source = [];
    	}
    	_read(){
    		if (!this.source.length) {
    			this.push(null);
    		} else {
    			this.push(this.source.shift());
    		}
    	}
    	_write(chunk,encoding,cb){
    		this.source.push(chunk);
    		cb();
    	}
    }
    
    const myDuplex = new MyDuplex();
    myDuplex.on('finish',()=>{
    	process.stdout.write('write done.')
    });
    myDuplex.on('end',()=>{
    	process.stdout.write('read done.')
    });
    myDuplex.write('
    a
    ');
    myDuplex.write('c
    ');
    myDuplex.end('b
    ');
    myDuplex.pipe(process.stdout);
    

    上面的代码实现了_read()方法,可作为可读流来使用,同时实现了_write()方法,又可作为可写流来使用。
    Transform流实现
    实现Transform流,需要继承Transform类,并实现_transform接口,如下所示:

    class MyTransform extends Transform{
    	constructor(){
    		super();
    	}
    	_transform(chunk, encoding, callback){
    		chunk = (chunk+'').toUpperCase();
    		callback(null,chunk);
    	}
    }
    const myTransform = new MyTransform();
    myTransform.write('hello world!');
    myTransform.end();
    myTransform.pipe(process.stdout);
    

    上面代码中的_transform()方法,其第一个参数,要么为error,要么为null,第二个参数将被自动转发给readable.push()方法,因此该方法也可以使用如下写法:

    _transform(chunk, encoding, callback){
    	chunk = (chunk+'').toUpperCase()
    	this.push(chunk)
    	callback();
    }
    

    Object Mode流实现
    我们知道流中的数据默认都是Buffer类型,可读流的数据进入流中便被转换成buffer,然后被消耗,可写流写入数据时,底层调用也将其转化为buffer。但将构造函数的objectMode选择设置为true,便可产生原样的数据,如下所示:

    const rs = Readable();
    rs.push('a');
    rs.push('b');
    rs.push(null);
    rs.on('data',(chunk)=>{console.log(chunk);});//<Buffer 61>与<Buffer 62>
    
    const rs1 = Readable({objectMode:!0});
    rs1.push('a');
    rs1.push('b');
    rs1.push(null);
    rs1.on('data',(chunk)=>{console.log(chunk);});//a与b
    

    下面利用Transform流实现一个简单的CSS压缩工具,如下所示:

    function minify(src,dest){
    	const transform = new Transform({
    		transform(chunk,encoding,cb){
    			cb(null,(chunk.toString()).replace(/[s
    	]/g,''));
    		}
    	});
    	fs.createReadStream(src,{encoding:'utf8'}).pipe(transform).pipe(fs.createWriteStream(dest));
    }
    minify('./reset.css','./reset.min.css');
    
  • 相关阅读:
    链接和作用域2 C++快速入门43
    位运算符
    代码编辑器和代码浏览器
    关系运算符
    delphi教程 | 第一个程序
    代码编辑器和代码浏览器
    链接和作用域2 C++快速入门43
    delphi教程 | 第一个程序
    位运算符
    [原创 js] 点击即可修改内容函数
  • 原文地址:https://www.cnblogs.com/zmxmumu/p/6141482.html
Copyright © 2011-2022 走看看