zoukankan      html  css  js  c++  java
  • node-amqp 使用fanout发布订阅rabbitmq消息

    publisher代码

    const amqp = require('amqp');
    
    // Options handed to amqp.createConnection() for the publisher.
    const option = {
        host: 'server-ip',
        port: 5672,
        login: 'guest',
        password: 'guest',
        connectionTimeout: 10000,
        authMechanism: 'AMQPLAIN',
        vhost: '/',
        noDelay: true,
        ssl: { enabled: false },
    };

    // Open the broker connection using the options above.
    const connection = amqp.createConnection(option);

    // Log any error emitted by the connection.
    connection.on('error', (e) => {
        console.log("Error from amqp: ", e);
    });
    // Exchange handle shared with publish_message(); replaced by the real
    // exchange object once the connection reports 'ready'.
    let default_exchange = {};
    // Handle of the periodic publish timer. Kept so that a repeated 'ready'
    // event (node-amqp reconnects by default and emits 'ready' again) does not
    // start a second timer and double the publish rate.
    let publish_timer = null;
    connection.on('ready', function(){
        // Create (or re-open) the fanout exchange named 'fans'.
        default_exchange = connection.exchange('fans',{type:'fanout'});

        // Declare both queues and bind them to the exchange so that each one
        // receives the published messages.
        const q = connection.queue('my-queue');
        q.bind(default_exchange,'my-queue');

        const qq = connection.queue('qqq');
        qq.bind(default_exchange, 'qqq');

        // Publish every 2000 ms, but only ever run one timer.
        if (publish_timer === null) {
            publish_timer = setInterval(publish_message, 2000);
        }
    });
    
    // Number of messages published so far; embedded in each message.
    let count = 0;

    /**
     * Publish one test message to `default_exchange` with an empty routing key
     * and log what was sent.
     *
     * Declared as a function declaration (instead of assigning to an undeclared
     * name) so it does not create an implicit global and also works in strict
     * mode / ES modules, while still being hoisted for the setInterval() call.
     *
     * @returns {undefined}
     */
    function publish_message() {
        const message = {
            hello: 'world',
            time: Date.now(),
            count: count++,
        };
        default_exchange.publish('', message);
        // Log text kept as-is; note the exchange is declared as fanout, so the
        // message is not addressed to 'my-queue' alone.
        console.log(`my-queue message published: ${JSON.stringify(message)} to queue: my-queue`);
    }
    

      receiver1 代码

    const amqp = require('amqp');
    
    // Options handed to amqp.createConnection() for receiver1.
    const option = {
        host: 'server-ip',
        port: 5672,
        login: 'guest',
        password: 'guest',
        connectionTimeout: 10000,
        authMechanism: 'AMQPLAIN',
        vhost: '/',
        noDelay: true,
        ssl: { enabled: false },
    };

    // Open the broker connection using the options above.
    const connection = amqp.createConnection(option);

    // Log any error emitted by the connection.
    connection.on('error', (e) => {
        console.log("Error from amqp: ", e);
    });
    
    // Once the connection is ready: declare 'my-queue', bind it to the
    // exchange, and print every message delivered to it.
    connection.on('ready', function(){
        connection.queue('my-queue', function(q){
            console.log('my-queue is already subscribing');
            // Bind to 'fans', the fanout exchange the publisher declares and
            // publishes to. The previous target 'logs' is not created anywhere
            // in this example, so the bind callback (and therefore the
            // subscription) would not run unless that exchange happened to exist.
            q.bind('fans','my-queue', function(){
                // Subscribe only after the bind has been acknowledged.
                q.subscribe(function(message){
                    console.log('----receiveMessage: ',message);
                });
            });
        });
    });

    receiver2 代码

    const amqp = require('amqp');
    
    // Options handed to amqp.createConnection() for receiver2.
    const option = {
        host: 'server-ip',
        port: 5672,
        login: 'guest',
        password: 'guest',
        connectionTimeout: 10000,
        authMechanism: 'AMQPLAIN',
        vhost: '/',
        noDelay: true,
        ssl: { enabled: false },
    };

    // Open the broker connection using the options above.
    const connection = amqp.createConnection(option);

    // Log any error emitted by the connection.
    connection.on('error', (e) => {
        console.log("Error from amqp: ", e);
    });
    // Once the connection is ready: declare 'qqq', bind it to the 'fans'
    // exchange, and print every message delivered to it.
    connection.on('ready', function(){
        connection.queue('qqq', function(q){
            // Log message names the queue actually declared here ('qqq');
            // it previously said 'my-queue', copied from the other receiver.
            console.log('qqq is already subscribing');
            q.bind('fans','qqq',function(){
                // Subscribe only after the bind has been acknowledged.
                q.subscribe(function(message){
                    console.log('----receiveMessage: ',message);
                });
            });
        });
    });

    初学,简单测试,理解不深,可能有潜在问题

  • 相关阅读:
    Selenium2学习-002-Selenium2 Web 元素定位及 XPath 编写演示示例
    Selenium2学习-001-Selenium2 WebUI自动化Java开发 Windows 环境配置
    Selenium2学习-000-Selenium2初识
    000-沉沦,幸福
    JMeter学习-005-JMeter 主要组件概要介绍及执行顺序
    JMeter学习-004-WEB脚本入门实战
    JMeter学习-003-JMeter与LoadRunner的异曲同工
    C#设计模式之十四命令模式(Command Pattern)【行为型】
    C#设计模式之十三模板方法模式(Template Method Pattern)【行为型】
    C#设计模式之十二代理模式(Proxy Pattern)【结构型】
  • 原文地址:https://www.cnblogs.com/lc-ant/p/9259240.html
Copyright © 2011-2022 走看看