zoukankan      html  css  js  c++  java
  • nodejs stream基础知识

    分类

    nodejs 的 stream 有四种:

    • Readable:可读流
    • Writable: 可写流
    • Duplex:双工流
    • Transform:转换流

    Readable

    // _read方法是从底层系统读取具体数据的逻辑,即生产数据的逻辑。
    // 在_read方法中,通过调用push(data)将数据放入可读流中供下游消耗。
    // 在_read方法中,可以同步调用push(data),也可以异步调用。
    // 当全部数据都生产出来后,必须调用push(null)来结束可读流。
    // 流一旦结束,便不能再调用push(data)添加数据。
    // 创建一个可以读的流,通过这个流对象监听数据
    const stream = require('stream');
    const Readable = stream.Readable;
    
    class MyFileReader extends Readable {
      constructor(options) {
        super(options);
      }
    
      attachDataSource(dataSource) {
        this.dataSource = dataSource;
      }
    
      // 实现 _read 方法
      _read() {
        // 通过 DataSource 传入数据
        // 当 DataSource get 方法返回 null 会触发 Readable 的 end 事件,表示会结束
        this.push(this.dataSource.get());
      }
    }
    
    class DataSource {
      constructor(size = 10) {
        this.size = size;
        this.data = [];
    
        this.fill();
      }
    
      // 生产数据
      fill() {
        for (let index = 0; index < this.size; index++) {
          this.data.push(index.toString());
        }
      }
    
      // 获取数据
      get() {
        if (this.data.length === 0) {
          return null;
        }
    
        return this.data.pop();
      }
    }
    
    const dataSource = new DataSource();
    const myFileReader = new MyFileReader();
    
    myFileReader.attachDataSource(dataSource);
    myFileReader.on('data', (data) => console.log('data: ' + data));
    myFileReader.on('end', () => console.log('end'));
    

    在 _read 方法里,可以调用 push 方法往缓冲池里放入数据,这里写的this.push(this.dataSource.get())方法是一个同步方法,获取完数据后立马再次执行 _read 方法获取下一条数据,如果你在_read 方法里写一个异步push 的数据

     setTimeout(() => {
       this.push(this.dataSource.get());
     });
    

    那么本次 _read 方法执行后,没有数据则不会执行下一次 _read 方法,直到 state 发生了变化,具体的细节可以参看:https://blog.csdn.net/shasharoman/article/details/80251512 这篇文章

    Writable

    // 上游通过调用writable.write(data)将数据写入可写流中。write()方法会调用_write()将data写入底层。
    // 在_write中,当数据成功写入底层后,必须调用next(err)告诉流开始处理下一个数据。
    // next的调用既可以是同步的,也可以是异步的。
    // 上游必须调用writable.end(data)来结束可写流,data是可选的。此后,不能再调用write新增数据。
    // 在end方法调用后,当所有底层的写操作均完成时,会触发finish事件。
    
    const Writable = require('stream').Writable
    
    class ToWritable extends Writable{
      constructor(){
        super();
      }
    
      _write(data, enc, next) {
        // 将流中的数据写入底层
        process.stdout.write(data.toString().toUpperCase());
        // 写入完成时,调用`next()`方法通知流传入下一个数据
        process.nextTick(next);
      }
    }
    
    const writable = new ToWritable();
    
    writable.on('finish', () => process.stdout.write('DONE'));
    
    // 将一个数据写入流中
    writable.write(`a
    `);
    writable.write(`b
    `);
    writable.write(`c
    `);
    
    // 再无数据写入流时,需要调用`end`方法
    writable.end();
    

    Duplex

    // 代码中实现了_read方法,所以可以监听data事件来消耗Duplex产生的数据。
    // 同时,又实现了_write方法,可作为下游去消耗数据。
    
    const Duplex = require('stream').Duplex
    
    class ToDuplexable extends Duplex{
      constructor(){
        super();
        this.limit = 0;
      }
    
      _write(buf, enc, next) {
        this.limit ++ ;
        next();
      }
    
      _read() {
        if(this.limit--){
          this.push('data');
        }
        // this.push(this.index++)
      }
    }
    
    const toDuplexable = new ToDuplexable();
    
    toDuplexable.on('data', data => process.stdout.write(data));
    toDuplexable.on('end', () => process.stdout.write('DONE'));
    
    toDuplexable.write('');
    
    setTimeout(() => {
      toDuplexable.write('');
      toDuplexable.end();
    }, 1000);
    

    Transform

    // Duplex 可读流中的数据(0, 1)与可写流中的数据('a', 'b')是隔离开的,但在Transform中可写端写入的数据经变换后会自动添加到可读端
    //Tranform继承自Duplex,并已经实现了_read和_write方法,同时要求用户实现一个_transform方法。
    
    const Transform = require('stream').Transform
    
    class MyTransform extends Transform{
      constructor() {
        super()
      }
    
      _transform(buf, enc, next){
        let res = buf.toString('utf8');
    
        // 调用push方法将变换后的数据添加到可读端
        this.push(`${res}_transform
    `);
        // 调用next方法准备处理下一个
        next();
      }
    }
    
    var myTransform = new MyTransform();
    
    myTransform.on('data', data => process.stdout.write(data));
    
    myTransform.write('abc');
    myTransform.write('def');
    myTransform.end();
    

    参考:

  • 相关阅读:
    〖教程〗Ladon提权MS16-135参数版(WIN7-2016)
    〖教程〗RDP会话劫持 Ladon无密码登陆管理员桌面会话
    Ladon for PowerShell远程加载教程
    〖教程〗NbtScan 139端口弱口令/Netbios密码爆破
    给你一个免费加入"小密圈"的机会
    活动目录(Active Directory,AD)的主要功能
    如何通过审计安全事件日志检测密码喷洒(Password Spraying)攻击
    mouseenter 和mouseover的区别
    如何获取可视区域宽高,获取元素到在文档中的位置
    闭包
  • 原文地址:https://www.cnblogs.com/xiaoniuzai/p/7240017.html
Copyright © 2011-2022 走看看