zoukankan      html  css  js  c++  java
  • 架构设计之NodeJS操作消息队列RabbitMQ

    一. 什么是消息队列?

    消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

    消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

    二. 常用的消息队列有哪些?

    RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq。

    甚至现在部分NoSQL也可做消息队列,如Redis。

    三. 消息队列的使用场景?

    • 异步处理

    • 应用解耦

    • 流量削峰

    四. 使用案例

    上规模的公司都会有自己的日志分析系统,日志系统是怎么实现的呢?

    图解:用户在访问应用的时候,我们要记录下用户的操作记录和系统的异常日志,常规的做法是将系统产生的日志保存到服务器磁盘,在服务器中开启定时任务,定时将磁盘的日志信息传入mq中(生产者),也定时将mq中的消息取出并存到相应的数据库,如ElasticSearch或Hive中。

    五. 如何安装RabbitMQ?

    上面的案例介绍了MQ的一个使用场景,我这里是用RabbitMQ举例,现实项目中可能用到的是Kafka。

    1. 首先安装brew(mac为例)

      /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)" 
    2. 安装RabbitMQ

      brew install rabbitmq
    3. 运行RabbitMQ

      进入到 /usr/local/Cellar/rabbitmq/3.7.7,执行

      sbin/rabbitmq-server
    4. 启动插件

      进入到 /usr/local/Cellar/rabbitmq/3.7.7/sbin

      ./rabbitmq-plugins enable rabbitmq_management
    5. 登陆管理界面

      打开浏览器输入:http://localhost:15672,RabbitMQ默认15672端口六. Nodejs操作RabbitMQ

         

    网上可以找到好几个相应的Node SDK,这里推荐amqplib

    1. 生产者

    /**
     * 对RabbitMQ的封装
     */
    let amqp = require('amqplib');
    
    class RabbitMQ {
        constructor() {
            this.hosts = [];
            this.index = 0;
            this.length = this.hosts.length;
            this.open = amqp.connect(this.hosts[this.index]);
        }
        sendQueueMsg(queueName, msg, errCallBack) {
            let self = this;
    
            self.open
                .then(function (conn) {
                    return conn.createChannel();
                })
                .then(function (channel) {
                    return channel.assertQueue(queueName).then(function (ok) {
                        return channel.sendToQueue(queueName, new Buffer(msg), {
                            persistent: true
                        });
                    })
                        .then(function (data) {
                            if (data) {
                                errCallBack && errCallBack("success");
                                channel.close();
                            }
                        })
                        .catch(function () {
                            setTimeout(() => {
                                if (channel) {
                                    channel.close();
                                }
                            }, 500)
                        });
                })
                .catch(function () {
                    let num = self.index++;
    
                    if (num <= self.length - 1) {
                        self.open = amqp.connect(self.hosts[num]);
                    } else {
                        self.index == 0;
                    }
                });
        }
    }

    2. 消费者

    /**
     * 对RabbitMQ的封装
     */
    let amqp = require('amqplib');
    
    class RabbitMQ {
        constructor() {
            this.open = amqp.connect(this.hosts[this.index]);
        }
        receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
            let self = this;
    
            self.open
                .then(function (conn) {
                    return conn.createChannel();
                })
                .then(function (channel) {
                    return channel.assertQueue(queueName)
                        .then(function (ok) {
                            return channel.consume(queueName, function (msg) {
                                if (msg !== null) {
                                    let data = msg.content.toString();
                                    channel.ack(msg);
                                    receiveCallBack && receiveCallBack(data);
                                }
                            })
                                .finally(function () {
                                    setTimeout(() => {
                                        if (channel) {
                                            channel.close();
                                        }
                                    }, 500)
                                });
                        })
                })
                .catch(function () {
                    let num = self.index++;
                    if (num <= self.length - 1) {
                        self.open = amqp.connect(self.hosts[num]);
                    } else {
                        self.index = 0;
                        self.open = amqp.connect(self.hosts[0]);
                    }
                });
        }

    3. 通过生产者向MQ发送一个消息,并创建队列

    let mq = new RabbitMQ();
    mq.sendQueueMsg('testQueue', 'my first message', (error) => {
        console.log(error)
    })

    执行之后,我们打开管理平台,发现RabbbitMQ已经接受到了一条消息:

    并且RabbbitMQ新增了一个队列testQueue

    4. 获取指定队列的消息

    let mq = new RabbitMQ();
    mq.receiveQueueMsg('testQueue',(msg) => 
    {    
       console.log(msg)
    })
    // 输出结果:my first message复制代码

    此时打开RabbitMQ管理平台,消息数量已经变为0

    综上:我们简单讲述了消息队列及RabbitMQ相关的一些知识,以及我们如何通过nodejs来生产与消费消息,上面讲的比较简单,之后会发表更多文章讲述消息队列集群搭建及容灾的实现。

  • 相关阅读:
    yii主题
    aptana studio 使用技巧整理
    big database url
    yii表单输入元素
    下载,和scp上传问题
    对缓存的思考——提高命中率
    php用户名密码
    openx -书表添加字段
    搜索
    python——常用模块2
  • 原文地址:https://www.cnblogs.com/wukong-holmes/p/9306733.html
Copyright © 2011-2022 走看看