zoukankan      html  css  js  c++  java
  • RabbitMQ系列教程之四:路由(Routing)(转载)

    (使用Net客户端)

    在上一个教程中,我们构建了一个简单的日志系统,我们能够向许多消息接受者广播发送日志消息。

    在本教程中,我们将为其添加一项功能 ,这个功能是我们将只订阅消息的一个子集成为可能。 例如,我们可以只将关键的错误消息输出到日志文件(以节省磁盘空间),同时仍然可以在控制台上打印所有日志消息。

    1、绑定

    在以前的例子中,我们已经创建了绑定。 你可能会记得如下代码:

    channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");


    【绑定】是【消息交换机】和【队列】之间的关系纽带,通过绑定把二者关联起来。 这可以简单地理解为:队列可以接收来自此【消息交换机】的消息。

    【绑定】可以占用额外的路由选择参数。 为了避免与BasicPublish参数混淆,我们将其称为【绑定键】。 以下代码就是如何用一个键值来创建一个绑定:

    channel.QueueBind(queue: queueName,
                      exchange: "direct_logs",
                      routingKey: "black");


    【绑定键】的含义取决于交换类型。 以前我们使用的【Fanout】类型的【消息交换机】忽略了它的取值。

    2、直接交换

       在上一个教程中,我们的日志记录系统向所有【消费者】发送所有消息。 我们希望将其扩展为允许基于其严重性过滤消息。 例如,我们可能希望将写入磁盘日志消息的脚本仅接受严重错误,而不会在警告或信息日志消息上浪费磁盘空间。

       我们正在使用一个【Fanout】类型的【消息交换机】,它不会给我们带来很大的灵活性 - 它只能无意识地发送。

       我们将使用一个【Direct】类型的【消息交换机】。 直接转换路由的背后的算法其实是很简单的 - 把消息传递到【绑定键 binding key】和消息的【路由键 routing key】完全匹配的队列中。

       为了说明,请考虑以下设置:

           

       在这个设置中,我们可以看到【Direct】类型的【消息交换机】X与两个队列相绑定。 第一个队列与【绑定键】的值是Orange相绑定的,第二个队列有两个绑定,一个【绑定键】的值是black,另一个【绑定键】的值是green。

       在这样的设置中,发布到具有【路由键】为orange的【消息交换机】的消息将被路由到队列Q1。 具有black或green【路由键】的消息将转到Q2。 所有其他消息将被丢弃。


    3、多重绑定

           

      使用相同的【绑定键】绑定多个队列是完全合法的。 在我们的示例中,我们可以在X和Q1之间添加【绑定键】是black的绑定。 在这种情况下,【direct】类型的【消息交换机】将表现得像【Fanout】类型的【消息交换机】,并将消息发送到所有匹配的队列。 具有【路由键】是black的消息将传送到Q1和Q2。


    4、发出日志

       我们将发送消息到【Direct】类型的【消息交换机】来替换【fanout】类型的【消息交换机】,在我们现在的日志系统将使用此模型。 我们将提供日志严重性作为【路由键】。 这样接收脚本就能够选择想要接收的严重性。 我们首先关注发出日志。

       像以前一样,我们首先要建立一个【消息交换机】:

    channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");

        现在,我们准备发送消息:

    var body = Encoding.UTF8.GetBytes(message);
    channel.BasicPublish(exchange: "direct_logs",
                         routingKey: severity,
                         basicProperties: null,
                         body: body);


        为了简化事情,我们假设“严重性”可以是“信息”,“警告”,“错误”之一。

    5、订阅

       接收消息将像上一个教程一样工作,除了一个例外 - 我们将为每个我们感兴趣的严重性创建一个新的绑定。

    复制代码
    var queueName = channel.QueueDeclare().QueueName;
    
    foreach(var severity in args)
    {
        channel.QueueBind(queue: queueName,
                          exchange: "direct_logs",
                          routingKey: severity);
    }
    复制代码


    6、整合

          

      以下是EmitLogDirect.cs类的代码:

    复制代码
     1 using System;
     2 using System.Linq;
     3 using RabbitMQ.Client;
     4 using System.Text;
     5 
     6 class EmitLogDirect
     7 {
     8     public static void Main(string[] args)
     9     {
    10         var factory = new ConnectionFactory() { HostName = "localhost" };
    11         using(var connection = factory.CreateConnection())
    12         using(var channel = connection.CreateModel())
    13         {
    14             channel.ExchangeDeclare(exchange: "direct_logs",
    15                                     type: "direct");
    16 
    17             var severity = (args.Length > 0) ? args[0] : "info";
    18             var message = (args.Length > 1)
    19                           ? string.Join(" ", args.Skip( 1 ).ToArray())
    20                           : "Hello World!";
    21             var body = Encoding.UTF8.GetBytes(message);
    22             channel.BasicPublish(exchange: "direct_logs",
    23                                  routingKey: severity,
    24                                  basicProperties: null,
    25                                  body: body);
    26             Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
    27         }
    28 
    29         Console.WriteLine(" Press [enter] to exit.");
    30         Console.ReadLine();
    31     }
    32 }
    复制代码


    以下是ReceiveLogsDirect.cs类的代码:

    复制代码
     1 using System;
     2 using RabbitMQ.Client;
     3 using RabbitMQ.Client.Events;
     4 using System.Text;
     5 
     6 class ReceiveLogsDirect
     7 {
     8     public static void Main(string[] args)
     9     {
    10         var factory = new ConnectionFactory() { HostName = "localhost" };
    11         using(var connection = factory.CreateConnection())
    12         using(var channel = connection.CreateModel())
    13         {
    14             channel.ExchangeDeclare(exchange: "direct_logs",
    15                                     type: "direct");
    16             var queueName = channel.QueueDeclare().QueueName;
    17 
    18             if(args.Length < 1)
    19             {
    20                 Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
    21                                         Environment.GetCommandLineArgs()[0]);
    22                 Console.WriteLine(" Press [enter] to exit.");
    23                 Console.ReadLine();
    24                 Environment.ExitCode = 1;
    25                 return;
    26             }
    27 
    28             foreach(var severity in args)
    29             {
    30                 channel.QueueBind(queue: queueName,
    31                                   exchange: "direct_logs",
    32                                   routingKey: severity);
    33             }
    34 
    35             Console.WriteLine(" [*] Waiting for messages.");
    36 
    37             var consumer = new EventingBasicConsumer(channel);
    38             consumer.Received += (model, ea) =>
    39             {
    40                 var body = ea.Body;
    41                 var message = Encoding.UTF8.GetString(body);
    42                 var routingKey = ea.RoutingKey;
    43                 Console.WriteLine(" [x] Received '{0}':'{1}'",
    44                                   routingKey, message);
    45             };
    46             channel.BasicConsume(queue: queueName,
    47                                  noAck: true,
    48                                  consumer: consumer);
    49 
    50             Console.WriteLine(" Press [enter] to exit.");
    51             Console.ReadLine();
    52         }
    53     }
    54 }
    复制代码


    如果您只想将“警告”和“错误”(而不是“信息”)保存到文件中,只需打开控制台并键入:

    cd ReceiveLogsDirect
    dotnet run warning error > logs_from_rabbit.log


    如果您想查看屏幕上的所有日志消息,请打开一个新终端,然后执行以下操作:

    cd ReceiveLogsDirect
    dotnet run info warning error
    # => [*] Waiting for logs. To exit press CTRL+C


    而且,例如,要发出错误日志消息,只需键入:

    cd EmitLogDirect
    dotnet run error "Run. Run. Or it will explode."
    # => [x] Sent 'error':'Run. Run. Or it will explode.'


    以下是原文地址:http://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html

    今天这篇文章终于翻译完了,整个系列还有几篇没翻译。英文水平有限,错误在所难免,欢迎大家提出来,共同学习。

    天下国家,可均也;爵禄,可辞也;白刃,可蹈也;中庸不可能也
     
    分类: 消息队列
  • 相关阅读:
    896. Monotonic Array单调数组
    865. Smallest Subtree with all the Deepest Nodes 有最深节点的最小子树
    489. Robot Room Cleaner扫地机器人
    JavaFX
    《Python CookBook2》 第一章 文本
    《Python CookBook2》 第一章 文本
    《Python CookBook2》 第一章 文本
    《Python CookBook2》 第一章 文本
    《Python CookBook2》 第一章 文本
    《Python CookBook2》 第一章 文本
  • 原文地址:https://www.cnblogs.com/zengpeng/p/8426844.html
Copyright © 2011-2022 走看看