在本章的开始,我本来只想写一些关于fs模块的内容,虽然这个模块包含的方法非常的多,但没有想到的是它和我们上一篇文章Node.js Buffer还存在着关联,所以我又把关于Buffer的内容简单的学习下,以至于对它不那么陌生。我以为这样就可以来看看fs模块了,没想到又杀出个程咬金,这就是本文要学习的Stream。
这个Stream目前还是个不稳定的模块,从API的介绍来说,所有stream是对象EventEmitter的实例,测试下:
console.log(new(require(‘stream’)) instanceof require(‘events’).EventEmitter);//输出:true
什么是Stream(流)
在Node中,Stream是一个抽象的接口,由不同的实例对象来实现,比如HTTP服务器中的requestI流,再比如stdout标准输出流。流可以是可读的,可写的,或者可读可写的。
为什么要提供Stream
在Node中,大部分的模块能持续的读或者写,为了确保这些模块拥有统一的方法来完成持续的读或者写,Stream于是为它们提供了统一的接口。Stream接口提供了通用的方法和属性,因为stream是EventEmitter的实例,所以允许这些模块emit自己的事件。
Readable Stream(可读流)
一个常见的readable stream就是从一个文件中读取数据。可读流支持的事件有四种:data,end,error,close。它们分别对应如下场景:
data:当有数据可读时触发
end:当没有数据可读时或者达到文件尾
error:读取数据时遇到异常
close:底层的文件描述(fd)关闭时
为了简单起见,我们使用fs模块中的方法来读取windows下的boot.ini文件:
1
2
3
4
5
6
|
var fs = require( 'fs' ); fs.readFile( 'c:/boot.ini' , function (err,data){ if (err) throw err; //console.log(data);//传递给回调函数的data参数是一个Buffer对象,大小默认为64K? console.log(data.toString()); }); |
fs.readFile方法能触发data事件,在这里我们把读取到的数据打印到控制台上。为了让本文稍微有点深度,这里贴上fs.readFile方法的源码:
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
fs.readFile = function (path, encoding_) { var encoding = typeof (encoding_) === 'string' ? encoding_ : null ; var callback = arguments[arguments.length - 1]; if ( typeof (callback) !== 'function' ) callback = function () {}; // first, stat the file, so we know the size. var size; var buffer; // single buffer with file data var buffers; // list for when size is unknown var pos = 0; var fd; fs.open(path, constants.O_RDONLY, 438 /*=0666*/ , function (er, fd_) { if (er) return callback(er); fd = fd_; fs.fstat(fd, function (er, st) { if (er) return callback(er); size = st.size; if (size === 0) { // the kernel lies about many files. // Go ahead and try to read some bytes. buffers = []; return read(); } buffer = new Buffer(size); read(); }); }); function read() { if (size === 0) { buffer = new Buffer(8192); fs.read(fd, buffer, 0, 8192, -1, afterRead); } else { fs.read(fd, buffer, pos, size - pos, -1, afterRead); } } function afterRead(er, bytesRead) { if (er) { return fs.close(fd, function (er2) { return callback(er); }); } if (bytesRead === 0) { return close(); } pos += bytesRead; if (size !== 0) { if (pos === size) close(); else read(); } else { // unknown size, just read until we don't get bytes. buffers.push(buffer.slice(0, bytesRead)); read(); } } function close() { fs.close(fd, function (er) { if (size === 0) { // collected the data into the buffers list. buffer = Buffer.concat(buffers, pos); } else if (pos < size) { buffer = buffer.slice(0, pos); } if (encoding) buffer = buffer.toString(encoding); return callback(er, buffer); }); } }; |
想认真学习的人应该仔细读读fs.readFile是如何实现的,想看fs模块的完整源码位于lib目录下的fs.js文件。下载Node.js源码(版本v0.8.18)。
为了更加直观的描述readable stream中的这些事件,请参见下面的例子
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
var fs = require( 'fs' ); var opt = { flags: 'r' , encoding: 'utf8' , fd: null , mode:0666, bufferSize:64 * 1024, start:0, end:99 }; st = fs.createReadStream( 'c:/boot.ini' ,opt); //st.pause();//暂停data事件的触发 //st.resume();//恢复data事件 var text = "" ; st.on( 'data' , function (data){ text += data; }); st.on( 'end' , function (close){ console.log( "reading 99 bytes:
" + text); }); st.on( 'error' , function (error){ console.log( 'An error occurred:' + error); }); st.on( 'close' , function (){ console.log( 'The file descriptor has been closed.' ); }); st.on( 'open' , function (fd){ console.log( 'Current fd number: ' + fd); }); |
输出:
Current fd number: 3
reading 99 bytes:
[boot loader]
timeout=3
default=multi(0)disk(0)rdisk(0)partition(1)WINDOWS
[operating systems]
The file descriptor has been closed.
F:wampwwwNode>
不想太多解析这个示例,为什么文件描述是3?因为0,1,2已经被占用啦,原因?你懂的!
Writable Stream(可写流)
可写流支持的事件有:drain,error,close,pipe。我只介绍drain和pipe,它们分别对应如下场景:
drain:当Node的写队列为空并且Buffer中没有数据时,发生这个事件
pipe:当把Writable Stream对象传递一个Readable Stream对象的pipe方法时发生
分别来看看这两个事件的例子:
1.drain
01
02
03
04
05
06
07
08
09
10
11
|
var fs = require( 'fs' ); var stream = fs.createWriteStream(__dirname + '/out.txt' ); var str = 'i=[' ; for ( var i=0;i<10;i++){ stream.write(str + i + ']
' ); } stream.on( 'drain' , function (){ console.log( 'start drain' ); }); |
当for循环完成时,我们的控制台才打印出’start drain’,这意味着写队列已经为空,即当前没有事件存在写入的情况。由于drain事件能发生在任何时候,所以为了避免触发不必要的drain事件,官方的建议是,使用events.EventEmitter once()方法,这样drain事件只可能触发一次,尔后它上面的监听器就被删除了。
2.pipe
1
2
3
4
5
6
7
8
|
var fs = require( 'fs' ); var writeStream = fs.createWriteStream( 'copyout.txt' ,{ flags: 'w' }); var readStream = fs.createReadStream( './out.txt' ); readStream.pipe(writeStream); writeStream.on( 'close' , function (){ console.log( 'All done!' ); }); |
这个示例把out.txt文件中的内容复制到了copyout.txt中。
这篇文章写了我很长时间,虽然还有很多的知识点没有介绍上来,也到此为止吧。我希望在日后有机会补充一些。
Date.prototype.format = function(format){
var o = {
"M+" : this.getMonth()+1, //month
"d+" : this.getDate(), //day
"h+" : this.getHours(), //hour
"m+" : this.getMinutes(), //minute
"s+" : this.getSeconds(), //second
"q+" : Math.floor((this.getMonth()+3)/3), //quarter
"S" : this.getMilliseconds() //millisecond
}
if(/(y+)/.test(format)) format=format.replace(RegExp.$1,
(this.getFullYear()+"").substr(4 - RegExp.$1.length));
for(var k in o)if(new RegExp("("+ k +")").test(format))
format = format.replace(RegExp.$1,
RegExp.$1.length==1 ? o[k] :
("00"+ o[k]).substr((""+ o[k]).length));
return format;
}
var fs=require('fs');
fs.open(__dirname+'/error.log','a',function(err,fd){
if(err){ throw err; }
cTime = new Date();
cTime = cTime.format('yyyy-MM-dd h:m:s');
var wBuff = new Buffer('Start log: ' + cTime + "
");
buffPos = 0;
buffLen = wBuff.length;
filePos = 0;
fs.write(fd,wBuff,buffPos,buffLen,filePos,function(err,wbytes,data){
if(err){ throw err; }
console.log('wrote ' + wbytes + ' bytes');
fs.close(fd);
});
});