zoukankan      html  css  js  c++  java
  • 异步编程解决方案

    • 事件发布/订阅模式
    • promise/deferrd模式
    • 流程控制模式

    事件发布/订阅模式

    事件监听器模式是异步回调的事件化,又称发布订阅/模式

    node核心模块events

    方法

    • addListener/on
    • once
    • removeListener
    • removeAllListeners
    • emit

    简单操作

    let events = require('events')
    let request = require('http').request
    let emit = new events.EventEmitter()
    emit.on('message', function (msg) {
        console.log(msg)
    })
    emit.once('message1', function (msg) {
        console.log(msg)
    })
    emit.emit('message','message')
    emit.emit('message','message')
    emit.emit('message1','message1')
    emit.emit('message1','message1')


    此示例中,因为message事件通过on来监听,message1通过once来监听,所以message事件的回调可以执行多次,message1事件的回调只能执行一次

    结果:

    message
    message
    message1

    事件分发/订阅也可以用来做一个功能黑盒,分发者只需要触发这个事件,不需要知道有多少订阅者,这些订阅者都在处理什么业务。事件侦听器模式也是一种钩子机制,node中的封装很多是一个黑盒,不使用事件监听我们很难知道内部状态变化的中间数据。使用事件侦听器可以不必知道内部如何实现,只关注特定的事件点,比如下面的http请求

    let request = require('http').request
    let options = {
        host: 'www.google.com',
        port: 80,
        path: '/upload',
        method: 'POST'
    }
    let req = request(options, function(res) {
        console.log('STATUS', res.statusCode)
        console.log('header', JSON.stringify(res.headers))
        res.setEncoding('utf-8')
        res.on('data', function (chunck) {
            console.log('Chunck', chunck)
        })
        res.on('end', function() {
            console.log('end')
        })
    })
    req.on('error', function(err) {
        console.log(err)
    })
    req.write('data
    ')
    req.write('data
    ')
    req.end()

    只需要监听data  end  error事件就可以完成功能,不需要知道request究竟如何实现

    细节:

    1. 如果对一个事件设置了10个以上的监听器,会有警告。可通过emitter.setMaxListeners(0)将限制去掉
    2. 异常处理,如果有监听error事件,则会执行error中的代码否则将会抛出错误,整个进程结束

    利用队列解决雪崩问题

    雪崩问题:在高并发高访问量的情况下缓存失效的情境下,大量请求同时发向数据库,数据库无法承受如此多的查询请求,进而影响网站整体性能

    var proxy = new events.EventEmitter();
    var status = "ready";
    var select = function (callback) {
      proxy.once("selected", callback);
      if (status === "ready") {
        status = "pending";
        db.select("SQL", function (results) {
          proxy.emit("selected", results);
          status = "ready";
        });
      }
    };


    短时间内,如果执行多个select,不会执行多次查询请求,而是会用此次的callback来监听select事件,只执行一次查询请求,当这个请求返回时,触发select事件,执行之前的所有callback,由于用的once来监听,所以每个callback只会执行一次。

    多异步之间的协作方案

    const fs = require('fs')
    const after = function (times = 0, callback) {
        let time = 0,datas = {}
        return function (key, value) {
            datas[key] = value
            if(++ time >= times) {
                return callback(datas)
            }
        }
    }
    const done = after(2, function(data) {
        console.log(data)
    })
    const readFile = function(path,key, callback) {
        fs.readFile(path, 'utf-8', function(err, data) {
            if(err) {
                throw(err)
            }
            callback(key,data)
        })
    }
    readFile('../a.json', 'a', done)
    readFile('../b.json', 'b', done)


    通过总数和当前执行的次数来计数并执行最终的calback

    const done = after(2, function(data) {
        console.log(data)
    })
    let emit = new events.EventEmitter()
    emit.on('done', done)
    emit.on('done', other)
    function readFile(path, key, emit, name) {
        fs.readFile(path, 'utf-8', function(err, data) {
            if(err) {
                throw err
            }
            emit.emit(name, key, data)
        })
    }
    readFile('../a.json', 'a', emit, 'done')
    readFile('../b.json', 'b', emit, 'done')

    通过事件监听来实现多对多异步

    let EventProxy = require('eventproxy')
    let fs = require('fs')
    let proxy = new EventProxy()
    proxy.all('a', 'b', function(a, b) {
        console.log('a',a,'b',b)
    })
    function readFile(path, name) {
        fs.readFile(path, 'utf-8', function(err, data) {
            if(err) {
                throw err
            }
            proxy.emit(name, data)
        })
    }
    readFile('../a.json', 'a')
    readFile('../b.json', 'b')
    readFile('../a.json', 'a')
    readFile('../b.json', 'b')

    通过eventproxy来实现一对多异步

    promise/deffered模式

    用event实现一个request的promise

    var Deferred = function() {
        this.promise = new Promise()
        this.status = 'unfullfiled'
    }
    Deferred.prototype.resolve = function(obj) {
        this.status = 'fullfiled'
        this.promise.emit('success', obj)
    }
    Deferred.prototype.reject = function(err) {
        this.status = 'failed'
        this.promise.emit('error', err)
    }
    Deferred.prototype.process = function(data) {
        this.promise.emit('process', data)
    }
    
    var promisify = function(res) {
        var deffer = new Deferred()
        let result = ''
        res.setEncoding('utf-8')
        res.on('data', function(chunck) {
            result += chunck
            deffer.process(chunck)
        })
        res.on('end', function() {
            deffer.resolve(result) 
        })
        res.on('error', function(err) {
            deffer.reject(err)
        })
        return deffer.promise
    }
    
    let option = {
        host: 'www.baidu.com',
        port: 80,
        method: 'POST',
        path: 'upload'
    }
    let req = http.request(option, function(res) {
        promisify(res).then(function(data) {
            console.log('success', data)
        }, function(err) {
            console.log('error', err)
        }, function(chunck) {
            console.log('Chunck', chunck)
        })
    })
    req.write('data')
    req.write('data')
    req.end()

    defferd为内部封装promise为外部调用

    有一些库来帮助定制化promise

    流程控制库

    1. 尾触发和next(connect)
    2. 流程控制模块async
      1. 无依赖串行

        let async = require('async')
        let fs = require('fs')
        async.series([
            function(callback) {
                fs.readFile('../a.json', 'utf-8', callback)
            },
            function(callback) {
                fs.readFile('../b.json', 'utf-8', callback)
            },
        ], function(err, datas) {
            console.log(datas)
        })


        相当于 

        let fs = require('fs')
        let callback = function(err, results) {
            console.log(results)
        }
        fs.readFile('../a.json', 'utf-8', function(err, content) {
            if(err) {
                return callback(err)
            }
            fs.readFile('../b.json', 'utf-8', function(err, data) {
                if(err) {
                    return callback(err)
                }
                callback(null, [content, data])
            })
        })

         callback传递由async完成

      2. 并行

        let fs = require('fs')
        let async = require('async')
        async.parallel([
            function(callback) {
                fs.readFile('../a.json', 'utf-8', callback)
            },
            function(callback) {
                fs.readFile('../b.json', 'utf-8', callback)
            },
        ], function(err, results) {
            console.log(results)
        })

        相当于

        let fs = require('fs')
        
        let counter = 2
        let results = []
        let failError = null
        let callback = function(err, data) {
            console.log(data)
        }
        
        let done = function(index, data) {
            results[index] = data
            if(-- counter === 0) {
                callback(null, results)
            }
        }
        
        let fail = function(err) {
            if(!failError) {
                failError = err
                callback(failError)
            }
        }
        
        fs.readFile('../a.json', 'utf-8', function(err, data) {
            if(err) {
                fail()
            }
            done(0, data)
        })
        fs.readFile('../b.json', 'utf-8', function(err, data) {
            if(err) {
                fail()
            }
            done(1, data)
        })
      3. 前一个输出为后一个输入async.waterfall
      4. 自动化执行

        let async = require('async')
        let fs = require('fs')
        
        let dep = {
            b: function(callback) {
                fs.readFile('../b.json', 'utf-8', function(err, data) {
                    console.log(data)
                    callback()
                })
            },
            a: ['b', function(callback) {
                fs.readFile('../a.json', 'utf-8', function(err, data) {
                    console.log(data)
                })
            }]
        }
        
        async.auto(dep)

         使a在b执行完后执行

  • 相关阅读:
    339. Nested List Weight Sum
    41. First Missing Positive
    366. Find Leaves of Binary Tree
    287. Find the Duplicate Number
    130. Surrounded Regions
    ubuntu18.04安装mongodb4.4
    阿里dataX配置使用
    MySQL主从同步简单介绍&配置
    阿里yugong配置使用
    ubuntu编译安装mysql
  • 原文地址:https://www.cnblogs.com/ranjianxi/p/8404541.html
Copyright © 2011-2022 走看看