zoukankan      html  css  js  c++  java
  • RabbitMQ系列教程之五:主题(Topic)

    (本实例都是使用的Net的客户端,使用C#编写),说明,中文方括号【】表示名词。

       在上一个教程中,我们改进了我们的日志记录系统。 没有使用只能够进行虚拟广播的【Fanout】交换机,而是使用了【Direct】类型的交换机,这样做就可以让我们有可能选择性地接收日志。

      虽然使用【Direct】类型的【消息交换机】改进了我们的系统,但它仍然有限制 - 它不能基于多个标准进行路由选择。


      在我们的日志记录系统中,我们可能不仅要根据严重性订阅日志,还可以基于发出日志的源进行订阅。 您可能会从syslog unix工具中了解这一概念,该工具根据严重性(info/warn/crit...)和设施(auth / cron / kern ...)路由日志。

       这将给我们很大的灵活性 - 我们可能既想监听来自“cron”的重要错误,也可以监听“kern”的所有日志。

       要在我们的日志记录系统中实现,我们需要了解一个更为复杂的topic类型的【消息交换机】。

    1、Topic类型的【消息交换机】

       发送到【Topic】类型【消息交换机】的消息不能有任意的routing_key - 它必须是由点分隔的单词列表。 这些词可以是任何东西,但通常它们指定与消息相关联的一些功能。 几个有效的路由关键字示例:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。 路由关键字中可以有任意多的单词,最多可达255个字节。

       绑定键也必须是相同的形式。【Topic】类型的【消息交换机】背后的逻辑类似于【Direct】类型的【消息交换机】 - 使用特定【路由键】发送的消息将被传递到与匹配的【绑定键】绑定的所有队列。 但是,【绑定键】有两个重要的特殊情况:

         *(星)可以替代一个字。
         #(井号)可以替换零个或多个单词。

        在一个例子中最简单的解释一下:
       

        在这个例子中,我们将发送所有描述动物的消息。消息将使用由三个字(两个点)组成的【路由键】发送。【路由键】中的第一个字将描述速度,第二个颜色和第三个种类:“<speed>.<color>.<species>”。

        我们创建了三个绑定:Q1绑定键“*.orange.*”和Q2是“*.*.rabbit”和“lazy.#”绑定。

        这些【绑定键】所要表达意思可以总结为:

        Q1对所有的橙色动物感兴趣。

        Q2想听听有关兔子的一切,以及关于lazy动物的一切。

        将【路由键】设置为“quick.orange.rabbit”的消息将传递给两个队列。消息“lazy.orange.elephant”也会发送他们那两个队列。另一方面,“quick.orange.fox”只会发送到第一个队列,而“lazy.brown.fox”只能发送到第二个队列。 “lazy.pink.rabbit”将被传递到第二个队列只有一次,即使它匹配两个绑定。 “quick.brown.fox”不匹配任何绑定,所以它将被丢弃。

       如果我们违反约定并发送一个或四个字的消息,如“orange”或“quick.orange.male.rabbit”,会发生什么?那么这些消息将不会匹配任何绑定,并将丢失。

       另一方面,“lazy.orange.male.rabbit”即使它有四个字,将匹配上一个绑定,并将被传递到第二个队列。

       说明:【Topic】类型的【消息交换机】

         此类型的【消息交换机】是强大的,可以像其他【消息交换机】一样行事。

         当队列用“#”(哈希)【绑定键】绑定时,它将接收所有消息,而不管【路由键】,就像使用【Fanout】类型的【消息交换机】。

         当特殊字符“*”(星号)和“#”(哈希)不用于绑定时,【Topic】类型的【消息交换机】将表现得像一个使用【Direct】类型的【消息交换机】。


    2、代码整合

      我们将在我们的日志记录系统中使用【Topic】类型【消息交换机】。 我们将从一个工作假设开始,日志的【路由键】将有两个单词组成:“<facility>.<severity>”。

    代码与上一个教程几乎相同。

    EmitLogTopic.cs的代码:

     1 using System;
     2 using System.Linq;
     3 using RabbitMQ.Client;
     4 using System.Text;
     5 
     6 class EmitLogTopic
     7 {
     8     public static void Main(string[] args)
     9     {
    10         var factory = new ConnectionFactory() { HostName = "localhost" };
    11         using(var connection = factory.CreateConnection())
    12         using(var channel = connection.CreateModel())
    13         {
    14             channel.ExchangeDeclare(exchange: "topic_logs",
    15                                     type: "topic");
    16 
    17             var routingKey = (args.Length > 0) ? args[0] : "anonymous.info";
    18             var message = (args.Length > 1)
    19                           ? string.Join(" ", args.Skip( 1 ).ToArray())
    20                           : "Hello World!";
    21             var body = Encoding.UTF8.GetBytes(message);
    22             channel.BasicPublish(exchange: "topic_logs",
    23                                  routingKey: routingKey,
    24                                  basicProperties: null,
    25                                  body: body);
    26             Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);
    27         }
    28     }
    29 }


    ReceiveLogsTopic.cs的代码:

     1 using System;
     2 using RabbitMQ.Client;
     3 using RabbitMQ.Client.Events;
     4 using System.Text;
     5 
     6 class ReceiveLogsTopic
     7 {
     8     public static void Main(string[] args)
     9     {
    10         var factory = new ConnectionFactory() { HostName = "localhost" };
    11         using(var connection = factory.CreateConnection())
    12         using(var channel = connection.CreateModel())
    13         {
    14             channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
    15             var queueName = channel.QueueDeclare().QueueName;
    16 
    17             if(args.Length < 1)
    18             {
    19                 Console.Error.WriteLine("Usage: {0} [binding_key...]",
    20                                         Environment.GetCommandLineArgs()[0]);
    21                 Console.WriteLine(" Press [enter] to exit.");
    22                 Console.ReadLine();
    23                 Environment.ExitCode = 1;
    24                 return;
    25             }
    26 
    27             foreach(var bindingKey in args)
    28             {
    29                 channel.QueueBind(queue: queueName,
    30                                   exchange: "topic_logs",
    31                                   routingKey: bindingKey);
    32             }
    33 
    34             Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");
    35 
    36             var consumer = new EventingBasicConsumer(channel);
    37             consumer.Received += (model, ea) =>
    38             {
    39                 var body = ea.Body;
    40                 var message = Encoding.UTF8.GetString(body);
    41                 var routingKey = ea.RoutingKey;
    42                 Console.WriteLine(" [x] Received '{0}':'{1}'",
    43                                   routingKey,
    44                                   message);
    45             };
    46             channel.BasicConsume(queue: queueName,
    47                                  noAck: true,
    48                                  consumer: consumer);
    49 
    50             Console.WriteLine(" Press [enter] to exit.");
    51             Console.ReadLine();
    52         }
    53     }
    54 }


    3、运行以下示例:

    收到所有的日志:

    cd ReceiveLogsTopic
    dotnet run“#”


    从设备“kern”接收所有日志:

    cd ReceiveLogsTopic
    dotnet run “kern.*”

    或者如果您只想听到关于“critical”日志的信息:

    ReceiveLogsTopic.exe“* .critical”


    您可以创建多个绑定:

    cd ReceiveLogsTopic
    dotnet run“kern.*”“*.critical”


    并使用【路由键】“kern.critical”类型发出日志:

    cd emitLogTopic
    dotnet run“kern.critical”“A critial kernel error”

    写这些程序很有趣。 请注意,代码不会对【路由键】或【绑定键】做任何假设,您可能希望使用两个以上的【路由键】参数进行操作。

    今天就到此为止了,如果英文比较好的,可以查看原文,原文地址是:http://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html

  • 相关阅读:
    说一说Java的Unsafe类
    阿里云CentOS下安装jdk
    LeetCode 5
    五种方法实现Java的Singleton单例模式
    聊聊Java的final关键字
    LeetCode 4
    Java9都快发布了,Java8的十大新特性你了解多少呢?
    【Spring】mvc:annotation-driven 使用
    【gradle】【maven】gradle 转 maven pom.xml
    [GIT]比较不同分支的差异
  • 原文地址:https://www.cnblogs.com/PatrickLiu/p/7126147.html
Copyright © 2011-2022 走看看