zoukankan      html  css  js  c++  java
  • kumavis/obj-multiplex

    https://github.com/kumavis/obj-multiplex

    obj-multiplex多路复用

    simple stream multiplexing for objectMode

    其实就是一个多路复用流能够使用name来区分各个子流,以达到一个parent流下其实有多个子流在运行,可以通过多个子流来读入写出数据,效率更高。而且parent流结束了,则所有子流也会被销毁

    usage

    // create multiplexer
    const mux = new ObjMultiplex()
    
    // setup substreams
    const streamA = mux.createStream('hello')
    const streamB = mux.createStream('world')
    
    // pipe over transport (and back)
    mux.pipe(transport).pipe(mux)
    
    // send values over the substreams
    streamA.write({ thisIsAn: 'object' })
    streamA.write(123)
    
    // or pipe together normally
    streamB.pipe(evilAiBrain).pipe(streamB)

     obj-multiplex/index.js

    const { Duplex } = require('readable-stream')
    const endOfStream = require('end-of-stream')//看博客mafintosh/end-of-stream
    const once = require('once')
    const noop = () => {}
    
    const IGNORE_SUBSTREAM = {}
    
    
    class ObjectMultiplex extends Duplex {
    
      constructor(_opts = {}) {
        const opts = Object.assign({}, _opts, {
          objectMode: true,//流可传各类形式数据
        })
        super(opts)//生成这个流
    
        this._substreams = {}
      }
    
      createStream (name) {//就是创建两个流,一个是这个流,另一个是parent是这个流的一个子流,并返回子流
        // validate name
        if (!name) throw new Error('ObjectMultiplex - name must not be empty')//name不能为空
        if (this._substreams[name]) throw new Error('ObjectMultiplex - Substream for name "${name}" already exists')//name不能重复
    
        // create substream
        const substream = new Substream({ parent: this, name: name })
        this._substreams[name] = substream
    
        // listen for parent stream to end
        anyStreamEnd(this, (err) => {//定义当parent流结束,则相应的所有子流也要被销毁
          substream.destroy(err)//substream被destroy,如果出错返回的错误信息即err
        })
    
        return substream
      }
    
      // ignore streams (dont display orphaned data warning)
      ignoreStream (name) {//就是将之前创建的name的子流的内容清空
        // validate name
        if (!name) throw new Error('ObjectMultiplex - name must not be empty')
        if (this._substreams[name]) throw new Error('ObjectMultiplex - Substream for name "${name}" already exists')
        // set
        this._substreams[name] = IGNORE_SUBSTREAM
      }
    
      // stream plumbing
      //下面就是parent流能够做的一系列读写操作
      _read () {}
    
      _write(chunk, encoding, callback) {//当调用 writable.write(chunk) 时,数据会被缓冲在可写流中。
        // parse message,就是当parent流write时,将根据其传入的name来决定该数据是写到哪个子流上的
        const name = chunk.name
        const data = chunk.data
        if (!name) {//name不能为空,否则不知道是哪个子流
          console.warn(`ObjectMultiplex - malformed chunk without name "${chunk}"`)
          return callback()
        }
    
        // get corresponding substream
        const substream = this._substreams[name]//然后根据name得到子流
        if (!substream) {//如果为空则warn
          console.warn(`ObjectMultiplex - orphaned data for stream "${name}"`)
          return callback()
        }
    
        // push data into substream
        if (substream !== IGNORE_SUBSTREAM) {//只有当子流不为{}时,才将data压入
          substream.push(data)//当调用 stream.push(chunk) 时,数据会被缓冲在可读流中。 如果流的消费者没有调用 stream.read(),则数据会保留在内部队列中直到被消费
        }
    
        callback()
      }//_write
    
    }//class
    
    
    class Substream extends Duplex {
    
      constructor ({ parent, name }) {
        super({
          objectMode: true,
        })
    
        this._parent = parent
        this._name = name
      }
    
      _read () {}//读入的操作即Duplex的定义
    
      _write (chunk, enc, callback) {//当子流被写入时,其实是将数据压入流parent中
        this._parent.push({
          name: this._name,
          data: chunk,
        })
        callback()//然后调用回调函数
      }
    
    }
    
    module.exports = ObjectMultiplex
    
    // util
    
    function anyStreamEnd(stream, _cb) {//就是当stream结束的时候就会调用cb回调函数
      const cb = once(_cb)
      endOfStream(stream, { readable: false }, cb)
      endOfStream(stream, { writable: false }, cb)
    }

    通过测试学习该库的使用:

    obj-multiplex/test/index.js

    const test = require('tape')
    const once = require('once')
    const { PassThrough, Transform } = require('readable-stream')//PassThrough本质也是Transform流,是最简单的Transform流,只是将数据从此处传过
    // a passthrough stream.
    // basically just the most minimal sort of Transform stream.
    // Every written chunk gets output as-is
    const endOfStream = require('end-of-stream') const pump = require('pump') const ObjMultiplex = require('../index.js') test('basic - string', (t) => { t.plan(2) const { inTransport, outTransport, inMux, outMux, inStream, outStream, } = basicTestSetup() bufferToEnd(outStream, (err, results) => { t.error(err, 'should not error') t.deepEqual(results, ['haay', 'wuurl'], 'results should match') t.end() }) // pass in messages inStream.write('haay') inStream.write('wuurl') // simulate disconnect setTimeout(() => inTransport.destroy()) }) test('basic - obj', (t) => { t.plan(2) const { inTransport, outTransport, inMux, outMux, inStream, outStream, } = basicTestSetup() bufferToEnd(outStream, (err, results) => { t.error(err, 'should not error') t.deepEqual(results, [{ message: 'haay' }, { message: 'wuurl' }], 'results should match') t.end() }) // pass in messages inStream.write({ message: 'haay' }) inStream.write({ message: 'wuurl' }) // simulate disconnect setTimeout(() => inTransport.destroy()) }) test('roundtrip', (t) => { t.plan(2) const { inTransport, outTransport, inMux, outMux, inStream, outStream, } = basicTestSetup() const doubler = new Transform({ objectMode: true, transform (chunk, end, callback) {//对流内数据进行*2计算 // console.log('doubler!', chunk) const result = chunk * 2 callback(null, result) } }) pump(//即将从outStream处得到的数据进行*2处理后再传回outStream outStream, doubler, outStream ) bufferToEnd(inStream, (err, results) => { t.error(err, 'should not error') t.deepEqual(results, [20, 24], 'results should match') t.end() }) // pass in messages inStream.write(10) inStream.write(12) // simulate disconnect setTimeout(() => outTransport.destroy(), 100) }) // util function basicTestSetup() { // setup multiplex and Transport const inMux = new ObjMultiplex()//定义了两个parent流 const outMux = new ObjMultiplex() const inTransport = new PassThrough({ objectMode: true }) const outTransport = new PassThrough({ objectMode: true }) // setup substreams const inStream = inMux.createStream('hello')//分别在两个parent流中各自定义一个name为hello的子流 const outStream = outMux.createStream('hello') pump(//形成一个pipe流 inMux, inTransport, outMux, outTransport, inMux ) return { inTransport, outTransport, inMux, outMux, inStream, outStream, } } function bufferToEnd(stream, callback) { const results = [] endOfStream(stream, (err) => callback(err, results))//定义了流结束后的回调 stream.on('data', (chunk) => results.push(chunk))//并监听data事件,用于将数据压入流 }


  • 相关阅读:
    模拟光照中的凹凸纹理原理和应用
    Visual Studio 2010 SP1正式开放下载
    同桌的你网工版
    [转载]同桌的你程序员版
    学习计划:SSIS
    基于Java的HTML解析器
    初次使用NHibernate遇到的问题
    .NET下开源CMS系统汇总
    MyEclipse、Tomcat启动项目报错
    VBA 分文件夹 分excel
  • 原文地址:https://www.cnblogs.com/wanghui-garcia/p/9885481.html
Copyright © 2011-2022 走看看