zoukankan      html  css  js  c++  java
  • Node.js amqplib 连接 Rabbitmq 学习笔记

     var amqp = require('amqplib');
     connect([url, [socketOptions]])
     
     var amqp = require('amqplib/callback_api');
     connect([url, [socketOptions]], function(err, conn) {...})
     
     url 中的参数
     可以在URI的查询部分中给出进一步的AMQP调整参数,例如,如'amqp://localhost?frameMax=0x1000'。这些是:
     
     frameMax
     
     连接上允许的最大帧的大小(以字节为单位)。0意味着没有限制(但是因为帧的大小字段是无符号的32位整数,所以它是perforce 2^32 - 1); 我将其默认为0x1000,即4kb,这是允许的最小值,适合多种用途,而不是通过Node.JS的缓冲池。
     
     channelMax
     
     允许的最大通道数。默认是 0,意思是2^16 - 1。
     
     heartbeat
     
     连接心跳的周期,以秒为单位。默认为0;
     
     locale
     
     错误消息的所需区域设置。RabbitMQ只使用过en_US; 幸运的是,这是默认的。
     
     url  也可以作为对象使用
     
     {
       protocol: 'amqp',
       hostname: 'localhost',
       port: 5672,
       username: 'guest',
       password: 'guest',
       locale: 'en_US',
       frameMax: 0,
       heartbeat: 0,
       vhost: '/',
     }


    //关闭连接
    connection.close()
    回调

    connection.close([function(err) {...}])

    //断言队列是否存在

    assertQueue

    //检查队列是否存在

    checkQueue

    //删除队列(队列不存在将会关闭通道)

    deleteQueue 

    RPC 服务端

    var amqp = require('amqplib/callback_api');
    //建立连接
    amqp.connect({protocol:'amqp(协议 有amqp 和 amqps)',hostname:'rabbit服务器地址',port:端口号,username:'用户名',password:'密码'},function(err,conn) {
    
        conn.createChannel(function(err,ch){
            var q = 'rpc_queue';
    
            ch.assertQueue(q,{ durable:false });
            ch.prefetch(1);
            console .log('[x]等待RPC请求');
            ch.consume(q,function  reply(msg) {
                var n = parseInt(msg.content.toString());
    
                console.log("[.] fib(%d)",n);
    
                var r = fibonacci(n);
    
                ch.sendToQueue(msg.properties.replyTo,new Buffer(r.toString()),{correlationId:msg.properties.correlationId});
                ch.ack(msg);
            });
        });
    });
    
    function  fibonacci(n) {
        if(n == 0 || n == 1){
            return n;
        }else{
            fibonacci(n  - 1)+ fibonacci(n  - 2);
        }
    }

    客户端代码

    var amqp = require('amqplib/callback_api');
    
    var args = process.argv.slice(2);
    console.log(args);
    if(args.length == 0){
        console.log("Usage:rpc_client.js num");
        process.exit(1);
    }
    
    amqp.connect({protocol:'amqp(协议有 amqp 和 amqps)',hostname:'rabbit服务器地址',port:端口号,username:'用户名',password:'密码'},function(err,conn) {
        conn.createChannel(function(err,ch) {
            ch.assertQueue('',{ exclusive:true },function(err,q) {
                //随机数唯一值
                var corr = generateUuid();
                var num = parseInt(args[0]);
    
                console.log('[x] Requesting fib(%d)',num);
    
                ch.consume(q.queue,function(msg) {
                    if(msg.properties.correlationId == corr){
                        console.log('[.] Got%s',msg.content.toString());
                        setTimeout(function() {conn.close(); process.exit(0)},500);
                    }
                },{ noAck:true });
                //replyTo:q.queue 设置
                ch.sendToQueue('rpc_queue',new Buffer(num.toString()),{correlationId:corr,replyTo:q.queue});
            });
        });
    });
    
    function  generateUuid() {
        return  Math.random().toString()+
            Math.random().toString()+
            Math.random().toString();
    }
  • 相关阅读:
    spring揭秘读书笔记----spring的ioc容器之BeanFactory
    spring启动加载过程源码分析
    java线程数过高原因分析
    spring揭秘读书笔记----ioc的基本概念
    git merge rebase的区别及应用场景
    spring实现定时任务
    jetty.xml解析
    Hackthebox--------irked
    CTF之信息泄漏
    CTF web题型解题技巧
  • 原文地址:https://www.cnblogs.com/objects/p/9424720.html
Copyright © 2011-2022 走看看