zoukankan      html  css  js  c++  java
  • nodejs操作消息队列RabbitMQ

    一. 什么是消息队列

    消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。
    其主要用途:不同进程Process/线程Thread之间通信。

    为什么会产生消息队列?有几个原因:

    不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;

    不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;

    二. 常用的消息队列

    RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq

    三. 使用场景

    异步处理

    应用解耦

    流量削峰

    四 使用amqplib操作RabbitMQ

    安装 amqplib

    npm install amqplib
    生产者:

    let amqp = require('amqplib');

    class RabbitMQ {
    constructor() {
    this.hosts = [];
    this.index = 0;
    this.length = this.hosts.length;
    this.open = amqp.connect(this.hosts[this.index]);
    }
    sendQueueMsg(queueName, msg, errCallBack) {
    let self = this;

    self.open
    .then(function (conn) {
    return conn.createChannel();
    })
    .then(function (channel) {
    return channel.assertQueue(queueName).then(function (ok) {
    return channel.sendToQueue(queueName, new Buffer(msg), {
    persistent: true
    });
    })
    .then(function (data) {
    if (data) {
    errCallBack && errCallBack("success");
    channel.close();
    }
    })
    .catch(function () {
    setTimeout(() => {
    if (channel) {
    channel.close();
    }
    }, 500)
    });
    })
    .catch(function () {
    let num = self.index++;

    if (num <= self.length - 1) {
    self.open = amqp.connect(self.hosts[num]);
    } else {
    self.index == 0;
    }
    });
    }
    }

    let mq = new RabbitMQ();
    mq.sendQueueMsg('testQueue', '123', (error) => {
    console.log(error)
    })
    消费者

    let amqp = require('amqplib');

    class RabbitMQ {
    constructor() {
    this.hosts = [];
    this.index = 0;
    this.length = this.hosts.length;
    this.open = amqp.connect(this.hosts[this.index]);
    }

    receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
    let self = this;

    self.open
    .then(function (conn) {
    return conn.createChannel();
    })
    .then(function (channel) {
    return channel.assertQueue(queueName)
    .then(function (ok) {
    return channel.consume(queueName, function (msg) {
    if (msg !== null) {
    let data = msg.content.toString();
    channel.ack(msg);
    receiveCallBack && receiveCallBack(data);
    }
    })
    .finally(function () {
    setTimeout(() => {
    if (channel) {
    channel.close();
    }
    }, 500)
    });
    })
    })
    .catch(function () {
    let num = self.index++;
    if (num <= self.length - 1) {
    self.open = amqp.connect(self.hosts[num]);
    } else {
    self.index = 0;
    self.open = amqp.connect(self.hosts[0]);
    }
    });
    }
    }

    let mq = new RabbitMQ();
    mq.receiveQueueMsg('testQueue',(msg) =>
    {
    console.log(msg)//123
    })
    打开mq后台 http://127.0.0.1:15672/ 看到新增队列,接受一条消息

    当运行消费者代码时输入 123,消息队列消息为0


    ---------------------

  • 相关阅读:
    sublime使用及插件
    Unity 查找
    Unity 3D 的四种坐标系
    C#知识点<4>
    C#知识点<3>
    C#知识点<2>
    排序算法
    OOP的三大特性------封装、继承、多态
    C#常用函数
    C++-------------类和对象
  • 原文地址:https://www.cnblogs.com/ly570/p/11192628.html
Copyright © 2011-2022 走看看