zoukankan      html  css  js  c++  java
  • 【译】RabbitMQ:发布-订阅(Publish/Subscribe)

    在前一篇教程中,我们创建了一个工作队列,我们假设在工作队列后的每一个任务都只被调度给一个消费者。在这一部分,我们将做一些完全不一样的事情,调度同一条消息给多个消费者,也就是有名的“发布-订阅”模式。为了阐述这种模式,我们将构建一个简单的日志系统。该系统将由两部分组成:一部分发送日志消息,另一部分接收并且打印日志消息,在这个日志系统中,每一份运行着的接收程序都将会收到消息。这样我们可以运行一个接收者把日志写入到磁盘中,同时可以运行另一个接收者将日志打印到显示器上面。也就是说,发布的日志消息会被广播到所有的接收者。

    交换器

    在前面的教程中,我们发送消息到队列,然后从队列中接收消息。现在开始介绍RabbitMQ完整的消息模式。

    让我们快速的复习一下在前面的教程中讲过的内容:

    • 生产者是一个发送消息的应用程序。
    • 队列是存储消息的缓存。
    • 消费者是一个接收消息的应用程序。

     

    RabbitMQ消息模式的核心是生产者从不直接发送消息到队列。事实上,生产者往往不知道他产生的消息会被分发到哪些队列,它只能将消息发送到一个交换器。交换器非常简单,它一方面从生产者接收消息,另一方面又将消息压入队列中。交换器必须清楚的知道要用接收到的消息做什么,是应当追加到某个指定的队列?或者追加到很多队列?或者应当丢弃?要完成这些的规则都被定义在交换器的类型中。

                                                     

    有几种可用的交换器类型:directtopicheadersfanout。本文主要关注最后一种类型:fanout,让我们创建一个这种类型的交换器,命名为logs:

    1 channel.ExchangeDeclare("logs", "fanout");

    类型为fanout的交换器非常简单,顾名思义,它会广播所有收到的消息到它知道的所有的队列,而这也正是我们的日志系统所需要的。

    交换器清单

    为了展示服务器上交换器的清单,你可以运行在任何时候都特别有用的rabbitmqctl:

     1 $ sudo rabbitmqctl list_exchanges
     2 Listing exchanges ...
     3         direct
     4 amq.direct      direct
     5 amq.fanout      fanout
     6 amq.headers     headers
     7 amq.match       headers
     8 amq.rabbitmq.log        topic
     9 amq.rabbitmq.trace      topic
    10 amq.topic       topic
    11 logs    fanout
    12 ...done.

    在清单里,有一些amp.*样式的交换器和一个默认(未命名)的交换器,这些都是默认创建的,但并不是说你马上就需要使用它们。

    匿名交换器

    在前面的教程中我们并不知晓交换器的任何信息,但是任然可以将消息发送到队列中,那是因为我们使用了默认的交换器,使用空字符串表示("")。

    回忆一下之前是如何发布消息的:

    1 var message = GetMessage(args);
    2 var body = Encoding.UTF8.GetBytes(message);
    3 channel.BasicPublish(exchange: "",
    4                      routingKey: "hello",
    5                      basicProperties: null,
    6                      body: body);

    第一个参数就是交换器的名称,空字符串指代的是默认交换器或者是匿名交换器,如果队列存在,消息将通过指定的routingKey路由到队列。

    现在我们可以将消息发布到上面定义的命名交换器了:

    1 var message = GetMessage(args);
    2 var body = Encoding.UTF8.GetBytes(message);
    3 channel.BasicPublish(exchange: "logs",
    4                      routingKey: "",
    5                      basicProperties: null,
    6                      body: body);

    临时队列

    你或许还记得我们之前使用的有指定名称的队列(还记得hellotask_queue么?)。能为队列命名对我们来说是至关重要的,我们需要指定给消费者相同的队列。当你想在生产者和消费者间共享队列时,给队列指定一个名字就显得特别重要了。

    但是这并不是我们日志系统的问题。我们希望能监听到所有消息,而不仅仅是其中一个子集;我们对当前流入的消息感兴趣而不是之前的旧信息。为了解决这个问题,我们需要做两件事:第一、无论何时连接到RabbitMQ,我们需要一个新的空队列,为此我们可以创建一个拥有随机名称的队列或者更好的是直接让RabbitMQ服务替我们生成一个随机名称;第二、一旦消费者断开连接,队列应当被自动删除。

    .NET 客户端,我们通过提供无参数的QueueDeclare()函数可以创建一个不持久化、独占的、自动删除的拥有随机名称的队列:

    1 var queueName = channel.QueueDeclare().QueueName;

    这样queueName就是一个随机的队列名称,看起来会是这样的:amq.gen-JzTY20BRgKO-HjmUJj0wLg。

    绑定

                                                     

    我们已经创建了一个fanout类型的交换器和一个队列,现在需要告诉交换器把消息发送到我们的队列。交换器和队列的关系就叫做绑定。

    1 channel.QueueBind(queue: queueName,
    2                   exchange: "logs",
    3                   routingKey: "");

    到目前为止,交换器logs将能添加消息到我们的队列中了。

    绑定清单

    你可以通过rabbitmqctl list_bingdings命令查看绑定清单。

    组合在一起

                                                     

    发送日志的生产者程序和之前教程里面的没有太多不同,最重要的改变是现在我们希望将消息发送到logs交换器,而不是之前的匿名交换器。当发送消息的时候,我们需要指定一个routingKey,但是在使用fanout类型交换器的时候,它的值将被忽略。下面是EmitLog.cs文件里面的代码:

     1 using System;
     2 using RabbitMQ.Client;
     3 using System.Text;
     4 
     5 class EmitLog
     6 {
     7     public static void Main(string[] args)
     8     {
     9         var factory = new ConnectionFactory() { HostName = "localhost" };
    10         using(var connection = factory.CreateConnection())
    11         using(var channel = connection.CreateModel())
    12         {
    13             channel.ExchangeDeclare(exchange: "logs", type: "fanout");
    14 
    15             var message = GetMessage(args);
    16             var body = Encoding.UTF8.GetBytes(message);
    17             channel.BasicPublish(exchange: "logs",
    18                                  routingKey: "",
    19                                  basicProperties: null,
    20                                  body: body);
    21             Console.WriteLine(" [x] Sent {0}", message);
    22         }
    23 
    24         Console.WriteLine(" Press [enter] to exit.");
    25         Console.ReadLine();
    26     }
    27 
    28     private static string GetMessage(string[] args)
    29     {
    30         return ((args.Length > 0)
    31                ? string.Join(" ", args)
    32                : "info: Hello World!");
    33     }
    34 }

    如你所见,在创建链接之后我们申明了交换器,这一步用于禁止发布到不存在的交换器是很有必要的。如果没有队列绑定到交换器发布的消息将会丢失,这是没有问题的;如果没有消费者监听消息,我们可以安全的销毁它。

    ReceiveLog.cs中的代码:

     1 using System;
     2 using RabbitMQ.Client;
     3 using RabbitMQ.Client.Events;
     4 using System.Text;
     5 
     6 class ReceiveLogs
     7 {
     8     public static void Main()
     9     {
    10         var factory = new ConnectionFactory() { HostName = "localhost" };
    11         using(var connection = factory.CreateConnection())
    12         using(var channel = connection.CreateModel())
    13         {
    14             channel.ExchangeDeclare(exchange: "logs", type: "fanout");
    15 
    16             var queueName = channel.QueueDeclare().QueueName;
    17             channel.QueueBind(queue: queueName,
    18                               exchange: "logs",
    19                               routingKey: "");
    20 
    21             Console.WriteLine(" [*] Waiting for logs.");
    22 
    23             var consumer = new EventingBasicConsumer(channel);
    24             consumer.Received += (model, ea) =>
    25             {
    26                 var body = ea.Body;
    27                 var message = Encoding.UTF8.GetString(body);
    28                 Console.WriteLine(" [x] {0}", message);
    29             };
    30             channel.BasicConsume(queue: queueName,
    31                                  noAck: true,
    32                                  consumer: consumer);
    33 
    34             Console.WriteLine(" Press [enter] to exit.");
    35             Console.ReadLine();
    36         }
    37     }
    38 }

    像之前那样编译,工作就完成了。

    1 $ csc /r:"RabbitMQ.Client.dll" EmitLogs.cs
    2 $ csc /r:"RabbitMQ.Client.dll" ReceiveLogs.cs

    如果你想将日志保存到文件中,打开控制台然后输入:

    1 $ ReceiveLogs.exe > logs_from_rabbit.log

    如果你想在屏幕上看到日志,打开一个新的终端,执行下面的代码:

    1 $ ReceiveLogs.exe

    当然,发送日志输入:

    1 $ EmitLog.exe

    使用rabbitmqctl list_bindings命令,可以看到代码确如我们希望的那样创建了绑定和队列。如果同时运行两个消费者(ReceiveLogs.cs)你将能看到下面这样的信息:

    1 $ sudo rabbitmqctl list_bindings
    2 Listing bindings ...
    3 logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
    4 logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
    5 ...done.

    结果非常的直观:数据从交换器logs发送到两个服务自动指定名称的队列,这正是我们之前预期的。

    要了解如何监听消息的子集,让我们进入下一篇。

    原文链接:http://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html

  • 相关阅读:
    李洪强iOS开发之上传照片时英文改中文
    李洪强iOS开发之让您的Xcode键字如飞
    李洪强iOS开发之initWithFrame,initWithCoder和aweakFormNib
    跟我学设计模式视频教程——管擦者模式(下),责任链模式(上)
    leetcode
    POJ 3071 Football(概率DP)
    小贝_mysql数据库备份与恢复
    第2次实验——算法基本功 与 综合思考
    加密壳之ACProtect之OEP的处理
    C语言的各种位运算符的操作简述
  • 原文地址:https://www.cnblogs.com/chen108/p/4957357.html
Copyright © 2011-2022 走看看