zoukankan      html  css  js  c++  java
  • 巧妙复制一个流

    场景

    实际业务中可能出现重复消费一个可读流的情况,比如在前置过滤器解析请求体,拿到body进行相关权限及身份认证;认证通过后框架或者后置过滤器再次解析请求体传递给业务上下文。因此,重复消费同一个流的需求并不奇葩,这类似于js上下文中通过 deep clone一个对象来操作这个对象副本,防止源数据被污染。

    
    const Koa = require('koa');
    const app = new Koa();
    
    let parse = function(ctx){
        return new Promise((res)=>{
            let chunks = [],len  = 0, body = null;
            ctx.req.on('data',(chunk)=>{
                chunks.push(chunk)
                len += chunk.length
            });
            ctx.req.on('end',()=>{
                body = (Buffer.concat(chunks,len)).toString();
                res(body);
            });
        })
    }
    // 认证
    app.use(async (ctx,next) => {
        let body = JSON.parse(decodeURIComponent(await parse(ctx)));
        if(body.name != 'admin'){
            return ctx.body = 'permission denied!'
        }
        await next();
    })
    // 解析body体,传递给业务层
    app.use(async (ctx,next) => {
        let body = await parse(ctx);
        ctx.postBody = body;
        await next();
    })
    app.use(async ctx => {
      ctx.body = 'Hello World
    ';
      ctx.body += `post body: ${ctx.postBody}`;
    });
    
    app.listen(3000);
    

    上述代码片段无法正常运行,请求无法得到响应。这是因为在前置过滤器的认证逻辑中消费了请求体,在第二级过滤器中就无法再次消费请求体,因此请求会阻塞。实际业务中,认证逻辑往往是与每个公司规范相关的,是一个“二方库”;而示例中的第二季过滤器则通常作为一个三方库存在,因此为了不影响第三方包消费请求体,必须在认证的二方包中保存 ctx.req 这个可读流的数据仍然存在,这就涉及到本文的主旨了。

    实现

    复制流并不像复制一个对象一样简单与直接,流的使用是一次性的,一旦一个可读流被消费(写入一个Writeable对象中),那么这个可读流就是不可再生的,无法再使用。可是通过一些简单的技巧可以再次复原一个可读流,不过这个复原出来的流虽然内容和之前的流相同,但却不是同一个对象了,因此这两个对象的属性及原型都不同,这往往会影响后续的使用,不过办法总是有的,且看下文。

    实现一:可读流的“影分身之术”

    可读流的“影分身之术”和鸣人的差不多,不过仅限于被克隆对象的 这一特性,即保证克隆出的流有着相同的数据。但是克隆出来的流却无法拥有原对象的其他属性,但我们可通过原型链继承的方式实现属性及方法的继承。

    
    let Readable = require('stream').Readable;
    let fs = require('fs');
    let path = require('path');
    
    class NewReadable extends Readable{
        constructor(originReadable){
            super();
            this.originReadable = originReadable;
            this.start();
        }
    
        start() {
            this.originReadable.on('data',(chunck)=>{
                this.push(chunck);
            });
    
            this.originReadable.on('end',()=>{
                this.push(null);
            });
            
            this.originReadable.on('error',(e)=>{
                this.push(e);
            });
        }
    
        // 作为Readable的实现类,必须实现_read函数,否则会throw Error
        _read(){
        }
    }
    
    app.use(async (ctx,next) => {
        let cloneReq = new NewReadable(ctx.req);
        let cloneReq2 = new NewReadable(ctx.req);
        // 此时,ctx.req已被消费完(没有内容),所有的数据都完全在克隆出的两个流上
    
        // 消费cloneReq,获取认证数据
        let body = JSON.parse(decodeURIComponent(await parse({req: cloneReq})));
    
        // 将克隆出的cloneReq2重新设置原型链,继承ctx.req原有属性
        cloneReq2.__proto__ = ctx.req;
        // 此后重新给ctx.req复制,留给后续过滤器消费
        ctx.req = cloneReq2;
    
        if(body.name != 'admin'){
            return ctx.body = 'permission denied!'
        }
        await next();
    })
    

    点评: 这种影分身之术可以同时复制出多个可读流,同时需要针对原来的流重新进行赋值,并继承原有属性,这样才能不影响后续的重复消费。

    实现二:懒人实现

    stream模块有一个特殊的类,即 Transform。关于Transfrom的特性,我曾在 深入node之Transform 一文中详细介绍过,他拥有可读可写流双重特性,那么利用Transfrom可以快速简单的实现克隆。

    首先,通过 pipe 函数将可读流导向两个 Transform流(之所以是两个,是因为需要在前置过滤器消费一个流,后续的过滤器消费第二个)。

    
    let cloneReq = new Transform({
        highWaterMark: 10*1024*1024,
        transform: (chunk,encode,next)=>{
            next(null,chunk);
        }
    });
    let cloneReq2 = new Transform({
        highWaterMark: 10*1024*1024,
        transform: (chunk,encode,next)=>{
            next(null,chunk);
        }
    });
    ctx.req.pipe(cloneReq)
    ctx.req.pipe(cloneReq2)
    

    上述代码中,看似 ctx.req 流被消费(pipe)了两次,实际上 pipe 函数则可以看成 Readable和Writeable实现backpressure的一种“语法糖”实现,具体可通过 node中的Stream-Readable和Writeable解读 了解,因此得到的结果就是“ctx.req被消费了一次,可是数据却复制在cloneReq和cloneReq2这两个Transfrom对象的读缓冲区里,实现了clone”

    其实pipe针对Readable和Writeable做了限流,首先针对Readable的data事件进行侦听,并执行Writeable的write函数,当Writeable的写缓冲区大于一个临界值(highWaterMark),导致write函数返回false(此时意味着Writeable无法匹配Readable的速度,Writeable的写缓冲区已经满了),此时,pipe修改了Readable模式,执行pause方法,进入paused模式,停止读取读缓冲区。而同时Writeable开始刷新写缓冲区,刷新完毕后异步触发drain事件,在该事件处理函数中,设置Readable为flowing状态,并继续执行flow函数不停的刷新读缓冲区,这样就完成了pipe限流。需要注意的是,Readable和Writeable各自维护了一个缓冲区,在实现的上有区别:Readable的缓冲区是一个数组,存放Buffer、String和Object类型;而Writeable则是一个有向链表,依次存放需要写入的数据。

    最后,在数据复制的同时,再给其中一个对象复制额外的属性即可:

    
    // 将克隆出的cloneReq2重新设置原型链,继承ctx.req原有属性
    cloneReq2.__proto__ = ctx.req;
    // 此后重新给ctx.req复制,留给后续过滤器消费
    ctx.req = cloneReq2;
    

    至此,通过Transform实现clone已完成。完整的代码如下(最前置过滤器):

    
    // 认证
    app.use(async (ctx,next) => {
        // let cloneReq = new NewReadable(ctx.req);
        // let cloneReq2 = new NewReadable(ctx.req);
        let cloneReq = new Transform({
            highWaterMark: 10*1024*1024,
            transform: (chunk,encode,next)=>{
                next(null,chunk);
            }
        });
        let cloneReq2 = new Transform({
            highWaterMark: 10*1024*1024,
            transform: (chunk,encode,next)=>{
                next(null,chunk);
            }
        });
        ctx.req.pipe(cloneReq)
        ctx.req.pipe(cloneReq2)
        // 此时,ctx.req已被消费完(没有内容),所有的数据都完全在克隆出的两个流上
    
        // 消费cloneReq,获取认证数据
        let body = JSON.parse(decodeURIComponent(await parse({req: cloneReq})));
    
        // 将克隆出的cloneReq2重新设置原型链,继承ctx.req原有属性
        cloneReq2.__proto__ = ctx.req;
        // 此后重新给ctx.req复制,留给后续过滤器消费
        ctx.req = cloneReq2;
    
        if(body.name != 'admin'){
            return ctx.body = 'permission denied!'
        }
        await next();
    })
    

    说明

    1. ctx.req执行两次pipe到对应cloneReq和cloneReq2,然后立即消费cloneReq对象,这样合理吗?如果源数据够大,pipe还未结束就在消费cloneReq,会不会有什么问题?

      其实 pipe函数里面大多是异步操作,即针对 源和目的流做的一些流控措施。目的流使用的是cloneReq对象,该对象在实例化的过程中 transform函数直接通过调用next函数将接受到的数据传入到Transform对象的可读流缓存中,同时触发‘readable和data事件’。这样,我们在下文消费cloneReq对象也是通过“侦听data事件”实现的,因此即使ctx.req的数据仍没有被消费完,下文仍可以正常消费cloneReq对象。数据流仍然可以看做是从ctx.req --> cloneReq --> 消费。

    2. 使用Transform流实现clone 可读流的弊端:

      上例中,Transfrom流的实例化传入了一个参数 highWaterMark,该参数在Transfrom中的作用 在 上文 深入node之Transform 中有过详解,即当Transfrom流的读缓冲大小 < highWaterMark时,Transfrom流就会将接收到的数据存储在读缓冲里,等待消费,同时执行 transfrom函数;否则什么都不做。

      因此,当要clone的源内容大于highWaterMark时,就无法正常使用这种方式进行clone了,因为由于源内容>highWaterMark,在没有后续消费Transfrom流的情况下就不执行transfrom方法(当Transfrom流被消费时,Transfrom流的读缓冲就会变小,当其大小<highWaterMark时,又可以执行transfrom方法继续存储源数据),无法存储源文件内容。

      所以设置一个合理的highWaterMark大小很重要,默认的highWaterMark为 16kB。

    来源:https://segmentfault.com/a/1190000016072883

  • 相关阅读:
    Max History CodeForces
    Buy a Ticket CodeForces
    AC日记——字符串的展开 openjudge 1.7 35
    AC日记——回文子串 openjudge 1.7 34
    AC日记——判断字符串是否为回文 openjudge 1.7 33
    AC日记——行程长度编码 openjudge 1.7 32
    AC日记——字符串P型编码 openjudge 1.7 31
    AC日记——字符环 openjudge 1.7 30
    AC日记——ISBN号码 openjudge 1.7 29
    AC日记——单词倒排 1.7 28
  • 原文地址:https://www.cnblogs.com/datiangou/p/10179810.html
Copyright © 2011-2022 走看看