zoukankan      html  css  js  c++  java
  • nodejs rabbitmq使用

    var amqp = require('amqp')

    1.一个发送者,多个消费者。exchange发送3条消息,每个队列接收3条消息相同

    connection.on('ready', function () {

    var exchange = connection.exchange('exchange_name', {type: 'fanout',autoDelete:false});


    connection.queue("queue_name",{autoDelete:false}, function(queue){

    queue.bind('exchange_name','queue_name', function() {
    exchange.publish('queue_name', 'this is message is testing ......');
    exchange.publish('queue_name', 'this is message is testing3 ......')

    setTimeout(function() {
    console.log("Single queue bind callback succeeded");
    //exchange.destroy();
    //queue.destroy();node
    connection.end();
    connection.destroy();
    }, 5000);

    })
    //同一个消息多次订阅同一个队列,只会收到一次消息
    /*
    ack:true,确认收到一条消息后,后续对列中的消息接收不到
    prefetchCount:每个消费者接收的消息平均,
    */
    // queue.subscribe({ack:true,prefetchCount:1},function (message) {
    // console.log('2At 5 second recieved message is:'+ message.data);
    // });
    // queue.subscribe({ack:true,prefetchCount:1},function (message) {
    // console.log('At 5 second recieved message is:'+ message.data);
    // });

    queue.subscribe(function (message) {
    console.log('2At 5 second recieved message is:'+ message.data);
    });
    // queue.subscribe(function (message) {
    // console.log('1 5 second recieved message is:'+ message.data);
    // });

    })
    connection.queue("queue_name1",{autoDelete:false}, function(queue){
    queue.bind('exchange_name','queue_name1', function() {
    exchange.publish('queue_name1', 'this is queue_name1 is testing4 ......')
    })

    queue.subscribe(function (message) {
    console.log('queue_name1 5 second recieved message is:'+ message.data);
    });
    })
    })

    打印:

    queue_name1 5 second recieved message is:this is message is testing ......
    2At 5 second recieved message is:this is message is testing ......
    2At 5 second recieved message is:this is message is testing3 ......
    queue_name1 5 second recieved message is:this is message is testing3 ......
    2At 5 second recieved message is:this is queue_name1 is testing4 ......
    queue_name1 5 second recieved message is:this is queue_name1 is testing4 ......

    2.exchange的type为direct,每个个队列,只能收到指定的队列消息

    connection.on('ready', function () {

    var exchange = connection.exchange('exchange_name2', {type: 'direct',autoDelete:false});
    connection.queue("queue_name",{autoDelete:false}, function(queue){
    queue.bind('exchange_name2','queue_name', function() {
    exchange.publish('queue_name', 'this is message is testing ......');
    exchange.publish('queue_name', 'this is message is testing2222 ......');

    setTimeout(function() {
    console.log("Single queue bind callback succeeded");
    //exchange.destroy();
    //queue.destroy();
    connection.end();
    connection.destroy();
    }, 5000)

    })
    queue.subscribe(function (message) {
    console.log('At 2:'+ message.data);
    })
    queue.subscribe(function (message) {
    console.log('At 1:'+ message.data);
    })
    })

    connection.queue('q2',{autoDelete:false},function(q){
    q.bind('exchange_name2','q2',function(){
    exchange.publish('q2','is form q2')
    })
    q.subscribe(function(msg){
    console.log("q2: "+msg.data)
    })
    })


    })

    打印:

    At 2:this is message is testing ......
    At 1:this is message is testing2222 ......
    q2: is form q2

    3.参数尝试,echange,queue,publish持久化

    const amqp = require('amqp');
    var connection = amqp.createConnection( {
    host: '127.0.0.1'
    , port: 5672
    , login: 'test'
    , password: '123'
    , vhost: '/test'
    });
    connection.on('error', function(err) {
    console.log('Connection error', err);
    });
    connection.on('ready', function () {
    // Options
    // - type 'fanout', 'direct', or 'topic' (default)
    // - passive (boolean)
    // - durable (boolean)
    // - autoDelete (boolean, default true)
    var exchange = connection.exchange('exchange_name', {type: 'fanout',autoDelete:false});
    // - passive (boolean)
    // - durable (boolean)持久化
    // - exclusive (boolean)
    // - autoDelete (boolean, default true)
    //'arguments': {'x-expires': 3600000}
    connection.queue("queue_name8",{autoDelete:false,durable:true}, function(queue){
    queue.bind('exchange_name','queue_name8', function() {
    // the third argument can specify additional options
    // - mandatory (boolean, default false)
    // - immediate (boolean, default false)
    // - contentType (default 'application/octet-stream')
    // - contentEncoding
    // - headers
    // - deliveryMode
    // - priority (0-9)
    // - correlationId
    // - replyTo
    // - expiration
    // - messageId
    // - timestamp
    // - userId
    // - appId
    // - clusterId
    //
    exchange.publish('queue_name8', 'this is message is testing ......',{deliveryMode:2});
    exchange.publish('queue_name8', 'this is message is testing3 ......',{deliveryMode:2})

    setTimeout(function() {
    console.log("Single queue bind callback succeeded");
    //exchange.destroy();
    //queue.destroy();node
    connection.end();
    connection.destroy();
    }, 5000);

    })
    //同一个消息多次订阅同一个队列,只会收到一次消息
    /*
    ack:true,确认收到一条消息后,后续对列中的消息接收不到,如果需要下条消息,调用q.shift()
    prefetchCount:每个消费者接收的消息平均,
    */
    // queue.subscribe({ack:true,prefetchCount:1},function (message) {
    // console.log('2At 5 second recieved message is:'+ message.data);
    // });
    // queue.subscribe({ack:true,prefetchCount:1},function (message) {
    // console.log('At 5 second recieved message is:'+ message.data);
    // });

    queue.subscribe(function (message) {
    console.log('2At 5 second recieved message is:'+ message.data);
    });
    // queue.subscribe(function (message) {
    // console.log('1 5 second recieved message is:'+ message.data);
    // });
    })
    connection.queue("queue_name9",{autoDelete:false,durable:true}, function(queue){
    queue.bind('exchange_name','queue_name9', function() {
    exchange.publish('queue_name9', 'this is queue_name1 is testing4 ......',{deliveryMode:2})
    })
    queue.subscribe(function (message) {
    console.log('queue_name1 7 second recieved message is:'+ message.data);
    });
    })
    })

     4.确认收到消息,继续接收消息

    var queName = 'test_q3',ex='test_ex';
    connection.on('ready',function(){
    var ex = connection.exchange(`${ex}`,{type:'fanout',durable:true,autoDelete:false});
    connection.queue(`${queName}`,{durable:true,autoDelete:false},function(q){

    q.bind(ex,queName,function(){
    ex.publish(queName,'test msg1',{deliveryMode:2})
    ex.publish(queName,'test msg2 msg2',{deliveryMode:2})
    })
    setTimeout(function() {
    console.log("Single queue bind callback succeeded");
    //exchange.destroy();
    //queue.destroy();node
    connection.end();
    connection.destroy();
    },1000);
    q.subscribe({ack:true,prefetchCount:1},function(msg){
    q.shift()
    console.log('receive msg : '+msg.data);
    })
    })
    })

    注意:同一个exhcange中的queuename,exhcange,type方式一旦在程序中确定,就不能改变

  • 相关阅读:
    使用IOCP完成端口队列做任务队列
    对Zlib单元进行再封装
    XmlReader/XmlWriter 类
    TextReader/TextWriter 的类
    LINQ to XML
    Xml序列化
    动态Linq(结合反射)
    设计模式总结
    深入了解.Net上下文
    应用程序域
  • 原文地址:https://www.cnblogs.com/qiyc/p/10892197.html
Copyright © 2011-2022 走看看