zoukankan      html  css  js  c++  java
  • 巧妙复制一个流

    场景

    实际业务中可能出现重复消费一个可读流的情况,比如在前置过滤器解析请求体,拿到body进行相关权限及身份认证;认证通过后框架或者后置过滤器再次解析请求体传递给业务上下文。因此,重复消费同一个流的需求并不奇葩,这类似于js上下文中通过 deep clone一个对象来操作这个对象副本,防止源数据被污染。

    
    const Koa = require('koa');
    const app = new Koa();
    
    let parse = function(ctx){
        return new Promise((res)=>{
            let chunks = [],len  = 0, body = null;
            ctx.req.on('data',(chunk)=>{
                chunks.push(chunk)
                len += chunk.length
            });
            ctx.req.on('end',()=>{
                body = (Buffer.concat(chunks,len)).toString();
                res(body);
            });
        })
    }
    // 认证
    app.use(async (ctx,next) => {
        let body = JSON.parse(decodeURIComponent(await parse(ctx)));
        if(body.name != 'admin'){
            return ctx.body = 'permission denied!'
        }
        await next();
    })
    // 解析body体,传递给业务层
    app.use(async (ctx,next) => {
        let body = await parse(ctx);
        ctx.postBody = body;
        await next();
    })
    app.use(async ctx => {
      ctx.body = 'Hello World
    ';
      ctx.body += `post body: ${ctx.postBody}`;
    });
    
    app.listen(3000);
    

    上述代码片段无法正常运行,请求无法得到响应。这是因为在前置过滤器的认证逻辑中消费了请求体,在第二级过滤器中就无法再次消费请求体,因此请求会阻塞。实际业务中,认证逻辑往往是与每个公司规范相关的,是一个“二方库”;而示例中的第二季过滤器则通常作为一个三方库存在,因此为了不影响第三方包消费请求体,必须在认证的二方包中保存 ctx.req 这个可读流的数据仍然存在,这就涉及到本文的主旨了。

    实现

    复制流并不像复制一个对象一样简单与直接,流的使用是一次性的,一旦一个可读流被消费(写入一个Writeable对象中),那么这个可读流就是不可再生的,无法再使用。可是通过一些简单的技巧可以再次复原一个可读流,不过这个复原出来的流虽然内容和之前的流相同,但却不是同一个对象了,因此这两个对象的属性及原型都不同,这往往会影响后续的使用,不过办法总是有的,且看下文。

    实现一:可读流的“影分身之术”

    可读流的“影分身之术”和鸣人的差不多,不过仅限于被克隆对象的 这一特性,即保证克隆出的流有着相同的数据。但是克隆出来的流却无法拥有原对象的其他属性,但我们可通过原型链继承的方式实现属性及方法的继承。

    
    let Readable = require('stream').Readable;
    let fs = require('fs');
    let path = require('path');
    
    class NewReadable extends Readable{
        constructor(originReadable){
            super();
            this.originReadable = originReadable;
            this.start();
        }
    
        start() {
            this.originReadable.on('data',(chunck)=>{
                this.push(chunck);
            });
    
            this.originReadable.on('end',()=>{
                this.push(null);
            });
            
            this.originReadable.on('error',(e)=>{
                this.push(e);
            });
        }
    
        // 作为Readable的实现类,必须实现_read函数,否则会throw Error
        _read(){
        }
    }
    
    app.use(async (ctx,next) => {
        let cloneReq = new NewReadable(ctx.req);
        let cloneReq2 = new NewReadable(ctx.req);
        // 此时,ctx.req已被消费完(没有内容),所有的数据都完全在克隆出的两个流上
    
        // 消费cloneReq,获取认证数据
        let body = JSON.parse(decodeURIComponent(await parse({req: cloneReq})));
    
        // 将克隆出的cloneReq2重新设置原型链,继承ctx.req原有属性
        cloneReq2.__proto__ = ctx.req;
        // 此后重新给ctx.req复制,留给后续过滤器消费
        ctx.req = cloneReq2;
    
        if(body.name != 'admin'){
            return ctx.body = 'permission denied!'
        }
        await next();
    })
    

    点评: 这种影分身之术可以同时复制出多个可读流,同时需要针对原来的流重新进行赋值,并继承原有属性,这样才能不影响后续的重复消费。

    实现二:懒人实现

    stream模块有一个特殊的类,即 Transform。关于Transfrom的特性,我曾在 深入node之Transform 一文中详细介绍过,他拥有可读可写流双重特性,那么利用Transfrom可以快速简单的实现克隆。

    首先,通过 pipe 函数将可读流导向两个 Transform流(之所以是两个,是因为需要在前置过滤器消费一个流,后续的过滤器消费第二个)。

    
    let cloneReq = new Transform({
        highWaterMark: 10*1024*1024,
        transform: (chunk,encode,next)=>{
            next(null,chunk);
        }
    });
    let cloneReq2 = new Transform({
        highWaterMark: 10*1024*1024,
        transform: (chunk,encode,next)=>{
            next(null,chunk);
        }
    });
    ctx.req.pipe(cloneReq)
    ctx.req.pipe(cloneReq2)
    

    上述代码中,看似 ctx.req 流被消费(pipe)了两次,实际上 pipe 函数则可以看成 Readable和Writeable实现backpressure的一种“语法糖”实现,具体可通过 node中的Stream-Readable和Writeable解读 了解,因此得到的结果就是“ctx.req被消费了一次,可是数据却复制在cloneReq和cloneReq2这两个Transfrom对象的读缓冲区里,实现了clone”

    其实pipe针对Readable和Writeable做了限流,首先针对Readable的data事件进行侦听,并执行Writeable的write函数,当Writeable的写缓冲区大于一个临界值(highWaterMark),导致write函数返回false(此时意味着Writeable无法匹配Readable的速度,Writeable的写缓冲区已经满了),此时,pipe修改了Readable模式,执行pause方法,进入paused模式,停止读取读缓冲区。而同时Writeable开始刷新写缓冲区,刷新完毕后异步触发drain事件,在该事件处理函数中,设置Readable为flowing状态,并继续执行flow函数不停的刷新读缓冲区,这样就完成了pipe限流。需要注意的是,Readable和Writeable各自维护了一个缓冲区,在实现的上有区别:Readable的缓冲区是一个数组,存放Buffer、String和Object类型;而Writeable则是一个有向链表,依次存放需要写入的数据。

    最后,在数据复制的同时,再给其中一个对象复制额外的属性即可:

    
    // 将克隆出的cloneReq2重新设置原型链,继承ctx.req原有属性
    cloneReq2.__proto__ = ctx.req;
    // 此后重新给ctx.req复制,留给后续过滤器消费
    ctx.req = cloneReq2;
    

    至此,通过Transform实现clone已完成。完整的代码如下(最前置过滤器):

    
    // 认证
    app.use(async (ctx,next) => {
        // let cloneReq = new NewReadable(ctx.req);
        // let cloneReq2 = new NewReadable(ctx.req);
        let cloneReq = new Transform({
            highWaterMark: 10*1024*1024,
            transform: (chunk,encode,next)=>{
                next(null,chunk);
            }
        });
        let cloneReq2 = new Transform({
            highWaterMark: 10*1024*1024,
            transform: (chunk,encode,next)=>{
                next(null,chunk);
            }
        });
        ctx.req.pipe(cloneReq)
        ctx.req.pipe(cloneReq2)
        // 此时,ctx.req已被消费完(没有内容),所有的数据都完全在克隆出的两个流上
    
        // 消费cloneReq,获取认证数据
        let body = JSON.parse(decodeURIComponent(await parse({req: cloneReq})));
    
        // 将克隆出的cloneReq2重新设置原型链,继承ctx.req原有属性
        cloneReq2.__proto__ = ctx.req;
        // 此后重新给ctx.req复制,留给后续过滤器消费
        ctx.req = cloneReq2;
    
        if(body.name != 'admin'){
            return ctx.body = 'permission denied!'
        }
        await next();
    })
    

    说明

    1. ctx.req执行两次pipe到对应cloneReq和cloneReq2,然后立即消费cloneReq对象,这样合理吗?如果源数据够大,pipe还未结束就在消费cloneReq,会不会有什么问题?

      其实 pipe函数里面大多是异步操作,即针对 源和目的流做的一些流控措施。目的流使用的是cloneReq对象,该对象在实例化的过程中 transform函数直接通过调用next函数将接受到的数据传入到Transform对象的可读流缓存中,同时触发‘readable和data事件’。这样,我们在下文消费cloneReq对象也是通过“侦听data事件”实现的,因此即使ctx.req的数据仍没有被消费完,下文仍可以正常消费cloneReq对象。数据流仍然可以看做是从ctx.req --> cloneReq --> 消费。

    2. 使用Transform流实现clone 可读流的弊端:

      上例中,Transfrom流的实例化传入了一个参数 highWaterMark,该参数在Transfrom中的作用 在 上文 深入node之Transform 中有过详解,即当Transfrom流的读缓冲大小 < highWaterMark时,Transfrom流就会将接收到的数据存储在读缓冲里,等待消费,同时执行 transfrom函数;否则什么都不做。

      因此,当要clone的源内容大于highWaterMark时,就无法正常使用这种方式进行clone了,因为由于源内容>highWaterMark,在没有后续消费Transfrom流的情况下就不执行transfrom方法(当Transfrom流被消费时,Transfrom流的读缓冲就会变小,当其大小<highWaterMark时,又可以执行transfrom方法继续存储源数据),无法存储源文件内容。

      所以设置一个合理的highWaterMark大小很重要,默认的highWaterMark为 16kB。

    来源:https://segmentfault.com/a/1190000016072883

  • 相关阅读:
    Django-website 程序案例系列-3 URL详解
    Django-website 程序案例系列-1 最简单的web服务器
    c# 多维数组、交错数组(转化为DataTable)
    c++(重载等号=操作为深拷贝)
    c# 导入c++ dll
    Nhibernate HQL 匿名类(严格说是map的使用以及构造函数的使用
    spring.net 集成nhibernate配置文件(这里暴露了GetCurrentSession 对于 CurrentSession unbond thread这里给出了解决方法)
    hibernate mapping文件中 xmlns会导致linq to xml 查询不到对应的节点
    linq to xml
    xml 操作(动态添加 property属性 其他节点同理)
  • 原文地址:https://www.cnblogs.com/datiangou/p/10179810.html
Copyright © 2011-2022 走看看