zoukankan      html  css  js  c++  java
  • RabbitMQ Node.js 示例

    RabbitQM 处理和管理消息队列的中间人(broker)。可简单理解为邮局,你在程序中写好消息,指定好收件人,剩下的事件就是 RabbitMQ 的工作了,它会保证收件人正确收到邮件。

    任何发送邮件的程序都是 Producer,消息队列可理解为邮筒,新件将堆积在此处。所有待处理的消息都以队列形式存储,总体上看来就是一个巨大的消息 buffer,至于存储量与设置的内存及硬件有关。任何应用都可以向队列添加消息,也可以多个消费者都在从队列中获取消息。

    consumer 即是消息队列中消息的应用,其处于等待接收来自 RabbitMQ 发送来的消息。

    消息生产者,消费者及 RabbitMQ 这个中间人三者不必同时存在于同一机器上,实际运用时也确实大部分不会部署在同一机器上,比如有专门的机器作为 RabbitMQ 实体,而应用程序会部署在其他的集群。应用程序可以是同时负责生产消息的,也同时是消费者。

    来自官方文档中关于 RabbitMQ 消息列队的示意图

    来自官方文档中关于 RabbitMQ 消息列队的示意图

    安装

    通过官网提供的地址下载相应平台的程序进行安装,Mac 可通过 Homebrew 进行安装

    $ brew update && brew install rabbitmq

    启动

    如果使用 Homebrew 安装,可通过 brew services start rabbitmq 命令来启动 RabbitMQ 服务。

    $ brew services start rabbitmq
    ==> Successfully started `rabbitmq` (label: homebrew.mxcl.rabbitmq)

    或直接运行 /usr/local/sbin/rabbitmq-server

    启动后,会有一个可视化的管理后台,可通过 http://localhost:15672/ 访问,用户名密码皆为 guest

    基于 Node.js 的 Hello World 示例

    通过 amqp.node 展示 RabbitMQ 在 Node.js 中应用的一个示例。

    RabbmitMQ 支持多种协议进行通信,amqp.node 使用的是 AMQP 0-9-1 这一开源协议,后者专门为处理消息而设计。作为客户端消费消息,使用的是 amqp.node client 模块,但 RabbitMQ 本身是支持多种客户端的。

    初始化一个 Node,js 项目然后通过以下命令安装 amqp.node 模块:

    $ mkdir rabbitmq-demo && yarn init -y
    $ yarn add amqplib

    发送消息

    创建 send.js 文件,在其中编写发送消息的逻辑,它将连接到 RabbitMQ 发送消息然后退出。

    首先建立到 RabbitMQ 服务的连接,

    #!/usr/bin/env node
    

    var amqp = require('amqplib/callback_api');
    amqp.connect('amqp://localhost', function(error0, connection) {});

    连接建立成功后,创建一个通道(channel),具体的发送将会在这个通道中进行。

    amqp.connect('amqp://localhost', function(error0, connection) {
      if (error0) {
        throw error0;
      }
      connection.createChannel(function(error1, channel) {});
    });

    发送消息前,需要先声明一个队列,然后将消息发送到该队列:

    amqp.connect('amqp://localhost', function(error0, connection) {
      if (error0) {
        throw error0;
      }
      connection.createChannel(function(error1, channel) {
        if (error1) {
          throw error1;
        }
        var queue = 'hello';
        var msg = 'Hello world';
    
    <span class="pl-smi">channel</span>.<span class="pl-en">assertQueue</span>(queue, {
      durable<span class="pl-k">:</span> <span class="pl-c1">false</span>
    });
    
    <span class="pl-smi">channel</span>.<span class="pl-en">sendToQueue</span>(queue, <span class="pl-smi">Buffer</span>.<span class="pl-en">from</span>(msg));
    <span class="pl-en">console</span>.<span class="pl-c1">log</span>(<span class="pl-s"><span class="pl-pds">"</span> [x] Sent %s<span class="pl-pds">"</span></span>, msg);
    

    });
    });

    队列的创建是一个幂等操作,只该队列不存在的情况才会新建。

    最后关闭连接并退出。

    setTimeout(function() {
        connection.close();
        process.exit(0);
    }, 500);
    完整的 send.js
    #!/usr/bin/env node
    

    var amqp = require('amqplib/callback_api');

    amqp.connect('amqp://localhost', function(error0, connection) {
    if (error0) {
    throw error0;
    }
    connection.createChannel(function(error1, channel) {
    if (error1) {
    throw error1;
    }

        <span class="pl-k">var</span> queue <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">'</span>hello<span class="pl-pds">'</span></span>;
        <span class="pl-k">var</span> msg <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">'</span>Hello World!<span class="pl-pds">'</span></span>;
    
        <span class="pl-smi">channel</span>.<span class="pl-en">assertQueue</span>(queue, {
            durable<span class="pl-k">:</span> <span class="pl-c1">false</span>
        });
        <span class="pl-smi">channel</span>.<span class="pl-en">sendToQueue</span>(queue, <span class="pl-smi">Buffer</span>.<span class="pl-en">from</span>(msg));
    
        <span class="pl-en">console</span>.<span class="pl-c1">log</span>(<span class="pl-s"><span class="pl-pds">"</span> [x] Sent %s<span class="pl-pds">"</span></span>, msg);
    });
    <span class="pl-c1">setTimeout</span>(<span class="pl-k">function</span>() {
        <span class="pl-smi">connection</span>.<span class="pl-c1">close</span>();
        <span class="pl-c1">process</span>.<span class="pl-en">exit</span>(<span class="pl-c1">0</span>);
    }, <span class="pl-c1">500</span>);
    

    });

    接收消息

    下面开始编写消费者,消费者做的事情是监听来自 RabbitMQ 的消息并处理。

    创建 receive.js,引入 amqp.node 模块,流程和发送者一样,也是先创建连接,然后创建通道,在通道中声明需要监听的队列:

    #!/usr/bin/env node
    

    var amqp = require('amqplib/callback_api');

    amqp.connect('amqp://localhost', function(error0, connection) {
    if (error0) {
    throw error0;
    }
    connection.createChannel(function(error1, channel) {
    if (error1) {
    throw error1;
    }
    var queue = 'hello';

    <span class="pl-smi">channel</span>.<span class="pl-en">assertQueue</span>(queue, {
      durable<span class="pl-k">:</span> <span class="pl-c1">false</span>
    });
    

    });
    });

    这里的队列声明不会与发送者那边的冲突,因为上面提到过,队列只在不存在的情况下才会重新生成。这里再次声明可以保证监听前队列已经存在。并且实际场景下,消费者有可能是在发送者之前启动的。

    然后添加监听的逻辑:

     console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
    

    channel.consume(queue, function(msg) {
    console.log(" [x] Received %s", msg.content.toString());
    }, {
    noAck: true
    });

    完整的 receive.js
    #!/usr/bin/env node
    

    var amqp = require('amqplib/callback_api');

    amqp.connect('amqp://localhost', function(error0, connection) {
    if (error0) {
    throw error0;
    }
    connection.createChannel(function(error1, channel) {
    if (error1) {
    throw error1;
    }

        <span class="pl-k">var</span> queue <span class="pl-k">=</span> <span class="pl-s"><span class="pl-pds">'</span>hello<span class="pl-pds">'</span></span>;
    
        <span class="pl-smi">channel</span>.<span class="pl-en">assertQueue</span>(queue, {
            durable<span class="pl-k">:</span> <span class="pl-c1">false</span>
        });
    
        <span class="pl-en">console</span>.<span class="pl-c1">log</span>(<span class="pl-s"><span class="pl-pds">"</span> [*] Waiting for messages in %s. To exit press CTRL+C<span class="pl-pds">"</span></span>, queue);
    
        <span class="pl-smi">channel</span>.<span class="pl-en">consume</span>(queue, <span class="pl-k">function</span>(<span class="pl-smi">msg</span>) {
            <span class="pl-en">console</span>.<span class="pl-c1">log</span>(<span class="pl-s"><span class="pl-pds">"</span> [x] Received %s<span class="pl-pds">"</span></span>, <span class="pl-smi">msg</span>.<span class="pl-c1">content</span>.<span class="pl-c1">toString</span>());
        }, {
            noAck<span class="pl-k">:</span> <span class="pl-c1">true</span>
        });
    });
    

    });

    运行

    分别在命令行启动上面两个程序,查看打印的信息。

    $ node send.js
     [x] Sent Hello World!
    

    $ node receive.js
    [*] Waiting for messages in hello. To exit press CTRL+C
    [x] Received Hello World!

    另外,可通过 sudo rabbitmqctl list_queues 手动查看 RabbitMQ 中的消息。

    $ /usr/local/sbin/rabbitmqctl list_queues
    Timeout: 60.0 seconds ...
    Listing queues for vhost / ...
    name	messages
    hello	0

    如果发现 rabbitmqctl 命令不可用,需要添加 /usr/local/sbin 到环境变量中,

    export PATH=/usr/local/sbin:$PATH

    其中 fish shell 通过添加如下命令到 fish 的配置文件即可:

    set -gx PATH /usr/local/sbin $PATH

    相关资源

  • 相关阅读:
    微信小程序反编译
    Mac 绑定Gitlab或者GitHub帐号,从新生成公钥
    Vue调试工具vue-devtools安装及使用
    NPM和Yarn添加淘宝镜像
    权限菜单设计
    Axure RP 7.0注册码
    Mac用户抓包软件Charles 4.0 破解 以及 抓取Https链接设置
    [转]c++导出函数dll供c#调用
    ef(EntityFramework)动态传递数据库连接字符串
    [转]sqlserver查询系统表统计表行数和占用空间
  • 原文地址:https://www.cnblogs.com/Wayou/p/rabbitmq_nodejs_example.html
Copyright © 2011-2022 走看看