zoukankan      html  css  js  c++  java
  • Node.js Stream基础篇

    Node.js Stream - 基础篇

    邹斌 ·2016-07-08 11:51

    背景

    在构建较复杂的系统时,通常将其拆解为功能独立的若干部分。这些部分的接口遵循一定的规范,通过某种方式相连,以共同完成较复杂的任务。譬如,shell通过管道|连接各部分,其输入输出的规范是文本流。

    Node.js中,内置的Stream模块也实现了类似功能,各部分通过.pipe()连接。

    鉴于目前国内系统性介绍Stream的文章较少,而越来越多的开源工具都使用了Stream,本系列文章将从以下几方面来介绍相关内容:

    1. 流的基本类型,以及Stream模块的基本使用方法
    2. 流式处理与back pressure的工作原理
    3. 如何开发流式程序,包括对GulpBrowserify的剖析,以及一个实战示例。

    本文为系列文章的第一篇。

    流的四种类型

    Stream提供了以下四种类型的流:

    var Stream = require('stream')
    
    var Readable = Stream.Readable
    var Writable = Stream.Writable
    var Duplex = Stream.Duplex
    var Transform = Stream.Transform

    使用Stream可实现数据的流式处理,如:

    var fs = require('fs')
    // `fs.createReadStream`创建一个`Readable`对象以读取`bigFile`的内容,并输出到标准输出
    // 如果使用`fs.readFile`则可能由于文件过大而失败
    fs.createReadStream(bigFile).pipe(process.stdout)

    Readable

    创建可读流。

    实例:流式消耗迭代器中的数据。

    'use strict'
    const Readable = require('stream').Readable
    
    class ToReadable extends Readable {
      constructor(iterator) {
        super()
        this.iterator = iterator
      }
    
      // 子类需要实现该方法
      // 这是生产数据的逻辑
      _read() {
        const res = this.iterator.next()
        if (res.done) {
          // 数据源已枯竭,调用`push(null)`通知流
          return this.push(null)
        }
        setTimeout(() => {
          // 通过`push`方法将数据添加到流中
          this.push(res.value + '\n')
        }, 0)
      }
    }
    
    module.exports = ToReadable

    实际使用时,new ToReadable(iterator)会返回一个可读流,下游可以流式的消耗迭代器中的数据。

    const iterator = function (limit) {
      return {
        next: function () {
          if (limit--) {
            return { done: false, value: limit + Math.random() }
          }
          return { done: true }
        }
      }
    }(1e10)
    
    const readable = new ToReadable(iterator)
    
    // 监听`data`事件,一次获取一个数据
    readable.on('data', data => process.stdout.write(data))
    
    // 所有数据均已读完
    readable.on('end', () => process.stdout.write('DONE'))

    执行上述代码,将会有100亿个随机数源源不断地写进标准输出流。

    创建可读流时,需要继承Readable,并实现_read方法。

    • _read方法是从底层系统读取具体数据的逻辑,即生产数据的逻辑。
    • _read方法中,通过调用push(data)将数据放入可读流中供下游消耗。
    • _read方法中,可以同步调用push(data),也可以异步调用。
    • 当全部数据都生产出来后,必须调用push(null)来结束可读流。
    • 流一旦结束,便不能再调用push(data)添加数据。

    可以通过监听data事件的方式消耗可读流。

    • 在首次监听其data事件后,readable便会持续不断地调用_read(),通过触发data事件将数据输出。
    • 第一次data事件会在下一个tick中触发,所以,可以安全地将数据输出前的逻辑放在事件监听后(同一个tick中)。
    • 当数据全部被消耗时,会触发end事件。

    上面的例子中,process.stdout代表标准输出流,实际是一个可写流。下小节中介绍可写流的用法。

    Writable

    创建可写流。

    前面通过继承的方式去创建一类可读流,这种方法也适用于创建一类可写流,只是需要实现的是_write(data, enc, next)方法,而不是_read()方法。

    有些简单的情况下不需要创建一类流,而只是一个流对象,可以用如下方式去做:

    const Writable = require('stream').Writable
    
    const writable = Writable()
    // 实现`_write`方法
    // 这是将数据写入底层的逻辑
    writable._write = function (data, enc, next) {
      // 将流中的数据写入底层
      process.stdout.write(data.toString().toUpperCase())
      // 写入完成时,调用`next()`方法通知流传入下一个数据
      process.nextTick(next)
    }
    
    // 所有数据均已写入底层
    writable.on('finish', () => process.stdout.write('DONE'))
    
    // 将一个数据写入流中
    writable.write('a' + '\n')
    writable.write('b' + '\n')
    writable.write('c' + '\n')
    
    // 再无数据写入流时,需要调用`end`方法
    writable.end()
    • 上游通过调用writable.write(data)将数据写入可写流中。write()方法会调用_write()data写入底层。
    • _write中,当数据成功写入底层后,必须调用next(err)告诉流开始处理下一个数据。
    • next的调用既可以是同步的,也可以是异步的。
    • 上游必须调用writable.end(data)来结束可写流,data是可选的。此后,不能再调用write新增数据。
    • end方法调用后,当所有底层的写操作均完成时,会触发finish事件。

    Duplex

    创建可读可写流。

    Duplex实际上就是继承了ReadableWritable的一类流。
    所以,一个Duplex对象既可当成可读流来使用(需要实现_read方法),也可当成可写流来使用(需要实现_write方法)。

    var Duplex = require('stream').Duplex
    
    var duplex = Duplex()
    
    // 可读端底层读取逻辑
    duplex._read = function () {
      this._readNum = this._readNum || 0
      if (this._readNum > 1) {
        this.push(null)
      } else {
        this.push('' + (this._readNum++))
      }
    }
    
    // 可写端底层写逻辑
    duplex._write = function (buf, enc, next) {
      // a, b
      process.stdout.write('_write ' + buf.toString() + '\n')
      next()
    }
    
    // 0, 1
    duplex.on('data', data => console.log('ondata', data.toString()))
    
    duplex.write('a')
    duplex.write('b')
    
    duplex.end()

    上面的代码中实现了_read方法,所以可以监听data事件来消耗Duplex产生的数据。
    同时,又实现了_write方法,可作为下游去消耗数据。

    因为它既可读又可写,所以称它有两端:可写端和可读端。
    可写端的接口与Writable一致,作为下游来使用;可读端的接口与Readable一致,作为上游来使用。

    Transform

    在上面的例子中,可读流中的数据(0, 1)与可写流中的数据('a', 'b')是隔离开的,但在Transform中可写端写入的数据经变换后会自动添加到可读端。
    Tranform继承自Duplex,并已经实现了_read_write方法,同时要求用户实现一个_transform方法。

    'use strict'
    
    const Transform = require('stream').Transform
    
    class Rotate extends Transform {
      constructor(n) {
        super()
        // 将字母旋转`n`个位置
        this.offset = (n || 13) % 26
      }
    
      // 将可写端写入的数据变换后添加到可读端
      _transform(buf, enc, next) {
        var res = buf.toString().split('').map(c => {
          var code = c.charCodeAt(0)
          if (c >= 'a' && c <= 'z') {
            code += this.offset
            if (code > 'z'.charCodeAt(0)) {
              code -= 26
            }
          } else if (c >= 'A' && c <= 'Z') {
            code += this.offset
            if (code > 'Z'.charCodeAt(0)) {
              code -= 26
            }
          }
          return String.fromCharCode(code)
        }).join('')
    
        // 调用push方法将变换后的数据添加到可读端
        this.push(res)
        // 调用next方法准备处理下一个
        next()
      }
    
    }
    
    var transform = new Rotate(3)
    transform.on('data', data => process.stdout.write(data))
    transform.write('hello, ')
    transform.write('world!')
    transform.end()
    
    // khoor, zruog!

    objectMode

    前面几节的例子中,经常看到调用data.toString()。这个toString()的调用是必需的吗?
    本节介绍完如何控制流中的数据类型后,自然就有了答案。

    在shell中,用管道(|)连接上下游。上游输出的是文本流(标准输出流),下游输入的也是文本流(标准输入流)。在本文介绍的流中,默认也是如此。

    对于可读流来说,push(data)时,data只能是StringBuffer类型,而消耗时data事件输出的数据都是Buffer类型。对于可写流来说,write(data)时,data只能是StringBuffer类型,_write(data)调用时传进来的data都是Buffer类型。

    也就是说,流中的数据默认情况下都是Buffer类型。产生的数据一放入流中,便转成Buffer被消耗;写入的数据在传给底层写逻辑时,也被转成Buffer类型。

    但每个构造函数都接收一个配置对象,有一个objectMode的选项,一旦设置为true,就能出现“种瓜得瓜,种豆得豆”的效果。

    Readable未设置objectMode时:

    const Readable = require('stream').Readable
    
    const readable = Readable()
    
    readable.push('a')
    readable.push('b')
    readable.push(null)
    
    readable.on('data', data => console.log(data))

    输出:

    <Buffer 61>
    <Buffer 62>

    Readable设置objectMode后:

    const Readable = require('stream').Readable
    
    const readable = Readable({ objectMode: true })
    
    readable.push('a')
    readable.push('b')
    readable.push({})
    readable.push(null)
    
    readable.on('data', data => console.log(data))

    输出:

    a
    b
    {}

    可见,设置objectMode后,push(data)的数据被原样地输出了。此时,可以生产任意类型的数据。

    预告

    Stream系列共三篇文章:

    • 第一部分:基础篇,介绍Stream接口的基本使用。
    • 第二部分:进阶篇,重点剖析Stream底层如何支持流式数据处理,及其back pressure机制。
    • 第三部分:实战篇。介绍如何使用Stream进行程序设计。从BrowserifyGulp总结出两种设计模式,并基于Stream构建一个为Git仓库自动生成changelog的应用作为示例。

    参考文献

  • 相关阅读:
    Row Cache lock Problem
    AIX操作系统上安装Oracle数据库必不可少的几项检查工作
    如何使用MOS风格的代码背景?
    在Ubuntu 10上使用DLink DWA 130无线网卡
    PL/SQL Developer View SQL功能的一个Bug
    11g新特性SQL PLUS 错误日志
    生病了。。。
    ORA00600: [7005], [192]内部错误一例
    Linux:vmware下ubuntu更换网卡后无法识别网卡
    Linux:LFS:第一天:今天开始学习,计划7天时间
  • 原文地址:https://www.cnblogs.com/zapple/p/5759670.html
Copyright © 2011-2022 走看看