在本章的开始,我本来只想写一些关于fs模块的内容,虽然这个模块包含的方法非常的多,但没有想到的是它和我们上一篇文章Node.js Buffer还存在着关联,所以我又把关于Buffer的内容简单的学习下,以至于对它不那么陌生。我以为这样就可以来看看fs模块了,没想到又杀出个程咬金,这就是本文要学习的Stream。
这个Stream目前还是个不稳定的模块,从API的介绍来说,所有stream是对象EventEmitter的实例,测试下:
console.log(new(require(‘stream’)) instanceof require(‘events’).EventEmitter);//输出:true
什么是Stream(流)
在Node中,Stream是一个抽象的接口,由不同的实例对象来实现,比如HTTP服务器中的requestI流,再比如stdout标准输出流。流可以是可读的,可写的,或者可读可写的。
为什么要提供Stream
在Node中,大部分的模块能持续的读或者写,为了确保这些模块拥有统一的方法来完成持续的读或者写,Stream于是为它们提供了统一的接口。Stream接口提供了通用的方法和属性,因为stream是EventEmitter的实例,所以允许这些模块emit自己的事件。
Readable Stream(可读流)
一个常见的readable stream就是从一个文件中读取数据。可读流支持的事件有四种:data,end,error,close。它们分别对应如下场景:
data:当有数据可读时触发
end:当没有数据可读时或者达到文件尾
error:读取数据时遇到异常
close:底层的文件描述(fd)关闭时
为了简单起见,我们使用fs模块中的方法来读取windows下的boot.ini文件:
|
1
2
3
4
5
6
|
var fs = require('fs');fs.readFile('c:/boot.ini',function(err,data){ if(err) throw err; //console.log(data);//传递给回调函数的data参数是一个Buffer对象,大小默认为64K? console.log(data.toString());}); |
fs.readFile方法能触发data事件,在这里我们把读取到的数据打印到控制台上。为了让本文稍微有点深度,这里贴上fs.readFile方法的源码:
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
fs.readFile = function(path, encoding_) { var encoding = typeof(encoding_) === 'string' ? encoding_ : null; var callback = arguments[arguments.length - 1]; if (typeof(callback) !== 'function') callback = function() {}; // first, stat the file, so we know the size. var size; var buffer; // single buffer with file data var buffers; // list for when size is unknown var pos = 0; var fd; fs.open(path, constants.O_RDONLY, 438 /*=0666*/, function(er, fd_) { if (er) return callback(er); fd = fd_; fs.fstat(fd, function(er, st) { if (er) return callback(er); size = st.size; if (size === 0) { // the kernel lies about many files. // Go ahead and try to read some bytes. buffers = []; return read(); } buffer = new Buffer(size); read(); }); }); function read() { if (size === 0) { buffer = new Buffer(8192); fs.read(fd, buffer, 0, 8192, -1, afterRead); } else { fs.read(fd, buffer, pos, size - pos, -1, afterRead); } } function afterRead(er, bytesRead) { if (er) { return fs.close(fd, function(er2) { return callback(er); }); } if (bytesRead === 0) { return close(); } pos += bytesRead; if (size !== 0) { if (pos === size) close(); else read(); } else { // unknown size, just read until we don't get bytes. buffers.push(buffer.slice(0, bytesRead)); read(); } } function close() { fs.close(fd, function(er) { if (size === 0) { // collected the data into the buffers list. buffer = Buffer.concat(buffers, pos); } else if (pos < size) { buffer = buffer.slice(0, pos); } if (encoding) buffer = buffer.toString(encoding); return callback(er, buffer); }); }}; |
想认真学习的人应该仔细读读fs.readFile是如何实现的,想看fs模块的完整源码位于lib目录下的fs.js文件。下载Node.js源码(版本v0.8.18)。
为了更加直观的描述readable stream中的这些事件,请参见下面的例子
|
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
var fs = require('fs');var opt = { flags:'r', encoding:'utf8', fd:null, mode:0666, bufferSize:64 * 1024, start:0, end:99};st = fs.createReadStream('c:/boot.ini',opt);//st.pause();//暂停data事件的触发//st.resume();//恢复data事件var text = "";st.on('data',function(data){ text += data;});st.on('end',function(close){ console.log("reading 99 bytes:
" + text);});st.on('error',function(error){ console.log('An error occurred:' + error);});st.on('close',function(){ console.log('The file descriptor has been closed.');});st.on('open',function(fd){ console.log('Current fd number: ' + fd);}); |
输出:
Current fd number: 3
reading 99 bytes:
[boot loader]
timeout=3
default=multi(0)disk(0)rdisk(0)partition(1)WINDOWS
[operating systems]
The file descriptor has been closed.
F:wampwwwNode>
不想太多解析这个示例,为什么文件描述是3?因为0,1,2已经被占用啦,原因?你懂的!
Writable Stream(可写流)
可写流支持的事件有:drain,error,close,pipe。我只介绍drain和pipe,它们分别对应如下场景:
drain:当Node的写队列为空并且Buffer中没有数据时,发生这个事件
pipe:当把Writable Stream对象传递一个Readable Stream对象的pipe方法时发生
分别来看看这两个事件的例子:
1.drain
|
01
02
03
04
05
06
07
08
09
10
11
|
var fs = require('fs');var stream = fs.createWriteStream(__dirname + '/out.txt');var str = 'i=['; for (var i=0;i<10;i++){ stream.write(str + i + ']
');} stream.on('drain',function(){ console.log('start drain');}); |
当for循环完成时,我们的控制台才打印出’start drain’,这意味着写队列已经为空,即当前没有事件存在写入的情况。由于drain事件能发生在任何时候,所以为了避免触发不必要的drain事件,官方的建议是,使用events.EventEmitter once()方法,这样drain事件只可能触发一次,尔后它上面的监听器就被删除了。
2.pipe
|
1
2
3
4
5
6
7
8
|
var fs = require('fs');var writeStream = fs.createWriteStream('copyout.txt',{ flags:'w'});var readStream = fs.createReadStream('./out.txt');readStream.pipe(writeStream);writeStream.on('close',function(){ console.log('All done!');}); |
这个示例把out.txt文件中的内容复制到了copyout.txt中。
这篇文章写了我很长时间,虽然还有很多的知识点没有介绍上来,也到此为止吧。我希望在日后有机会补充一些。
Date.prototype.format = function(format){
var o = {
"M+" : this.getMonth()+1, //month
"d+" : this.getDate(), //day
"h+" : this.getHours(), //hour
"m+" : this.getMinutes(), //minute
"s+" : this.getSeconds(), //second
"q+" : Math.floor((this.getMonth()+3)/3), //quarter
"S" : this.getMilliseconds() //millisecond
}
if(/(y+)/.test(format)) format=format.replace(RegExp.$1,
(this.getFullYear()+"").substr(4 - RegExp.$1.length));
for(var k in o)if(new RegExp("("+ k +")").test(format))
format = format.replace(RegExp.$1,
RegExp.$1.length==1 ? o[k] :
("00"+ o[k]).substr((""+ o[k]).length));
return format;
}
var fs=require('fs');
fs.open(__dirname+'/error.log','a',function(err,fd){
if(err){ throw err; }
cTime = new Date();
cTime = cTime.format('yyyy-MM-dd h:m:s');
var wBuff = new Buffer('Start log: ' + cTime + "
");
buffPos = 0;
buffLen = wBuff.length;
filePos = 0;
fs.write(fd,wBuff,buffPos,buffLen,filePos,function(err,wbytes,data){
if(err){ throw err; }
console.log('wrote ' + wbytes + ' bytes');
fs.close(fd);
});
});