工作告一段落,闲来无事,写了一个在nodejs实现“半阻塞”的控制程序。
一直以来,nodejs以单线程非阻塞,高并发的特性而闻名。搞这个“半阻塞”是东西,有什么用呢?
场景一:
现在的web应用可有都是一个这样的结构:
http服务(node) > 接口(业务逻辑) > 数据库
很多时候,瓶颈一般出现在业务层,或者数据层。更多的可能是某一个业务的处理,拉下整个系统的性能。
当用户或一些不怀好意的人,故意大量调用这些处理逻辑,好吧,你nodejs是非阻塞的,这一大波处理请求就一窝蜂冲到到业务层,很可能导致整个系统性能下降,或者瘫痪。
如果这个时候,node层就把这些耗资源的请求,排好队,控制好并发,甚至分用户排成队,配置好不同身份的用户并发量。是不是很好。
场景二:
应用中,可能会有一些无需即时处理的同一类业务,处理前都需要收集一次资源,处理业务,处理后再清理一次。高消耗工作主要在收集或是清理上。
这个时候,我们可将要处理的业务暂存到队列中,当队列数量到达一个值或是某个时间点时,我们一次性处理完队列中的任务。在消耗上,只做一次“收集”、“清理”的操作。
可见,如果在小型的项目中如果node方便实现类似线程池的功能,对整个项目的稳定性及工作效率是有很大贡献的。
现在说一说我写的这一个“半阻塞”的东西,为什么要基于Promise;
如果你用过q.js或了解EC6,应该对Promise有所了解,不然还请先了解下。
我的想法是,并不需要把整个业务的后续处理全都放到队列中去,而只是将高消耗的那一部分放入队列,利用Promise的异部处理机制来处理后续的操作。
在编写代码的时候你几乎可以忘记队列的存在,但是他就在那里默默的工作着,代码可读性和灵活性没有丝毫影响。
queue-fun
queue-fun 是基于Promise的 运行队列控制类。
使用场景
- 巨量同逻辑业务平稳处理
- 间歇性高并发系统
- 控制单用户占用资源过高
队列
queue-fun.Queue(q)
初始化队列控制 参数q可传
- 无参数 队列使用内置的实现的Promise;
- q/ES6原生Promise 插入队列方法:
push
unshift
go
jump
返回对应的promise
实例化队列 new queue-fun.Queue()(runMax, con)
- runMax 并行运行队列方法的最大个数
- con 配置队列 开始 结束 事件,运行单元的 成功,失败 事件及配置执行单元出错的 重试 机制。
var queue = new queue-fun.Queue()(100,{
"event_succ":function(){} //成功
,"event_err":function(){} //失败
,"event_begin":function(){} //队列开始
,"event_end":function(){} //队列完成
,"event_add":function(){} //有执行项添加进执行单元后执行,注意go及jump不会触发
,"retryON":0 //队列单元出错重试次数
,"retryType":0 //重试模式true/false(优先/搁置)执行
})`
API
queue.push(promisefun, args[], con)
向队列中尾部添加运行单元 fun: promise function args: 传入的参数 con 默认值
{
'event_succ':null //一般没用,某些情况下,可能比then要方便
,'event_err':null //一般没用,某些情况下,可能比then要方便
,'Queue_event':true //默认会执行队列定义的回调
}
queue.unshift(promisefun, args[], con) 同push 向队列中头部添加运行单元
queue.go(promisefun, args[], con) 同push,添加后会启动队列.
queue.jump(promisefun, args[], con) 同unshift,添加后启动队列.
setMax(newMax)
修改并行数
queue.start()
启动队列
queue.stop()
暂停队列
queue.clear()
清空队列
demo
var queuefun = require('queue-fun'); //引入
//初始化Promise异步队列类
var Queue = queuefun.Queue();
//实列化最大并发为2的运行队列
var queue1 = new Queue(2,{
"event_succ":function(data){console.log('queue-succ:',data)} //成功
,"event_err":function(err){console.log('queue-succ:',data)} //失败
});
var q = queuefun.Q; //模块中简单实现了Q的基本功能,可以一试,
//定义一个Promise风格的异步方法
function testfun(i){
var deferred = q.defer();
setTimeout(function(){
if(i && i % 3 == 0){
deferred.reject(new Error("err " + i))
}else{
deferred.resolve(i)
}
},(Math.random() * 2000)>>0)
return deferred.promise;
}
//向队列添加运行单元
queue1.push(testfun,[1]) //添加运行项
queue1.go(testfun,[2]) //添加并自动启动队列
queue1.go(testfun,[3],{Queue_event:0}) //添加不会触发队列 回调的运行项.
queue1.go(testfun,[4]).then(
function(data){console.log('done-succ:',data)},
function(err){console.log('done-err:',err)}
)
queue1.go(testfun,[5],{
event_succ:function(data){console.log('conf-succ:',data)},
event_err:function(err){console.log('conf-err:',err)}
})
关于内置Promise实现类queuefun.Q
实现了Promises/A+规范及done
,spread
,fail
;
API模仿Q;
模拟实现了 q.defer
,q.Promise
,q.all
,q.any
,q.nfcall
,q.nfapply
,q.denodeify
等函数.
.toPromis(obj).then()
如果你习惯了.then风格写代码,你可以尝试用toPromis将普通函数/语句包装一下,让他可以获得then方法,及捕获错误。
var add = function(a,b){return a+b;}
q.toPromis(function(){return add(a+b)})
.then(console.log,console.error)
待完善
- 集群支持
- 内存溢出隐患处理
- 其它Promise实现类的支持
安装:npm install quque-fun