var amqp = require('amqplib'); connect([url, [socketOptions]]) var amqp = require('amqplib/callback_api'); connect([url, [socketOptions]], function(err, conn) {...}) url 中的参数 可以在URI的查询部分中给出进一步的AMQP调整参数,例如,如'amqp://localhost?frameMax=0x1000'。这些是: frameMax 连接上允许的最大帧的大小(以字节为单位)。0意味着没有限制(但是因为帧的大小字段是无符号的32位整数,所以它是perforce 2^32 - 1); 我将其默认为0x1000,即4kb,这是允许的最小值,适合多种用途,而不是通过Node.JS的缓冲池。 channelMax 允许的最大通道数。默认是 0,意思是2^16 - 1。 heartbeat 连接心跳的周期,以秒为单位。默认为0; locale 错误消息的所需区域设置。RabbitMQ只使用过en_US; 幸运的是,这是默认的。 url 也可以作为对象使用 { protocol: 'amqp', hostname: 'localhost', port: 5672, username: 'guest', password: 'guest', locale: 'en_US', frameMax: 0, heartbeat: 0, vhost: '/', }
//关闭连接
connection.close()
回调
connection.close([function(err) {...}])
//断言队列是否存在
assertQueue
//检查队列是否存在
checkQueue
//删除队列(队列不存在将会关闭通道)
deleteQueue
RPC 服务端
var amqp = require('amqplib/callback_api'); //建立连接 amqp.connect({protocol:'amqp(协议 有amqp 和 amqps)',hostname:'rabbit服务器地址',port:端口号,username:'用户名',password:'密码'},function(err,conn) { conn.createChannel(function(err,ch){ var q = 'rpc_queue'; ch.assertQueue(q,{ durable:false }); ch.prefetch(1); console .log('[x]等待RPC请求'); ch.consume(q,function reply(msg) { var n = parseInt(msg.content.toString()); console.log("[.] fib(%d)",n); var r = fibonacci(n); ch.sendToQueue(msg.properties.replyTo,new Buffer(r.toString()),{correlationId:msg.properties.correlationId}); ch.ack(msg); }); }); }); function fibonacci(n) { if(n == 0 || n == 1){ return n; }else{ fibonacci(n - 1)+ fibonacci(n - 2); } }
客户端代码
var amqp = require('amqplib/callback_api'); var args = process.argv.slice(2); console.log(args); if(args.length == 0){ console.log("Usage:rpc_client.js num"); process.exit(1); } amqp.connect({protocol:'amqp(协议有 amqp 和 amqps)',hostname:'rabbit服务器地址',port:端口号,username:'用户名',password:'密码'},function(err,conn) { conn.createChannel(function(err,ch) { ch.assertQueue('',{ exclusive:true },function(err,q) { //随机数唯一值 var corr = generateUuid(); var num = parseInt(args[0]); console.log('[x] Requesting fib(%d)',num); ch.consume(q.queue,function(msg) { if(msg.properties.correlationId == corr){ console.log('[.] Got%s',msg.content.toString()); setTimeout(function() {conn.close(); process.exit(0)},500); } },{ noAck:true }); //replyTo:q.queue 设置 ch.sendToQueue('rpc_queue',new Buffer(num.toString()),{correlationId:corr,replyTo:q.queue}); }); }); }); function generateUuid() { return Math.random().toString()+ Math.random().toString()+ Math.random().toString(); }