zoukankan      html  css  js  c++  java
  • 【c#】RabbitMQ学习文档(四)Routing(路由)

    (使用Net客户端)

    在上一个教程中,我们构建了一个简单的日志系统,我们能够向许多消息接受者广播发送日志消息。

    在本教程中,我们将为其添加一项功能 ,这个功能是我们将只订阅消息的一个子集成为可能。 例如,我们可以只将关键的错误消息输出到日志文件(以节省磁盘空间),同时仍然可以在控制台上打印所有日志消息。

    1、绑定

    在以前的例子中,我们已经创建了绑定。 你可能会记得如下代码:

    channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");


    【绑定】是【消息交换机】和【队列】之间的关系纽带,通过绑定把二者关联起来。 这可以简单地理解为:队列可以接收来自此【消息交换机】的消息。

    【绑定】可以占用额外的路由选择参数。 为了避免与BasicPublish参数混淆,我们将其称为【绑定键】。 以下代码就是如何用一个键值来创建一个绑定:

    channel.QueueBind(queue: queueName,
                      exchange: "direct_logs",
                      routingKey: "black");


    【绑定键】的含义取决于交换类型。 以前我们使用的【Fanout】类型的【消息交换机】忽略了它的取值。

    2、直接交换

       在上一个教程中,我们的日志记录系统向所有【消费者】发送所有消息。 我们希望将其扩展为允许基于其严重性过滤消息。 例如,我们可能希望将写入磁盘日志消息的脚本仅接受严重错误,而不会在警告或信息日志消息上浪费磁盘空间。

       我们正在使用一个【Fanout】类型的【消息交换机】,它不会给我们带来很大的灵活性 - 它只能无意识地发送。

       我们将使用一个【Direct】类型的【消息交换机】。 直接转换路由的背后的算法其实是很简单的 - 把消息传递到【绑定键 binding key】和消息的【路由键 routing key】完全匹配的队列中。

       为了说明,请考虑以下设置:

          

       在这个设置中,我们可以看到【Direct】类型的【消息交换机】X与两个队列相绑定。 第一个队列与【绑定键】的值是Orange相绑定的,第二个队列有两个绑定,一个【绑定键】的值是black,另一个【绑定键】的值是green。

       在这样的设置中,发布到具有【路由键】为orange的【消息交换机】的消息将被路由到队列Q1。 具有black或green【路由键】的消息将转到Q2。 所有其他消息将被丢弃。


    3、多重绑定

          

      使用相同的【绑定键】绑定多个队列是完全合法的。 在我们的示例中,我们可以在X和Q1之间添加【绑定键】是black的绑定。 在这种情况下,【direct】类型的【消息交换机】将表现得像【Fanout】类型的【消息交换机】,并将消息发送到所有匹配的队列。 具有【路由键】是black的消息将传送到Q1和Q2。


    4、发出日志

       我们将发送消息到【Direct】类型的【消息交换机】来替换【fanout】类型的【消息交换机】,在我们现在的日志系统将使用此模型。 我们将提供日志严重性作为【路由键】。 这样接收脚本就能够选择想要接收的严重性。 我们首先关注发出日志。

       像以前一样,我们首先要建立一个【消息交换机】:

    channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");

        现在,我们准备发送消息:

    var body = Encoding.UTF8.GetBytes(message);
    channel.BasicPublish(exchange: "direct_logs",
                         routingKey: severity,
                         basicProperties: null,
                         body: body);


        为了简化事情,我们假设“严重性”可以是“信息”,“警告”,“错误”之一。

    5、订阅

       接收消息将像上一个教程一样工作,除了一个例外 - 我们将为每个我们感兴趣的严重性创建一个新的绑定。

    复制代码
    var queueName = channel.QueueDeclare().QueueName;
    
    foreach(var severity in args)
    {
        channel.QueueBind(queue: queueName,
                          exchange: "direct_logs",
                          routingKey: severity);
    }
    复制代码


    6、整合

        

      以下是EmitLogDirect.cs类的代码:

    复制代码
     1 using System;
     2 using System.Linq;
     3 using RabbitMQ.Client;
     4 using System.Text;
     5 
     6 class EmitLogDirect
     7 {
     8     public static void Main(string[] args)
     9     {
    10         var factory = new ConnectionFactory() { HostName = "localhost" };
    11         using(var connection = factory.CreateConnection())
    12         using(var channel = connection.CreateModel())
    13         {
    14             channel.ExchangeDeclare(exchange: "direct_logs",
    15                                     type: "direct");
    16 
    17             var severity = (args.Length > 0) ? args[0] : "info";
    18             var message = (args.Length > 1)
    19                           ? string.Join(" ", args.Skip( 1 ).ToArray())
    20                           : "Hello World!";
    21             var body = Encoding.UTF8.GetBytes(message);
    22             channel.BasicPublish(exchange: "direct_logs",
    23                                  routingKey: severity,
    24                                  basicProperties: null,
    25                                  body: body);
    26             Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
    27         }
    28 
    29         Console.WriteLine(" Press [enter] to exit.");
    30         Console.ReadLine();
    31     }
    32 }
    复制代码


    以下是ReceiveLogsDirect.cs类的代码:

    复制代码
     1 using System;
     2 using RabbitMQ.Client;
     3 using RabbitMQ.Client.Events;
     4 using System.Text;
     5 
     6 class ReceiveLogsDirect
     7 {
     8     public static void Main(string[] args)
     9     {
    10         var factory = new ConnectionFactory() { HostName = "localhost" };
    11         using(var connection = factory.CreateConnection())
    12         using(var channel = connection.CreateModel())
    13         {
    14             channel.ExchangeDeclare(exchange: "direct_logs",
    15                                     type: "direct");
    16             var queueName = channel.QueueDeclare().QueueName;
    17 
    18             if(args.Length < 1)
    19             {
    20                 Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
    21                                         Environment.GetCommandLineArgs()[0]);
    22                 Console.WriteLine(" Press [enter] to exit.");
    23                 Console.ReadLine();
    24                 Environment.ExitCode = 1;
    25                 return;
    26             }
    27 
    28             foreach(var severity in args)
    29             {
    30                 channel.QueueBind(queue: queueName,
    31                                   exchange: "direct_logs",
    32                                   routingKey: severity);
    33             }
    34 
    35             Console.WriteLine(" [*] Waiting for messages.");
    36 
    37             var consumer = new EventingBasicConsumer(channel);
    38             consumer.Received += (model, ea) =>
    39             {
    40                 var body = ea.Body;
    41                 var message = Encoding.UTF8.GetString(body);
    42                 var routingKey = ea.RoutingKey;
    43                 Console.WriteLine(" [x] Received '{0}':'{1}'",
    44                                   routingKey, message);
    45             };
    46             channel.BasicConsume(queue: queueName,
    47                                  noAck: true,
    48                                  consumer: consumer);
    49 
    50             Console.WriteLine(" Press [enter] to exit.");
    51             Console.ReadLine();
    52         }
    53     }
    54 }
    复制代码


    如果您只想将“警告”和“错误”(而不是“信息”)保存到文件中,只需打开控制台并键入:

    cd ReceiveLogsDirect
    dotnet run warning error > logs_from_rabbit.log


    如果您想查看屏幕上的所有日志消息,请打开一个新终端,然后执行以下操作:

    cd ReceiveLogsDirect
    dotnet run info warning error
    # => [*] Waiting for logs. To exit press CTRL+C


    而且,例如,要发出错误日志消息,只需键入:

    cd EmitLogDirect
    dotnet run error "Run. Run. Or it will explode."
    # => [x] Sent 'error':'Run. Run. Or it will explode.'


    以下是原文地址:http://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html

    今天这篇文章终于翻译完了,整个系列还有几篇没翻译。英文水平有限,错误在所难免,欢迎大家提出来,共同学习。

  • 相关阅读:
    设计模式系列
    设计模式系列
    【ABAP系列】SAP ABAP DYNP_VALUES_UPDATE 更新屏幕字段的函数及用法
    【HANA系列】SAP HANA 2.0简介
    【HANA系列】SAP HANA Studio使用insufficient privilege 问题
    【HANA系列】SAP HANA Studio出现"Fetching Children..."问题
    【ABAP系列】SAP ABAP smartforms设备类型CNSAPWIN不支持页格式ZXXX
    【BW系列】SAP BW实时抽取ECC数据的实现
    【ABAP系列】SAP ABAP 行列转换的方法
    【HANA系列】SAP HANA数据处理的理解与分析一
  • 原文地址:https://www.cnblogs.com/wyt007/p/9066940.html
Copyright © 2011-2022 走看看