zoukankan      html  css  js  c++  java
  • Nodejs stream模块-翻译

    花了两天时间尝试按照自己的话翻译了一下stream模块,以下内容皆翻译于:https://nodejs.org/api/stream.html.

    目录

    1  Stream(流)

        1.1     这篇文档的组织方式

        1.2    stream的种类

              1.2.1   对象模式

              1.2.2  Buffering

        1.3    API for Stream Consumers

              1.3.1  Writable Streams

                  1.3.1.1 Class: stream.Writable

                      1.3.1.1.1........................................................................ Event: 'close'

                      1.3.1.1.2........................................................................ Event: 'drain'

                      1.3.1.1.3........................................................................ Event: 'error'

                      1.3.1.1.4....................................................................... Event: 'finish'

                      1.3.1.1.5.......................................................................... Event: 'pipe'

                      1.3.1.1.6..................................................................... Event: 'unpipe'

                      1.3.1.1.7..................................................................... writable.cork()

                      1.3.1.1.8............ writable.end([chunk][, encoding][, callback])

                      1.3.1.1.9...................... writable.setDefaultEncoding(encoding)

                      1.3.1.1.10.............................................................. writable.uncork()

                      1.3.1.1.11................................ writable.writableHighWaterMark

                      1.3.1.1.12................................................ writable.writableLength

                      1.3.1.1.13.......... writable.write(chunk[, encoding][, callback])

                      1.3.1.1.14................................................ writable.destroy([error])

            1.3.2  Readable Streams

                1.3.2.1  两种模式

                1.3.2.2  三种状态

                1.3.2.3  选择一个

                1.3.2.4 Class: stream.Readable

                    1.3.2.4.1........................................................................ Event: 'close'

                    1.3.2.4.2.......................................................................... Event: 'data'

                    1.3.2.4.3........................................................................... Event: 'end‘

                    1.3.2.4.4........................................................................ Event: 'error'

                    1.3.2.4.5................................................................. Event: 'readable'

                    1.3.2.4.6........................................................... readable.isPaused()

                    1.3.2.4.7................................................................. readable.pause()

                    1.3.2.4.8.......................... readable.pipe(destination[, options])

                    1.3.2.4.9................................ readable.readableHighWaterMark

                    1.3.2.4.10....................................................... readable.read([size])

                    1.3.2.4.11.............................................. readable.readableLength

                    1.3.2.4.12........................................................... readable.resume()

                    1.3.2.4.13................................ readable.setEncoding(encoding)

                    1.3.2.4.14.................................... readable.unpipe([destination])

                    1.3.2.4.15................................................ readable.unshift(chunk)

                    1.3.2.4.16................................................... readable.wrap(stream)

                    1.3.2.4.17............................................... readable.destroy([error])

            1.3.3  Duplex and Transform Streams

                1.3.3.1 Class: stream.Duplex

                1.3.3.2 Class: stream.Transform

                    1.3.3.2.1............................................... transform.destroy([error])

        1.4   API for Stream Implementers

            1.4.1   简化的构造器

            1.4.2   实现Writable Stream

                1.4.2.1  构造器: new stream.Writable([options])

                1.4.2.2 writable._write(chunk, encoding, callback)

                1.4.2.3 writable._writev(chunks, callback)

                1.4.2.4 writable._destroy(err, callback)

                1.4.2.5 writable._final(callback)

                1.4.2.6  写数据时出错误怎么办

                1.4.2.7 Writable Stream的一个例子

                1.4.2.8  在 Writable Stream解析buffer

            1.4.3   实现一个Readable Stream

                1.4.3.1 new stream.Readable([options])

                1.4.3.2 readable._read(size)

                1.4.3.3 readable._destroy(err, callback)

                1.4.3.4 readable.push(chunk[, encoding])

                1.4.3.5 Reading时错误怎么办

                1.4.3.6 An Example Counting Stream

           1.4.4   实现一个Duplex Stream

                1.4.4.1 new stream.Duplex(options)

                1.4.4.2 Duplex Stream的一个例子

                1.4.4.3  对象模式的Duplex Streams

           1.4.5   实现一个 Transform Stream

                1.4.5.1 new stream.Transform([options])

                1.4.5.2  事件: 'finish' and 'end'

                1.4.5.3 transform._flush(callback)

                1.4.5.4 transform._transform(chunk, encoding, callback)

                1.4.5.5 Class: stream.PassThrough

        1.5     附加说明

            1.5.1   与旧Nodejs版本的兼容

            1.5.2  readable.read(0)

            1.5.3  readable.push('')

            1.5.4  highWaterMark与readable.setEncoding()

    1   Stream(流)

    stream是一个抽象接口,旨在处理数据流。stream模块提供了基本的API,方便继承stream接口,从而构造流式对象。

    Nodejs里有很多流式对象,比如说http请求对象和process.stdout对象。

    流能写,能写或兼二者,所以的流都是EventEmitter的一个实例,换句话说,Nodejs暴露出来的流都是继承自EventEmitter。

    通过以下方式引入stream模块:

    const stream = require('stream');

    1.1   这篇文档的组织方式

    文档分为两个主要部分,另外第三部分是额外注意的地方。第一部分阐释了使用流的基本要素。第二部分则阐释了如何自定义流。

    1.2   stream的种类

    有4种基本类型的流:

    Readable – 可以从中读取数据的流(比如说  fs.createReadStream())

    Writable – 可以向其写入数据的流(比如说  fs.createWriteStream())

    Duplex – 能读能写的流(比如说 net.Socket)

    Transform – 跟Duplex一样可读写,但可以修改流的数据(比如说 zlib.createDeflate())

    1.2.1  对象模式

    所有的流都专门用来处理strings和Buffer(或者Uint8Array)对象。然而流也可以用来处理其他JavaScript类型。这种流被认为是以“对象模式”去处理数据。

    创建流对象时,可以使用objectMode选项转换成对象模式。但试图把一个已经存在的流转换成对象模式并不安全。

    1.2.2   Buffering

    Writable和Readable流在内部实现上都有一个buffer容器,用来存储数据,可以分别通过writable.writableBuffer和readable.readableBuffer访问这个buffer容器。

    存储数据的容量决定于highWaterMark选项,通过创建流时传入。对象普通的流,highWaterMark值代表总字节数,而对于对象模式流,这个参数指定总的对象数。

    当调用stream.push(chunk),chunk数据会存入Readable流,如果流的消费者没有调用stream.read(),数据会一直在内部队列,直到被消费。

    一旦内部的数据量达到了由highWaterMark指定的临界值,流将会暂时停止从底层资源读取数据,直到当前buffered的数据被消费(也就是说,stream将停止调用内部函数readble._read()方法,此方法用来填充内部buffer)

    当调用writable.write(chunk),数据被buffer在Writable stream中。当内部bbuffer小于highWaterMark值,writable.write()返回true,否则返回false。

    stream API的一个关键目标,特别是stream.pipe()方法,就是以一个合理的方式协调两边的buffer数据,因为源数据和目的数据的读取速度不一致可能会导致内存耗尽。

    Duplex和Transform都是可读可写,所以内部需要维持两个单独的buffer容器,用于读写,这就使得在维持合适和有效的数据流下,读和写可以单独进行。比如说net.Socket实例就是一个Duplex流,可读端可以消费数据,可写端可以写入数据。因为数据的写入可能比数据的读入更快或者更慢,所以单独操作显得是有必要的。

    1.3   API for Stream Consumers

    几乎所有的nodejs应用或多或少都会用到流。下面是一个使用流的例子,实现了http服务:

    const http = require('http');
    const server = http.createServer((req, res) => {
      // req is an http.IncomingMessage, which is a Readable Stream
      // res is an http.ServerResponse, which is a Writable Stream
      let body = '';
      // Get the data as utf8 strings.
      // If an encoding is not set, Buffer objects will be received.
      req.setEncoding('utf8');
      // Readable streams emit 'data' events once a listener is added
      req.on('data', (chunk) => {
        body += chunk;
      });
      // the end event indicates that the entire body has been received
      req.on('end', () => {
        try {
          const data = JSON.parse(body);
          // write back something interesting to the user:
          res.write(typeof data);
          res.end();
        } catch (er) {
          // uh oh! bad json!
          res.statusCode = 400;
          return res.end(`error: ${er.message}`);
        }
      });
    });
     
    server.listen(1337);
     
    // $ curl localhost:1337 -d "{}"
    // object
    // $ curl localhost:1337 -d ""foo""
    // string
    // $ curl localhost:1337 -d "not json"
    // error: Unexpected token o in JSON at position 1

    Writable流暴露了像write()和end()方法来写数据到流中。

    Readable流使用了EventEmitter的API,当有数据可读时,通过事件通知应用。当然,可以通过多种方式获取可读的数据。

    一般情况下,没有必要自己实现流的接口。

    1.3.1  Writable Streams

    可写流是对数据被写入的目的一层抽象.

    Nodejs里可写流的例子有:

    所有的可以流都实现了stream.Writable类里的接口。

    尽管一些特定的可写流在一些方面不 一样,但所有的可写流都遵循下面的基本使用模式:

    const myStream = getWritableStreamSomehow();
    myStream.write('some data');
    myStream.write('some more data');
    myStream.end('done writing data');

    1.3.1.1   Class: stream.Writable

    v0.9.4加入

    1.3.1.1.1 Event: 'close'

    当流和底层资源(文件描述符等)被关闭时触发。这个事件表明不再有其他事件触发,也不在计算数据。

    不是所有的可写流都触发`cloase`事件。

    1.3.1.1.2 Event: 'drain'

    当调用stream.write(chunk)返回false时,`drain`事件触发。当合适地恢复向流中写数据

    // Write the data to the supplied writable stream one million times.
    // Be attentive to back-pressure.
    function writeOneMillionTimes(writer, data, encoding, callback) {
      let i = 1000000;
      write();
      function write() {
        let ok = true;
        do {
          i--;
          if (i === 0) {
            // last time!
            writer.write(data, encoding, callback);
          } else {
            // see if we should continue, or wait
            // don't pass the callback, because we're not done yet.
            ok = writer.write(data, encoding);
          }
        } while (i > 0 && ok);
        if (i > 0) {
          // had to stop early!
          // write some more once it drains
          writer.once('drain', write);
        }
      }
    }

    1.3.1.1.3  Event: 'error'

    当在写入数据或者piping数据时发生错误,`error`事件触发,因调会传入error参数。
    注意:当`error`事件发生,流不会被关闭。

    1.3.1.1.4 Event: 'finish'

    当调用stream.end()方法,而且所有的数据都flush进底层系统时,`finish`事件触发。
    const writer = getWritableStreamSomehow();
    for (let i = 0; i < 100; i++) {
      writer.write(`hello, #${i}!
    `);
    }
    writer.end('This is the end
    ');
    writer.on('finish', () => {
      console.error('All writes are now complete.');
    });
     

    1.3.1.1.5 Event: 'pipe'

    当在一个可读流上调用stream.pipe()方法,把这个可写读作为可读流的一个目地时,`pipe`事触发。
    const writer = getWritableStreamSomehow();
    const reader = getReadableStreamSomehow();
    writer.on('pipe', (src) => {
      console.error('something is piping into the writer');
      assert.equal(src, reader);
    });
    reader.pipe(writer);
     

    1.3.1.1.6 Event: 'unpipe'

    当在一个可读流上调用unpipe()方法时触发,把这个可写流从它的目地流中移除。
    const writer = getWritableStreamSomehow();
    const reader = getReadableStreamSomehow();
    writer.on('unpipe', (src) => {
      console.error('Something has stopped piping into the writer.');
      assert.equal(src, reader);
    });
    reader.pipe(writer);
    reader.unpipe(writer);

    1.3.1.1.7 writable.cork()

    v0.11.2加入
    writable.cork()方法强制使得被写入的数据buffer进内存(不是一直都是在内存里么?),被buffer的数据将会flush,当调用stream.uncork()或者stream.end()方法时。
    cork最主要的目的是避免这样一个场景:当向流中写入许多小块数据时,导致不在内部buffer里备份,这样会对性能有一不利的影响。如此,writable._writev()方法能以更优的方法处理buffer写入。

    1.3.1.1.8 writable.end([chunk][, encoding][, callback])

    • chunk <string> | <Buffer> | <Uint8Array> | <any> 写入stream的数据。对于普通的流,chunk必须是string、Buffer或者Unit8Array对象。对于对象模式流,chunk是JavaScript值,不能是null。
    • encoding <string> 如果设置了编码,那chunk就是string。
    • callback <function> 当流结束时的回调。

    调用end()方法表明不再有数据写入到Writable流。如果传入了chunk参数,则是最后一次向流写入数据,之后会关闭流。如果设置了callback,则回调函数将作为`finish`事件的监听器。

    在调用了stream.end()方法后再调用stream.write()方法,将会抛出异常。

    1.3.1.1.9  writable.setDefaultEncoding(encoding)

    • encoding <string> 新的默认编码方式
    • Returns: this

    设置默认的编码方式

    1.3.1.1.10   writable.uncork()

    uncork()方法flush所有buffer的数据,自cork()方法调用尹始。

    当使用cork()和uncork()方法管理buffer数据时,推荐在process.nextTick()里调用uncork(),在下个事件循环中调用。

    stream.cork();
    stream.write('some ');
    stream.write('data ');
    process.nextTick(() => stream.uncork());

    如果cork()方法被调用多次,那么uncork()方法也必须调用相应多的次数,不然数据不会被flush。

    stream.cork();
    stream.write('some ');
    stream.cork();
    stream.write('data ');
    process.nextTick(() => {
      stream.uncork();
      // The data will not be flushed until uncork() is called a second time.
      stream.uncork();
    });

    1.3.1.1.11      writable.writableHighWaterMark

    v9.3.0加入

    返回构造流时传入的highWaterMark值。

    1.3.1.1.12   writable.writableLength

    v9.4.0加入

    队列里准备写的字节数(应该是这样理解的:数据先在writable流里列队里,再写入到底层资源)

    1.3.1.1.13   writable.write(chunk[, encoding][, callback])

    • chunk <string> | <Buffer> | <Uint8Array> | <any> 写入的数据,对于对象模式,则是JavaScript对象
    • encoding  <string> 编码方式
    • callback <Function> 当这块数据被flush时,回调。
    • Returns: <boolean> 如果流希望在继续写入额外数据时前,`drain`事件触发,则返回false,否则返回true

    write()方法向流写入一些数据,并且一旦写入的数据被处理,回调会调用。如果有错误发生,callback可能,也可能不,会被调用,这进第一个参数是error。 为了更可靠的检测错误的发生,最好的办法是添加`error`事件。

    如果内部buffer数据小于highWaterMark,write()方法返回true。否则返回false,这时不应该继续写入数据到流中,直到`drain`事件触发。这说明`drain`事件触发后,就是内部buffer被可用的时候,因为drain是排,排干的意思。

    当一个流没有在draining(排),调用write()方法会buffer chunk,并且返回false。一旦所有的buffer chunk被drained(排), `drain`事件会触发。推荐一旦write()方法返回false,不再写入chunks,直到`drain`事件触发。当向一个不允许draining的流上调用write()方法时,Nodejs会buffer所有被写入的chunks,直到最大内存使异常出现,此时会无条件中止。基于在中止前,大量的内存使用会导致性能低下的垃圾回收,和高RSS(最大内存常驻区,没有释放给操作系统)。因为TCP sockets可能永远不会drain,因为远端不从流中read数据,不停的向socket写入可能导致远程可利用的漏洞。

    对于Transform,写入一个不会draining数据的流特别是个问题,因为Transform默认会暂停,直到被导向piped或者`data`或`readable`事件处理器被添加。

    如果被写入的数据能被生成或者需要按需获取,推荐封装逻辑进Readable,并且使用stream.pipe()方法。但如果更愿意使用write(),鉴于背压和避免内存问题,应该使用`draing`事件:

    function write(data, cb) {
      if (!stream.write(data)) {
        stream.once('drain', cb);
      } else {
        process.nextTick(cb);
      }
    }
     
    // Wait for cb to be called before doing any other write.
    write('hello', () => {
      console.log('write completed, do more writes now');
    });

    处于对象模式的流会忽视encoding参数。

    1.3.1.1.14      writable.destroy([error])

    v8.0.0加入

    • Returns: this

    摧毁流,并传递错误对象error。destory()调用之后,流会被结束。实现者不应该覆盖这个主应运,而是应该实现writable._destory.

    1.3.2  Readable Streams

    可读流是对可消费的数据源的一层抽象。

    可读流的例子包括:

    所有的可读流都实现了stream.Readable类的接口。

    1.3.2.1      两种模式

    可读流实际上以两种方式运作:流动(flowing)和暂停(pause)。

    当处于流动模式,数据从底层系统自动地读取,并进可能快的通过EventEmitter事件提供给应用。

    当处于暂停模式,必须显示调用stream.read()方法来从流中读取数据。

    所有以暂停模式启动的流都能通过下面几种方式转换成流动模式:

    • 添加`data`事件处理器
    • 调用stream.resume()方法
    • 调用stream.pipe()方法向Writable发送数据。

    可读流可以通过下面几中方式转回暂停模式:

    • 如果没有指定pipe目的,调用stream.pause方法
    • 如果有pipe目的,清除任何的`data`事件处理,并且清除所有的pipe目的,通过stream.unpipe()方法

    一个重要的概念需要记住,就是Readable不会产生数据,直到提供了一种消费或者忽略数据的机制。如果消费机制不能使用,或者被取缔的话,Readable将试图停止生成数据。(?????)

    注意:因为历史兼容性原因,移除`data`事件处理器不会自动暂停流,而且,如果有pipe目的,一旦这些目的流排干,并且请求更多的数据,调用stream.pause()方法不用保证流保持暂停模式。(???)

    注意:如果一个Readable被转换为流动模式,并且没有可用的消费者处理数据,数据将会丢失。这种情况会生在readable.resume()方法调用后,没有为`data`事件添加处理器,或者`data`事件被移除。

    1.3.2.2      三种状态

    可读流的两种运作方式是对更复杂的内部状态管理的一种简化,这种复杂的内部状态发生在可读流的内部实现中。

    特别地,在任何时候,每个可读流都处于下面三种状态的一种:

    • readable.readableFlowing = null
    • readable.readableFlowing = false
    • readable.readableFlowing = true

    当readable.readableFlowing = null时,没有一种为消费流数据的机制提供,所以流不会产生数据。处于这种状态时,为流添加`data`事件处理器,调用readable.pip()方法,或者调用readable.resume()方法将会把readable.readableFlowing转换为true,导致可读流开始主动触发事件,因为数据产生了。

    调用readable.pause(),readable.unpipe()或者接到背压将导致readable.readableFlowing被设置为false,此时会暂时中止流动事件,但数据的产生不会中止,处于这个状态时,添加`data`事件处理器不会导致readable.readableFlowing为true。

    const { PassThrough, Writable } = require('stream');
    const pass = new PassThrough();
    const writable = new Writable();
     
    pass.pipe(writable);
    pass.unpipe(writable);
    // readableFlowing is now false
     
    pass.on('data', (chunk) => { console.log(chunk.toString()); });
    pass.write('ok'); // will not emit 'data'
    pass.resume(); // must be called to make 'data' being emitted

    在readable.readableFlowing是false时,数据可能会在内部buffer里堆积。

    1.3.2.3      选择一个

    在不同的nodejs版本中不断演化的可读流API提供了多种消费流数据的方式。大体上,开发者应该选择一种,并且永远不要对单一的流使用多种方式消费数据。

    对于大多数使用者,推荐使用stream.pipe()方法,因为它提供了消费流数据的最简单的方式。开发者如何需要更精细粒度的控制数据的生产和传输,可以使用EventEmitter和readable.pause()/readable.resume()。

    1.3.2.4      Class: stream.Readable

    v0.9.4加入

    1.3.2.4.1  Event: 'close'

    v0.9.4加入

    当流被关闭,并且底层资源(文件描述符)被关闭时,`close`事触发。这个事件表明不再有更多的事件将触发,也没有更多的计算出现。

    不是所有的可读流都会触发`close`事件。

    1.3.2.4.2  Event: 'data'

    v0.9.4加入

    chunk <Buffer> | <string> | <any> 数据chunk。对于不是对象模式的流,chunk是string、Buffer。对于对象模式的流,chunk是任何JavaScript值,不能是null。

    当流正在放弃数据chunk的所有权,给一个消费者时,`data`事件触发。这可能发生在流被转换到流动模式,通过readable.pipe(), readable.resume()或者添加`data`事件处理器。不管任何时候readable.read()方法被调用,并且数据chunk能被返回时,`data`事件将会触发。

    为流添加`data`事件处理器会转换成流动模式,数据一旦可用,将会被传递。

    处理器回调函数会传入数据chunk,如果使用readable.setEncoding()

    指定了默认的编码方式,chunk数据将作为string传递,否则chunk将以Buffer对象传递。

    const readable = getReadableStreamSomehow();
    readable.on('data', (chunk) => {
      console.log(`Received ${chunk.length} bytes of data.`);
    });

    1.3.2.4.3  Event: 'end‘

    v0.9.4

    当流里没有更多的数据消费时,`end`事件触发。

    注意:`end`事件不会触发,除非数据被完全消费完。如果把流转换为流动模式,或者不断的使用stream.read()方法直到数据被完全消费,将会是一件技艺高超的事。

    const readable = getReadableStreamSomehow();
    readable.on('data', (chunk) => {
      console.log(`Received ${chunk.length} bytes of data.`);
    });
    readable.on('end', () => {
      console.log('There will be no more data.');
    });

    1.3.2.4.4  Event: 'error'

         v0.9.4加入

    `error`事件可能随便触发。典型地,error事件可能在这种情况下发生:由于底层内部错误,底层流不能产生数据,或者流试图push非法的chunk数据到内部伯buffer。

    监听器将传入Error对象。

    1.3.2.4.5  Event: 'readable'

    v0.9.4加入

    当有可用的数据准备从流中读取时,`readable`事件触发。在某些情况下,为`readable`事件添加监听器将导致一些数据即将被读入到内部buffer中。

    const readable = getReadableStreamSomehow();
    readable.on('readable', () => {
      // there is some data to read now
    });

    `readable`事件同样会在流数据读完,在`end`事件前触发。

    实际上,`readable`事件表明这个流的一个新信息:要么新数据可用,要么流到尾了。对于 前者,stream.read()将返回可用的数据,在后者,stream.read()将返回null。在下面的例子中,foo.txt是个空文件:

    const fs = require('fs');
    const rr = fs.createReadStream('foo.txt');
    rr.on('readable', () => {
      console.log(`readable: ${rr.read()}`);
    });
    rr.on('end', () => {
      console.log('end');
    });

    执行脚本,输出是这样:

    $ node test.js
    readable: null
    end

    注意:大体上,比起`readable`事件,readable.pipe()和`data`事件机制更容易理解。无论如何,`readable`可能导致吞吐量不断增加。(???点解)

    1.3.2.4.6  readable.isPaused()

    v0.11.14加入

    返回当前Readable的操作状态。这个主法方要是由readable.pipe()方法之下的机制使用。在大多数情况下,没直接使用这个方法。

    const readable = new stream.Readable();
     
    readable.isPaused(); // === false
    readable.pause();
    readable.isPaused(); // === true
    readable.resume();
    readable.isPaused(); // === false

    1.3.2.4.7  readable.pause()

    v0.9.4加入

    • Returns: this

    readable.pause方法将导致处于流动模式的流停止触发`data`事件,断开流动模式,任何可用的数据将保留在内部buffer中。

    const readable = getReadableStreamSomehow();
    readable.on('data', (chunk) => {
      console.log(`Received ${chunk.length} bytes of data.`);
      readable.pause();
      console.log('There will be no additional data for 1 second.');
      setTimeout(() => {
        console.log('Now data will start flowing again.');
        readable.resume();
      }, 1000);
    });

    1.3.2.4.8  readable.pipe(destination[, options])

    v0.9.4加入

    end <boolean> 当reader结束时,writer也结束,默认是true。

    readable.pipe()方法附加一个Writable流到readable,导致自动转换为流动模式,并且push所有的数据到附加的Writable流。数据流会自动管理,以便目的Writable流不会超过更快的Readable流。

    下面的例子从readable流中导向所有的数据到一个文件,名为file.txt:

    const readable = getReadableStreamSomehow();
    const writable = fs.createWriteStream('file.txt');
    // All the data from readable goes into 'file.txt'
    readable.pipe(writable);

    为单独一个Readable流添加多个Writable流是可能的。

    readable.pip()方法返回指向目的流的引用,这使得建立一系列链式的流成为可能:

    const r = fs.createReadStream('file.txt');
    const z = zlib.createGzip();
    const w = fs.createWriteStream('file.txt.gz');
    r.pipe(z).pipe(w);

    默认情况下,当源Readable流触发`end`事件时,目的Writable流上的end()方法会被调用,以致目的Writable不再可写。可能通过传递end参数取消这个默认行为,如此,目的Writable流会保持打开:

    reader.pipe(writer, { end: false });
    reader.on('end', () => {
      writer.end('Goodbye
    ');
    });

    一个重要的警告是,如果Readable流在动作的时候触发了一个错误,Writable流不 会被自动关闭。如果错误发生,手动关闭每个流是必要的,因为可以防止内存泄露。

    注意:不管什么样的参数,process.stderr和process.stdout这两个Writable流永远不会被关闭,直到Nodejs进程退出。

    1.3.2.4.9  readable.readableHighWaterMark

    v9.3.0加入

    返回构造Readable时传入的highWaterMark值。

    1.3.2.4.10      readable.read([size])

    v0.9.4加入

    read()方法从内部buffer里pull一些数据,并返回。如果没有可用的读数据,将返回null。默认下,返回的数据是Buffer对象,除非有用readable.setEncoding()方法设置编码,或者当前流是对象模式。

    可选的size参数指定了要读取的字节数。如果没有足够的size字节数读取,将会返回null,除非流已经结束了,在这种情况下,所有在内部buffer的数据将会被返回。

    如果没有指定size参数,所有在内部buffer的数据将会被返回。

    readable.read()方法应该只在暂停模式下调。在流动模式,readable.read()会自动被调用,直到内部buffer完全drained。

    const readable = getReadableStreamSomehow();
    readable.on('readable', () => {
      let chunk;
      while (null !== (chunk = readable.read())) {
        console.log(`Received ${chunk.length} bytes of data.`);
      }
    });

    大体上,推荐开发者避免使用`readable`事件和readable.read()方法,而是使用readable.pipe()或者`data`事件。

    一个对象模式的Readable流不管调用readable.read(size)方法时传入什么size的值是什么,都只会返回单一的一项。

    注意:如果readable.read()方法返回一个数据chunk,`data`事件也同样会被触发。

    注意:在`end`事件发生后,调用stream.read([size])将会返回null,不会有运行时错误抛出。

    1.3.2.4.11      readable.readableLength

    v9.4.0加入

    返回在队列里准备读的字节数。

    1.3.2.4.12      readable.resume()

    v0.9.4加入

    • Returns: this

    readable.resume()方法会明确地导致一个已经暂停的Readable流开始恢复触发`data`事件,把流转换为流动模式。

    resume()方法能被用于从流中完全消费数据,而在没有任何处理任何数据的情况下:

    getReadableStreamSomehow()
      .resume()
      .on('end', () => {
        console.log('Reached the end, but did not read anything.');
      });

    1.3.2.4.13      readable.setEncoding(encoding)

    v0,9.4加入

    • encoding <string> 编码类型
    • Returns: this

    readable.setEncoding()方法设置从Readable流读取数据时的字符编码。

    默认情况下,没有指定编码的话,流数据将返回Buffer对象。设置编码后将返回指定编码的string字符串。比如说readable.setEncoding(‘utf8’)将导致输出的数据被编码为utf-8。readable.setEncoding(‘hex’)将导致数据以16进制的字符串形式输出。

    Readable流会适当的处理多字节字符,如果只是简单的从流里pull数据

    作为Buffer对象,会被不恰当的解码。

    const readable = getReadableStreamSomehow();
    readable.setEncoding('utf8');
    readable.on('data', (chunk) => {
      assert.equal(typeof chunk, 'string');
      console.log('got %d characters of string data', chunk.length);
    });

    1.3.2.4.14      readable.unpipe([destination])

    v0.9.4加入

    readable.unpipe()方法拆卸之前使用stream.pipe()方法附加的Writable流。

    没有指定目的流,所有的pipes会被拆卸。

    如果指定了目的流,但没有pipe建立,那这个方法不做什么事。

    const readable = getReadableStreamSomehow();
    const writable = fs.createWriteStream('file.txt');
    // All the data from readable goes into 'file.txt',
    // but only for the first second
    readable.pipe(writable);
    setTimeout(() => {
      console.log('Stop writing to file.txt');
      readable.unpipe(writable);
      console.log('Manually close the file stream');
      writable.end();
    }, 1000);

    1.3.2.4.15      readable.unshift(chunk)

    • chunk <Buffer> | <Uint8Array> | <string> | <any>  将chunk数据添加到读队列首上。对于不是对象模式的流,chunk必须是string,Buffer或者Uint8Array。对于对象模式流,chunk是JavaScript值。

    readable.unshift()方法push chunk数据到内部的buffer。在这种场景下会很有用:流被消费后,需要反消费一些数据,以便数据能被其他第三方获取。

    注意:在`end`事件触发后,stream.unshift(chunk)方法不能再调用,否则会抛出一个运行时错误。

    使用stream.unshift()方法应该考虑使用Transform代替。

    // Pull off a header delimited by 
    
    
    // use unshift() if we get too much
    // Call the callback with (error, header, stream)
    const { StringDecoder } = require('string_decoder');
    function parseHeader(stream, callback) {
      stream.on('error', callback);
      stream.on('readable', onReadable);
      const decoder = new StringDecoder('utf8');
      let header = '';
      function onReadable() {
        let chunk;
        while (null !== (chunk = stream.read())) {
          const str = decoder.write(chunk);
          if (str.match(/
    
    /)) {
            // found the header boundary
            const split = str.split(/
    
    /);
            header += split.shift();
            const remaining = split.join('
    
    ');
            const buf = Buffer.from(remaining, 'utf8');
            stream.removeListener('error', callback);
            // remove the readable listener before unshifting
            stream.removeListener('readable', onReadable);
            if (buf.length)
              stream.unshift(buf);
            // now the body of the message can be read from the stream.
            callback(null, header, stream);
          } else {
            // still reading the header.
            header += str;
          }
        }
      }
    }

    注意:不像stream.push(chunk),通过重置内部读状态,unshift不会结束读过程。如果unshift()在读数据期间被调用,这可能导致意想不倒的结果。调用unshift()后紧接着调用stream.push(‘’)会合适地重置读状态,但是当在进行一个读过程时,最好还是避免使用unshift()方法。

    1.3.2.4.16      readable.wrap(stream)

    v0.9.4加入

    • stream <Stream> 一个老式的可读流

    早于Nodjes v0.10的版本有一些流没有完全实现如今stream模块的API。

    当使用旧的Nodejs库触发`data`事件,并且只有stream.pause()方法时,readable.wrap()方法可以用来创建一个Readable流,将旧的流作为数据源。

    很少使用readable.wrap()方法,这个方法提供了一种简便的与旧Nodejs版本交互的方式。

    const { OldReader } = require('./old-api-module.js');
    const { Readable } = require('stream');
    const oreader = new OldReader();
    const myReader = new Readable().wrap(oreader);
     
    myReader.on('readable', () => {
      myReader.read(); // etc.
    });

    1.3.2.4.17   readable.destroy([error])

    v8.0.0加入

    摧毁流,并且触发`error`事件。之后,可读流会释放内部资源。实现者不应该覆盖这个方法,而是应该实现readable._destory。

    1.3.3 Duplex and Transform Streams

    1.3.3.1  Class: stream.Duplex

    Duplex 流实现了Readable和Writable接口。

    Duplex流包括:

    1.3.3.2      Class: stream.Transform

    v0.9.4加入

    Transform流也是Duplex流,只是它的输出与输入存在关联。像所有的Duplex流一样,Transform流实现了Readable和Writable接口。

    Transform流包括:

    1.3.3.2.1  transform.destroy([error])

    v8.0.0加入

    摧毁流,并且触发`error`事件。之后,transform流会释放内部资源。实现者不应该覆盖这个方法,而应该实现readable._destory。默认的_destory实现会触发`close`事件。

    1.4    API for Stream Implementers

    stream模式的API的设计使得使用Javscript原型继承流变得简单。

    首先,开发者应该声明一个新的JavaScript类,继承自四种基本流类(Writable, Readable, Duplex, Transform),并且确保调用父类的构造函数:

    const { Writable } = require('stream');
     
    class MyWritable extends Writable {
      constructor(options) {
        super(options);
        // ...
      }
    }

    新的流类必须实现一个或者多个特定的方法,实现什么方法取决于创建的流,像下面的图表所示:

    注意:实现代码最好不要调用公共的方法。以免在消费流的时候导致不利的副作用。

    1.4.1  简化的构造器

    v1.2.0加入

    在一些简单的情况下,不必通过继承来构造一个流。在创建流对象的时候传递合适的方法作为参数了也是可行的:

    const { Writable } = require('stream');
     
    const myWritable = new Writable({
      write(chunk, encoding, callback) {
        // ...
      }
    });

    1.4.2  实现Writable Stream

    stream.Writable类旨在实现一个Writable流。

    自定义一个Writable stream必须调用 new stream.Writable([options])构造器,并且实现writable._write()方法。writable._writev()方法可选择实现。

    1.4.2.1  构造器: new stream.Writable([options])

    options <Object>

    • highWaterMark <number> 默认16kb,对于对象模式是16.
    • decodeStrings <boolean> 在传递到_write()方法前,是否把字符串解码进Buffer。默认是true.
    • objectMode <boolean> 决定stream.write(anyObj)方法是否有效。如果设置了,则应该写入JavaScript值,而不是string,Buffer或Unit8Array。默认是false.
    • write <Function> stream._write()方法的实现.
    • writev <Function>  stream._writev()方法的实现.
    • destroy <Function>  stream._destory()方法的实现.
    • final <Function>  stream._final()方法的实现.

    比如:

    const { Writable } = require('stream');
     
    class MyWritable extends Writable {
      constructor(options) {
        // Calls the stream.Writable() constructor
        super(options);
        // ...
      }
    }

    或者使用es6之前的构造器:

    const { Writable } = require('stream');
    const util = require('util');
     
    function MyWritable(options) {
      if (!(this instanceof MyWritable))
        return new MyWritable(options);
      Writable.call(this, options);
    }
    util.inherits(MyWritable, Writable);

    或者使用简化的构造器:

    const { Writable } = require('stream');
     
    const myWritable = new Writable({
      write(chunk, encoding, callback) {
        // ...
      },
      writev(chunks, callback) {
        // ...
      }
    });

    1.4.2.2      writable._write(chunk, encoding, callback)

    • chunk <Buffer> | <string> | <any> 要写入的chunk数据。如果decodeStrings设置为true,chunk是Buffer对象;如果是false,chunk不是Buffer。如果是对象模式,也不是Buffer。
    • encoding <string> 如果chunk是string,那么会使用encoding编码chunk。如果chunk是个buffer或者对象模式流,encoding值会被忽略。
    • callback <Function> 当提供的chunk被完全处理时,回调这个函数。

    所有实现Writable流的类都必须提供writable._write()方法,向底层资源发送数据。

    注意:Transform流有自己的writable._write()实现。

    注意:这个方法一定不能由开发者直接调用。应该由子类实现,再由内部Writable类自行调用。

    callback方法用来获取此次写入是否成功或者失败。传递的第一个参数是一个error对象,如果成功,error是null,如果失败,error是一个对象。

    在writable._write()方法和callback方法调用间隙,调用writable.write()将导致写入的数据被buffer。一旦callback方法完成,`drain`事件将触发。如果一个流能够同时处理多个chunk数据,那么应该实现writable._writev()方法。

    如果在构造函数中设置了decodeStrings,chunk是一个string而不是Buffer,encoding选项将表明string的编码。这是为了支持特定的字符串编码。如果decodeStrings被显示地设置为false,encodeing参数会被忽略,并且chunk将保持不变,进而传递给.write()。

    writable._write()方法是下划线开头的,因为这是一个内部方法,不应该在用户代码里直接调用。

    1.4.2.3      writable._writev(chunks, callback)

    • chunks <Array> 要写入的chunks,每个chunk都有这样的格式:{ chunk: …, encoding: …}.
    • callback <Function> 当处理完传入的chunks,回调.

    注意:不能直接调用。应该由子类实现,再由Writable内部调用。

    writable._writev()方法可能是writable._write()方法的另一种实现。旨在处理多个chunk数据。如果实现了,将随着所有buffer在写队列里的数据,调用这个方法。

    writable._writev()方法是下划线开头的,因为这是一个内部方法,不应该在用户代码里直接调用。

    1.4.2.4      writable._destroy(err, callback)

    v8.0.0加入

    • err <Error> 可能的错误.
    • callback <Function> 回调函数,接收一个可选的error参数.

    _destory()方法由writable.destory()方法调用。应该由子类覆盖,但千万不能直接调用。

    1.4.2.5      writable._final(callback)

    v8.0.0加入

    • callback <Function> 当结束写入剩余的数据,调用这个方法,传入可选的error参数。

    _final()方法不能直接调用。应该由子类覆盖,再由内部自行调用。

    在流结束前,可选的函数参数将会被调用,callback调用后,`finish`事件才会触发。这对于在流结束前,关闭资源和写buffered的数据很有用。

    1.4.2.6       写数据时出错误怎么办

    在writable._write()和writable._writev()方法出现错误时,推荐通过回调报告错误。如此,将导致`error`事件触发。而在writable._write()方法里抛出一个异常将导致意想不到的秘不一致的行为。使用回调确保处理错误时一致。

    const { Writable } = require('stream');
     
    const myWritable = new Writable({
      write(chunk, encoding, callback) {
        if (chunk.toString().indexOf('a') >= 0) {
          callback(new Error('chunk is invalid'));
        } else {
          callback();
        }
      }
    });

    1.4.2.7       Writable Stream的一个例子

    下面简单的自定义了一个Writable流。尽管这个特定的Writable流没有什么实际用处,但却描绘了自定义一个Writable流所需要的每个方面:

    const { Writable } = require('stream');
     
    class MyWritable extends Writable {
      constructor(options) {
        super(options);
        // ...
      }
     
      _write(chunk, encoding, callback) {
        if (chunk.toString().indexOf('a') >= 0) {
          callback(new Error('chunk is invalid'));
        } else {
          callback();
        }
      }
    }

    1.4.2.8       在 Writable Stream解析buffer

    解码buffer是一件很普遍的事,比如说,当使用输入是string的Transform流时。当处理多字节字符串编码时(像utf-8),这是有意义的。下面的例子展示了如何解码多字节字符,使用StringDecoder和Writable。

    const { Writable } = require('stream');
    const { StringDecoder } = require('string_decoder');
     
    class StringWritable extends Writable {
      constructor(options) {
        super(options);
        const state = this._writableState;
        this._decoder = new StringDecoder(state.defaultEncoding);
        this.data = '';
      }
      _write(chunk, encoding, callback) {
        if (encoding === 'buffer') {
          chunk = this._decoder.write(chunk);
        }
        this.data += chunk;
        callback();
      }
      _final(callback) {
        this.data += this._decoder.end();
        callback();
      }
    }
     
    const euro = [[0xE2, 0x82], [0xAC]].map(Buffer.from);
    const w = new StringWritable();
     
    w.write('currency: ');
    w.write(euro[0]);
    w.end(euro[1]);
     
    console.log(w.data); // currency: €

    1.4.3  实现一个Readable Stream

    stream.Readable类用来实现Readable stream。

    自定义的Readable stream必须调用new stream.Readable([options])构造函数,并且实现readable._read()方法。

    1.4.3.1      new stream.Readable([options])

    options <Object>

    • highWaterMark <number> 在停止从底层资源读取前,存储在内部buffer里的最大字节数。默认16kb,对象模式是16。
    • encoding <string> 如果指定了,buffer将被解码成特定的编码格式,默认null。
    • objectMode <boolean> 是否是对象模式,意思着stream.read(n)将返回单一的一个值,而不是大小为n的buffer对象。默认false。
    • read <Function> stream._read()方法的实现.
    • destroy <Function> stream._destory()方法的实现.

    例子:

    const { Readable } = require('stream');
     
    class MyReadable extends Readable {
      constructor(options) {
        // Calls the stream.Readable(options) constructor
        super(options);
        // ...
      }
    }

    使用es6之前的代码风格:

    const { Readable } = require('stream');
    const util = require('util');
     
    function MyReadable(options) {
      if (!(this instanceof MyReadable))
        return new MyReadable(options);
      Readable.call(this, options);
    }
    util.inherits(MyReadable, Readable);

    或者使用简化的构造器:

    const { Readable } = require('stream');
     
    const myReadable = new Readable({
      read(size) {
        // ...
      }
    });

    1.4.3.2      readable._read(size)

    • size <number> 要异步读取的字节数。

    注意:这个方法千万不能由应用代码直接调用。应该由子类覆盖,再由内部的Readable类调用。

    所有实现Redable流的类都必须提供readable._read()方法的实现,来向底层资源获取数据。

    当readable._read()被调用,如果底层的数据可用时,开发者应该使用this.push(dataChunk)方法, 把数据push进read queue里面。_read()方法应该继续从底层获取可用数据,并push数据,直到readable.push()返回false。一旦在停止之后,_read()方法再次被调用,就应该push额外的数据到内部read queue。

    注意:一旦readable._read()方法被调用了,将不会再调用,直到readable.push方法被调用。

    size参数只是个参考。在某些实现中,read是一个单独的操作,size参数用来决定读取多少数据。而一些实现中可能会忽略这个参数,不管是否可用,都只是提供数据,没有必要等到所有的字节数都可用之后,再调用stream.push(chunk)。

    _read()下划线开头,表示这是一个内部方法,不应该直接由用户调用。

    1.4.3.3      readable._destroy(err, callback)

    v8.0.0加入

    • err <Error> 一个可能的错误.
    • callback <Function> 回调,接收一个可选的error参数.

    _destory()由readable.destory()调用。可由子类覆盖,但千万不能直接调用。

    1.4.3.4      readable.push(chunk[, encoding])

    • chunk <Buffer> | <Uint8Array> | <string> | <null> | <any> 要push进read queue的chunk数据。非对象模式的流,chunk是string,Buffer,或Unit8Array。对象模式,chunk可能是任意JavaScript值。
    • encoding <string> chunk的编码,必须是一个合法的Buffer编码,比如’utf8’或者’ascii’。
    • Returns: <boolean> 如果可以继续push,返回true;否则返回false.

    当chunk是一个Buffer,Uint8Array或者string时,chunk数据将会被添加到内部队列,等待被消费。传递chunk为null值表明已经到流的结束了,不再有数据写入。

    当Readable操作在暂停模式,由readable.push()进的数据可以通过readable.read()方法读出来,在`redable`事件触发后。

    当Redable操作在流动模式,由readable.push()进的数据将由`data`事件传递。

    readable.push()主法被设计得尽可能的灵活。比如说,存在一个低级的源,这个源提供了暂停/恢复机制和数据回调,这个低级源能被Redable实例包裹:

    // source is an object with readStop() and readStart() methods,
    // and an `ondata` member that gets called when it has data, and
    // an `onend` member that gets called when the data is over.
     
    class SourceWrapper extends Readable {
      constructor(options) {
        super(options);
     
        this._source = getLowlevelSourceObject();
     
        // Every time there's data, push it into the internal buffer.
        this._source.ondata = (chunk) => {
          // if push() returns false, then stop reading from source
          if (!this.push(chunk))
            this._source.readStop();
        };
     
        // When the source ends, push the EOF-signaling `null` chunk
        this._source.onend = () => {
          this.push(null);
        };
      }
      // _read will be called when the stream wants to pull more data in
      // the advisory size argument is ignored in this case.
      _read(size) {
        this._source.readStart();
      }
    }

    注意:readable.push()方汉应该由Readable实例者调用,并且只能在readable._read()方法里调用。

    1.4.3.5       Reading时错误怎么办

    readable._read()方法在处理的时候发生错误时,推荐使用`error`事件,而不是抛出错误。在readable._read()方法里抛出错误可能导致意想不到和不一致的行为。使用`errro`事件确保了错误处理的一致性和可预期性。

    const { Readable } = require('stream');
     
    const myReadable = new Readable({
      read(size) {
        if (checkSomeErrorCondition()) {
          process.nextTick(() => this.emit('error', err));
          return;
        }
        // do some work
      }
    });

    1.4.3.6       An Example Counting Stream

    下面是个一个基本的Readable流的例子,这个流产生1到1000000的数字,然后结束。

    const { Readable } = require('stream');
     
    class Counter extends Readable {
      constructor(opt) {
        super(opt);
        this._max = 1000000;
        this._index = 1;
      }
     
      _read() {
        const i = this._index++;
        if (i > this._max)
          this.push(null);
        else {
          const str = '' + i;
          const buf = Buffer.from(str, 'ascii');
          this.push(buf);
        }
      }
    }

    1.4.4  实现一个Duplex Stream

    Duplex流是都实现了Readable和Writable的流,比如说TCP socket连接。

    因为JavaScript不支持多继承,stream.Duplex类实现了Duplex流。

    注意:stream.Duplex类在原型链上继承了stream.Readable,并且寄生自stream.Writable,但使用instanceof操作符对两个类都会起作用,因为覆盖了stream.Writable的Symbol.hasInstance。

    自定义的Duplex流必须调用new stream.Duplex([options])构造器,并且都要实现readable._read()和writable._write()方法。

    1.4.4.1      new stream.Duplex(options)

    options <Object> 会传递给Readable和Writable的构造器,有如下字段:

    • allowHalfOpen <boolean> 默认true,如果设置为false,当可读端结束时,可写端会自动结束.
    • readableObjectMode <boolean> 默认false,为可读端设置对象模式,如果objectMode是true,则无影响.
    • writableObjectMode <boolean> 默认false,为可写端设置对象模式,如果objectMode是true,则无影响.
    • readableHighWaterMark <number> 设置可读端的highWaterMark值,如果highWaterMark提供了,则无效.
    • writableHighWaterMark <number> 设置可写端的highWaterMark值,如果highWaterMark提供了,则无效.

    比如:

    const { Duplex } = require('stream');
     
    class MyDuplex extends Duplex {
      constructor(options) {
        super(options);
        // ...
      }
    }

    或者使用es6之前的格式:

    const { Duplex } = require('stream');
    const util = require('util');
     
    function MyDuplex(options) {
      if (!(this instanceof MyDuplex))
        return new MyDuplex(options);
      Duplex.call(this, options);
    }
    util.inherits(MyDuplex, Duplex);

    或者使用简单构造器:

    const { Duplex } = require('stream');
     
    const myDuplex = new Duplex({
      read(size) {
        // ...
      },
      write(chunk, encoding, callback) {
        // ...
      }
    });

     

    1.4.4.2       Duplex Stream的一个例子

    下面是一个简单的Duplex的例子,Duplex流包裹一个假想的底层源对象,数据会向这个底层源对象写入数据,并且也能从这个对象读数据,虽然使用的是与Nodejs流不兼容的API。下面的例子中,Duplex流通过Writable接口buffer到来的即将被写入的数据,再通过Readable接口读出。

    const { Duplex } = require('stream');
    const kSource = Symbol('source');
     
    class MyDuplex extends Duplex {
      constructor(source, options) {
        super(options);
        this[kSource] = source;
      }
     
      _write(chunk, encoding, callback) {
        // The underlying source only deals with strings
        if (Buffer.isBuffer(chunk))
          chunk = chunk.toString();
        this[kSource].writeSomeData(chunk);
        callback();
      }
     
      _read(size) {
        this[kSource].fetchSomeData(size, (data, encoding) => {
          this.push(Buffer.from(data, encoding));
        });
      }
    }
     

    Duplex流最重要的部分是Readable和Writable都是独立运作,尽管在单一的实例中,他们互相存在。

    1.4.4.3       对象模式的Duplex Streams

    对于Duplex流,对象模式能专门为Readable和Writable端设置对象模式,使用readableObjectMode和writableOjbectMode选项。

    在下面的例子中,创建了一个新的Transform流,在Writable端使用了对象模式接收JavaScript数字,随便在Readable端转化为16进制字符串。

    // All Transform streams are also Duplex Streams
    const myTransform = new Transform({
      writableObjectMode: true,
     
      transform(chunk, encoding, callback) {
        // Coerce the chunk to a number if necessary
        chunk |= 0;
     
        // Transform the chunk into something else.
        const data = chunk.toString(16);
     
        // Push the data onto the readable queue.
        callback(null, '0'.repeat(data.length % 2) + data);
      }
    });
     
    myTransform.setEncoding('ascii');
    myTransform.on('data', (chunk) => console.log(chunk));
     
    myTransform.write(1);
    // Prints: 01
    myTransform.write(10);
    // Prints: 0a
    myTransform.write(100);
    // Prints: 64
    
    

    1.4.5   实现一个 Transform Stream

    Transform流是一个Duplex流,这个流的输出能根据输入计算。像zlib流或者crypto流的压缩,解密或者加密数据。

    注意:没有要求输出的大小应该等于输入的大小,同等数据的chunk大小,或者到达的时间。比如说,当输入结束时,一个Hash流只会有一个单一的输出chunk。一个zlib流会产生输出,这个输出比可能更小,也可能更大。

    也就是说,Transform的输入输出不用对等,因为可以修改。

    stream.Transform类实现 了Transform流。

    stream.Transform类在原型上继承自stream.Duplex,并且实现了自己的writable._write()和readable._read()方法。自定义Transform必须实现transform._transform()方法,按需实现transform._flush()方法。

    注意:当使用Transform流时需要格式外小心一点,如果Readable端没有消费的话,Writable端可能会暂停。

    1.4.5.1      new stream.Transform([options])

    options <Object> 传递给Writable和Readable的构造器,同时有以下字段:

    • transform <Function> stream._transform()方法的实现.
    • flush <Function> stream._flush()方法的实现

    例子:

    const { Transform } = require('stream');
     
    class MyTransform extends Transform {
      constructor(options) {
        super(options);
        // ...
      }
    }

    es6之前的风格:

    const { Transform } = require('stream');
    const util = require('util');
     
    function MyTransform(options) {
      if (!(this instanceof MyTransform))
        return new MyTransform(options);
      Transform.call(this, options);
    }
    util.inherits(MyTransform, Transform);

    或者使用简单构造器:

    const { Transform } = require('stream');
     
    const myTransform = new Transform({
      transform(chunk, encoding, callback) {
        // ...
      }
    });

    1.4.5.2       事件: 'finish' and 'end'

    `finish`和`end`事件分别来自stream.Writable和stream.Readable。`finish`事件是stream.end()调用后触发,并且所有的chunk都由stream._transform()函数处理了。`end`事件 是所有的数据都输出后触发,即在transform._flush()调用后,回调。

    1.4.5.3      transform._flush(callback)

    • callback <Function> 回调函数,当剩余的数据都被flush后,回调该函数,传入error参数。

    注意:这个函数千万不能直接调用。应该由子类实现,在内部Readable类调用。

    在某些情况下,一个变换的操作在流结束时,可能需要触发一些额外的数据。比如,zlib压缩流会存储大量的内部状态,以便最好地压缩输出,当这个流结束时,这些额外的数据应该被flush,以便压缩数据得以完成。

    自定义Transform可以按需实现readable._flush()方法,在没有更多的被写入的数据消费时,这个方法会被调用,在`end`事件触发前。

    在transform._flush()实现内部,readable.push()方法可能被零次或多次调用,这得视情况而定。当flush操作完成,则调用回调。

    transform._flush()方法是下划线开头的,表明这是一个内部函数,不应该直接调用。

    1.4.5.4      transform._transform(chunk, encoding, callback)

    • chunk <Buffer> | <string> | <any> 需要被转换的chunk。如果decodeStrings选项设置成false或者流处于对象模式,将会是一个Buffer.
    • encoding <string> 如果chunk是string,这将是编码类型。如果chunk是buffer,这是一个特定的值’buffer’。
    • callback <Function> 回调函数,当chunk被处理后回调,传入error参数。

    注意:这个函数千万不能直接调用。应该由子类实现,再由内部Readable调用。

    所有的Transform流必须提供_transform()方法来接收输出,产生输出。transform._transform()处理写入的数据,计算得到输出,然后使用readable.push()方法传递输出到Readable端。

    transform.push()方法可能被多次调用,来从单一的输入chunk生成输出。

    从任何输入的chunk数据,可能不再产生输出。这是有可能。

    只有在当前chunk被完全消费后,callback函数才必须被调用。第一个传递到callback的是eror对象,如果没有有错误产生,则是null。如果第传递了第二个参数,将会被转发到readable.push()方法,换句话说,下面是等价的:

    transform.prototype._transform = function(data, encoding, callback) {
      this.push(data);
      callback();
    };
     
    transform.prototype._transform = function(data, encoding, callback) {
      callback(null, data);
    };

    transform._transform()方法是下划线开头的,因为这是一个内部函数,不应该直接调用。

    transform._transform()永远不会并行调用,流的内部实现是队列机制,为了接收下一个chunk,callback必须被调用,要么同步要么异步。

    1.4.5.5      Class: stream.PassThrough

    stream.PassThrough类是Transform流的一个实现,这个类没有什么实际意义,只是简单的把输入传给输出。它主要作为例子和测试出现.

    1.5     附加说明

    1.5.1   与旧Nodejs版本的兼容

    在v0.10之前的版本,Readable流接口更简单,但同时功能不够强大,也不够用。

    • 相比于等待调用stream.read()方法,`data`事件会马上触发。如果一个应该需要花费一些工作来决定如何处理数据,那么,应用需要存储读取的数据到buffer,以便数据不会丢失。
    • stream.pause()方法只是参考,不能保证。这意味着仍然有必要接收`data`事件,甚至当流处于暂停状态时。

    在v0.10.0版本,增加了Readable类。为了向后兼容兼容旧的Nodejs版本,当添加 了`data`事件或者调用stream.resume()方法后,Readable流转换为流动模式。这样的影响就是,即使不使用新的stream.read()方法和`readable`事件,也不用担心丢失数据chunk。

    尽管大部分应用都能正常运行,还是存在一边缘案例,如下所述:

    • 没有添加`data`事件监听器.
    • 从没调用过stream.resume().
    • stream没有被pipe到任何目的.

    比如说,考虑如下的代码:

    // WARNING!  BROKEN!
    net.createServer((socket) => {
     
      // we add an 'end' method, but never consume the data
      socket.on('end', () => {
        // It will never get here.
        socket.end('The message was received but was not processed.
    ');
      });
     
    }).listen(1337);

    在早于v.10版本的Nodejs,到来的消息被简单的丢弃。然后在v0.10版本及后面的版本,scoket永远保持暂停状态。这种情况下,暂时的解决的办法是调用stream.resume()方法:

    // Workaround
    net.createServer((socket) => {
     
      socket.on('end', () => {
        socket.end('The message was received but was not processed.
    ');
      });
     
      // start the flow of data, discarding it.
      socket.resume();
     
    }).listen(1337);

    除了新的Readable流可以转换到流动模式外,v0.10以前的流能使用readable.wrap()方法包裹进Readable类中。

    1.5.2 readable.read(0)

    有这样一些情况:需要对底层Readable流机制触发一次刷新,同时不消费任何数据。这时就可以调用readable.read(0),这个方法始终返回null。

    如果内部的reader buffer小于highWaterMark,并且流没有在reading,调用stream.read(0)将触发底层stream._read()调用。

    尽管大部分应用都不需要这么做,但在Nodejs内部有一些情况要这么做,尤其是在Readable流内部。

    1.5.3 readable.push('')

    不推荐使用。

    push一个零字节字符串,Buffer或者Unit8Array到一个不是对象模式的流中会有一个很有意思的副作用。因为是调用readable.push(),这个调用会结束reading进程。但因为这是一个空字符串,没有数据添加到Readable buffer里,所以没有可以消费的东西。

    1.5.4 highWaterMarkreadable.setEncoding()

    readable.setEncoding()会改变处于非对象模式流的highWaterMark的运作行为.

    典型地,当前buffer的size是比对highWaterMark值测量的,单位是字节。然而,在调用setEncoding()方法后,比较函数将会是以字符数测试。

    在使用latin1或ascii码时,没有什么问题。但建议在处理多字节字符时,考虑这种行为。

  • 相关阅读:
    RapidJSON简介及使用(转)
    唯一ID生成算法剖析(转)
    OpenCV相机标定及距离估计(单目)(转)
    Linux 格式化分区 报错Could not stat --- No such file or directory 和 partprobe 命令
    Linux下chkconfig命令详解
    Linux设置开机服务自动启动
    CentOS 6.3下Samba服务器的安装与配置
    vmware 命令行启动虚拟机
    Linux自动修改IP脚本(手动编写)
    网络高清视频监控传输:如何减少带宽消耗?
  • 原文地址:https://www.cnblogs.com/cool-fire/p/8403567.html
Copyright © 2011-2022 走看看