EventEmitter
var stream = require('stream');
var Readable = stream.Readable; //写入类(http-req就是),初始化时会自动调用_read接口;
var util = require('util');
var Reader = function () {
Readable.call(this); //继承其他构造器;
this.counter = 0;
}
util.inherits(Reader, Readable);
Reader.prototype._read = function () {
if(++this.counter > 10) return this.push(null); //传入null即停止;
return this.push(this.counter.toString());
}
var reader = new Reader();
reader.setEncoding('utf8');
reader.on('data', function(chunk) {
console.log(chunk);
})
reader.on('end', function(chunk){
console.log('--finished--');
})
http.createServer
http.Server
的实例,后者扩展自EventEmitter
import http from 'http';
const server = new http.Server();
server.on('connection', socket=> {
console.log('Client arrived: ' + new Date());
/*
socket.on("end", function() {
console.log("Client left: " + new Date());
});*/
socket.write('Hello World!', 'utf8');
socket.end();
});
//客户端数据处理
//测试: curl http://localhost:3000 -d "Here is some data"
server.on('request', (req, socket)=> {
req.setEncoding('utf8');
req.on('readable', ()=> {
console.log(req.read()) //读取
});
});
/*
可以把end延迟执行
server.setTimeout(2000, socket=> {
socket.write('Too Slow!', 'utf8');
socket.end();
});
*/
server.listen(3000);
http.request
- 读取流,很容易piped到写入流
import http from 'http';
const options = {
host: 'www.google.com',
method: 'GET',
path: '/'
};
const callback = function (res) {
res.setEncoding('utf8');
res.on('readable', ()=> {
console.log(res.read()); //读取
});
/*
res.on('end', ()=> {
console.log('client end');
});
*/
};
const client = http.request(options, callback);
client.end();
get
方式直接使用http.get
;
构建代理和隧道
import http from 'http';
const proxy = new http.Server();
proxy.on('request', (req, socket)=> {
const options = {
host: 'www.google.com',
method: 'GET',
path: '/'
};
http.request(options, res=> {
res.pipe(socket); //不再手动监听,直接push到写入流;
}).end();
});
proxy.listen(3000);
子进程
-
child_process
: 使用的进程方法包括spawn, fork, exec, execFile.
-
spawn(command, [arguments], [options])
//option
cwd: 子进程的当前工作目录
env: 环境变量键值对
stdio: 子进程 stdio 配置
customFds: 作为子进程 stdio 使用的文件标示符
detached: 进程组的主控制
uid: 用户进程的ID.
gid: 进程组的ID.
import {spawn} from 'child_process';
const ls = spawn('ls', ['-1h']);
ls.stdout.on('data', data=> {
console.log('stdout: ' + data);
});
ls.stderr.on('data', ()=> {
console.log('stderr: ' + data);
});
ls.on('close', code=> {
console.log('child process exited with code ' + code);
});
fork(modulePath, [arguments], [options])
//仅仅用于执行node
//parent.js
import {fork} from 'child_process';
const cp = fork('./child.js');
cp.on('message', msgobj=> {
console.log('Parent got message: ', msgobj.text);
});
cp.send({
text: 'Hello World!'
});
//child,js
process.on('message', function (msgobj) {
console.log('Child got message:', msgobj.text);
process.send({
text: msgobj.text + ' too'
});
});
-
exec(command, [options], callback)
//回调 -
execFile(file, [args], [options], [callback])
//直接执行特定的程序,参数作为数组传入,不会被bash解释,因此具有较高的安全性。
* 注意:如果命令参数是由用户来输入的,对于exec函数来说是有安全性风险的,因为Shell会运行多行命令,比如ls -l .;pwd
,如逗号分隔,之后的命令也会被系统运行。但使用exeFile命令时,命令和参数分来,防止了参数注入的安全风险。 -
对应的三个同步方法
spawnSync,execFileSync,execSync
; -
构建集群
//parent
import {fork} from 'child_process';
import net from 'net';
import {cpus} from 'os';
let children = [];
cpus().forEach((f, idx)=> {
children.push(fork('./child.js', [idx]));
});
net.createServer(socket=> {
const rand = Math.floor(Math.random() * children.length);
children[rand].send(null, socket);
}).listen(8080)
//child
var id = process.argv[2];
process.on('message', function (n, socket) {
socket.write('child' + id + ' was your server today.
');
socket.end();
});
cluster
- 基于
child_process
, 简化多进程并行化开,构建负载均衡的集群;
//简化上面的集群
import cluster from 'cluster';
import http from 'http';
import {cpus} from 'os';
const numCPUS = cpus().length;
if(cluster.isMaster) { //总控制节点
for (let i = 0; i < numCPUS; i++) {
cluster.fork(); //让所有cpu运行子进程
}
}
if(cluster.isWorker) { //运行节点
http.createServer((req, res)=> {
res.writeHead(200);
res.end(`Hello from ${cluster.worker.id}`)
}).listen(8080);
}
- 可以考虑其他多次封装的包cluster2
cluster
的负载均衡的策略是随机分配的;- 利用进程中的消息通知来共享数据;
- 防止上下文切换: 一般对于
n
核cpu
会开n-1
个进程;如果有多个应用还应该减少每个开的进程数; - 注意:使用
process.send
时,在master
进程中不存在该方法;- master->worker:
worker.send;
- worker->master:
process.send;
- master->worker:
//判断
var cluster = require('cluster');
if(cluster.isMaster){
var worker = cluster.fork()
worker.on('message', function(msg){
console.log(msg);
});
}else{
process.send({as: 'message’});
}
帮助工具util
util.inherits(constructor, superConstructor)
: 原型继承;util.inspect(object[, {showHidden, depth}])
: 将任意对象转换为字符串,用于调试和错误输出;util.isArray(object)
util.isRegExp(object)
util.isError(object)
util.isDate(object)
util.isBuffer(object)
util.deprecate(function, string)
//标明该方法不要再使用