zoukankan      html  css  js  c++  java
  • node-amqp 使用fanout发布订阅rabbitmq消息

    publisher 代码

    const amqp = require('amqp');
    
    // Settings handed to amqp.createConnection() for the publisher script.
    // 'server-ip' looks like a placeholder for the real broker address.
    let option = {
        host: 'server-ip',
        port: 5672,
        login: 'guest',
        password: 'guest',
        connectionTimeout: 10000,
        authMechanism: 'AMQPLAIN',
        vhost: '/',
        noDelay: true,
        ssl: { enabled: false },
    };
    const connection = amqp.createConnection(option);

    // Log any error event emitted by the connection.
    connection.on('error', (err) => {
        console.log("Error from amqp: ", err);
    });
    // Exchange the publisher sends to; replaced with the real exchange object
    // once the connection is ready.
    let default_exchange = {};
    // Handle of the periodic publish timer; null until the timer has been started.
    let publish_timer = null;

    // When the connection is ready: declare the 'fans' fanout exchange, declare
    // the two queues and bind them to it, then start publishing every 2000 ms.
    connection.on('ready', function(){
        default_exchange = connection.exchange('fans',{type:'fanout'}); // create a fanout-type exchange
        let q = connection.queue('my-queue');
        q.bind(default_exchange,'my-queue');

        let qq = connection.queue('qqq');
        qq.bind(default_exchange, 'qqq');

        // NOTE(review): node-amqp reconnects by default and is expected to emit
        // 'ready' again after each reconnect — confirm against the library docs.
        // Start the timer only once so repeated 'ready' events do not stack
        // additional intervals and multiply the publish rate.
        if (publish_timer === null) {
            publish_timer = setInterval(publish_message, 2000);
        }
    })
    
    // Number of messages published so far; used as the `count` field of each message.
    let count = 0;

    /**
     * Build a small test message and publish it to `default_exchange` with an
     * empty routing key, then log the payload that was sent.
     *
     * Declared as a real function (the original assigned to an undeclared
     * identifier, which creates an implicit global and throws a ReferenceError
     * in strict mode / ES modules).
     *
     * @returns {undefined}
     */
    function publish_message() {
        const message = {
            hello: 'world',
            time: Date.now(),
            count: count++,
        };
        default_exchange.publish('', message);
        console.log(`my-queue message published: ${JSON.stringify(message)} to queue: my-queue`);
    }
    

      receiver1 代码

    const amqp = require('amqp');
    
    // Settings handed to amqp.createConnection() for the first receiver script.
    // 'server-ip' looks like a placeholder for the real broker address.
    let option = {
        host: 'server-ip',
        port: 5672,
        login: 'guest',
        password: 'guest',
        connectionTimeout: 10000,
        authMechanism: 'AMQPLAIN',
        vhost: '/',
        noDelay: true,
        ssl: { enabled: false },
    };
    const connection = amqp.createConnection(option);

    // Log any error event emitted by the connection.
    connection.on('error', (err) => {
        console.log("Error from amqp: ", err);
    });
    
    // When the connection is ready: declare 'my-queue', bind it to the 'fans'
    // exchange and, once the bind callback fires, log every received message.
    connection.on('ready', function(){
        connection.queue('my-queue', function(q){
            console.log('my-queue is already subscribing');
            // Bind to 'fans', the exchange the publisher script declares and
            // publishes to (this previously named 'logs', which none of these
            // scripts declare, so it did not match the publisher or receiver2).
            q.bind('fans','my-queue', function(){
                q.subscribe(function(message){
                    console.log('----receiveMessage: ',message);
                })
            });
        })
    })

    receiver2 代码

    const amqp = require('amqp');
    
    // Settings handed to amqp.createConnection() for the second receiver script.
    // 'server-ip' looks like a placeholder for the real broker address.
    let option = {
        host: 'server-ip',
        port: 5672,
        login: 'guest',
        password: 'guest',
        connectionTimeout: 10000,
        authMechanism: 'AMQPLAIN',
        vhost: '/',
        noDelay: true,
        ssl: { enabled: false },
    };
    const connection = amqp.createConnection(option);

    // Log any error event emitted by the connection.
    connection.on('error', (err) => {
        console.log("Error from amqp: ", err);
    });
    // When the connection is ready: declare 'qqq', bind it to the 'fans'
    // exchange and, once the bind callback fires, log every received message.
    connection.on('ready', function(){
        connection.queue('qqq', function(q){
            // Name the queue this script actually declares (the message was
            // copied from the 'my-queue' receiver and named the wrong queue).
            console.log('qqq is already subscribing');
            q.bind('fans','qqq',function(){
                q.subscribe(function(message){
                    console.log('----receiveMessage: ',message);
                })
            });
        })
    })

    初学,简单测试,理解不深,可能有潜在问题

  • 相关阅读:
    Django通过中间件配置解决跨域
    Kindeditor初始化、及初始化编辑内容
    Bootstrap免费后台管理模版
    微信小程序开发-网络请求-GET/POST不同方式等
    微信小程序开发-文件系统
    微信小程序开发学习记录-源码分享
    【转载】python实例手册
    【改良的选择排序 】
    【选择 插入 冒泡排序】
    【python基础】 Tkinter 之 几何管理器
  • 原文地址:https://www.cnblogs.com/lc-ant/p/9259240.html
Copyright © 2011-2022 走看看