zoukankan      html  css  js  c++  java
  • RabbitMQ消息队列(五):Routing 消息路由[转]

    上篇文章中,我们构建了一个简单的日志系统。接下来,我们将丰富它:能够使用不同的severity(严重程度)来监听不同等级的log。比如我们希望只有error的log才保存到磁盘上。

    1. Bindings绑定

        上篇文章中我们是这么做的绑定:

    channel.QueueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);//const string ROUTING_KEY = "";

        绑定其实就是关联了exchange和queue。或者这么说:queue对exchange的内容感兴趣,exchange要把它的Message deliver(提供)到queue中。

        实际上,绑定可以带routing key这个参数,空字符串也是一个routing key的名字。其实这个参数的名称和basic_publish的参数名是相同了。第二篇有介绍当exchange的名称为空字符串的时候,创建queue的时候用到queue的名字和Producer的BasicPublish方法或Consuner的BasicConsume方法的routing key的名字可以是相同的。即queue的名字和routing key的名字是相同的。

       为了避免混淆,我们把这个routing key称为binding key(即在Exchange中的routing key)

        使用一个binding key来创建binding :

    channel.QueueBind(queueName, EXCHANGE_NAME, routingKey);//string routingKey = "指定RoutingKey的名称";

    上一篇文章我们讲的是使用fanout类型的exchange,对于fanout的exchange来说,这个参数是被忽略的。

    2. Direct exchange

      Direct exchange的路由算法非常简单:通过binding key的完全匹配,可以通过下图来说明。


        exchange X和两个queue绑定在一起。Q1的binding key是orange。Q2的binding key是black和green。
        当P发布的key是orange时,exchange会把它放到Q1。如果P发布的key是black或者green那么就会到Q2。其余的Message都会被丢弃。本篇最后面(queue的名字是rabbitmq自己起的)和下一篇第一个例子(queue的名字是程序指定的)就是实现这上面这副图的代码——将多个routing key(或者称为binding key)绑定到同一个名称的queue。

    3. Multiple bindings

          多个queue绑定同一个key是可以的。对于下图的例子,Q1和Q2都绑定了black。也就是说,对于routing key是black的Message,会被deliver(提供)到Q1和Q2。其余的Message都会被丢弃。下一篇第二个例子就是讲这副图的例子,将同一个routing key(或者binding key)绑定到多个不同名称的queue上。
     
     

    4. Emitting logs

    首先是我们要创建一个direct的exchange:

    const string EXCHANGE_NAME = "direct_logs";
    channel.ExchangeDeclare(EXCHANGE_NAME, "direct");

    我们将使用log的severity(严重级别)作为routing key,这样Consumer可以针对不同severity(严重级别)的log进行不同的处理。

    channel.BasicPublish(EXCHANGE_NAME, routingKey, null, body);

    我们使用三种severity(严重级别):'info', 'warning', 'error'.

    5. Subscribing

    对于queue,我们需要绑定severity(严重级别):

    const string EXCHANGE_NAME = "direct_logs";
    channel.ExchangeDeclare(EXCHANGE_NAME, "direct");
    string queueName = channel.QueueDeclare();
    
    channel.QueueBind(queueName, EXCHANGE_NAME, routingKey);

    6. 最终版本

    本例子是没有指定Queue的名称:
    Producer.cs
     1  /// <summary>
     2         /// 多个routing key指定同一个queue
     3         /// 接收端创建临时queue
     4         /// </summary>
     5         /// <param name="args">
     6         /// SendDemo5.exe direct_custom_routing_key_hello1
     7         /// SendDemo5.exe direct_custom_routing_key_hello2
     8         /// </param>
     9         static void Main(string[] args)
    10         {
    11             if (args.Length < 1)
    12             {
    13                 Console.Error.WriteLine("请指定一个新的Routing Key名称", Environment.GetCommandLineArgs()[0]);
    14                 Environment.ExitCode = 1;
    15                 return;
    16             }
    17             var factory = new ConnectionFactory() { HostName = "localhost" };
    18             using (var connection = factory.CreateConnection())
    19             {
    20                 using (var channel = connection.CreateModel())
    21                 {
    22                     const string EXCHANGE_NAME = "direct_logs";
    23                     channel.ExchangeDeclare(EXCHANGE_NAME, "direct");//Direct :如果 routing key 匹配, 那么Message就会被传递到相应的queue中。
    24                     //Exchange在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。
    25                     var routingKey = args[0];
    26                     var message = "Hello World!";
    27                     var body = Encoding.UTF8.GetBytes(message);
    28                     
    29                     channel.BasicPublish(EXCHANGE_NAME, routingKey, null, body);
    30                     Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);
    31                 }
    32             }
    33         }
    Producer.cs

    Consumer.cs

     1 /// <summary>
     2         /// 多个routing key指定同一个queue
     3         /// 接收端创建临时queue
     4         /// </summary>
     5         /// <param name="args">
     6         /// ReceiveDemo5.exe direct_custom_routing_key_hello1
     7         /// ReceiveDemo5.exe direct_custom_routing_key_hello2
     8         /// </param>
     9         static void Main(string[] args)
    10         {
    11             if (args.Length < 1)
    12             {
    13                 Console.Error.WriteLine("请指定一个新的Routing Key名称", Environment.GetCommandLineArgs()[0]);
    14                 Environment.ExitCode = 1;
    15                 return;
    16             }
    17             var factory = new ConnectionFactory() { HostName = "localhost" };
    18             using (var connection = factory.CreateConnection())
    19             {
    20                 using (var channel = connection.CreateModel())
    21                 {
    22                     const string EXCHANGE_NAME = "direct_logs";
    23                     channel.ExchangeDeclare(EXCHANGE_NAME, "direct");//接收端如果关闭之后,自动创建的Queue会自动被删除
    24                     string queueName = channel.QueueDeclare();//获取临时创建的Queue的名称
    25                    
    26                     foreach (var routingKey in args)
    27                     {
    28                         channel.QueueBind(queueName, EXCHANGE_NAME, routingKey);
    29                     }
    30 
    31                     Console.WriteLine(" [*] Waiting for messages. " + "To exit press CTRL+C");
    32 
    33                     var consumer = new QueueingBasicConsumer(channel);
    34                     channel.BasicConsume(queueName, true, consumer);
    35 
    36                     while (true)
    37                     {
    38                         var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
    39 
    40                         var body = ea.Body;
    41                         var message = Encoding.UTF8.GetString(body);
    42                         var routingKey = ea.RoutingKey;
    43                         Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
    44                     }
    45                 }
    46             }
    47         }
    Consumer.cs

    必须先运行Consumer,然后在运行Producer.

    转:
    http://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html(官网)
    http://blog.csdn.net/anzhsoft/article/details/19630147(翻译)
  • 相关阅读:
    Apollo简介及项目集成
    Apollo源码打包及部署
    idea中的maven模块变成灰色的可能原因
    IDEA 不能显示项目里的文件结构
    Idea不能新建package的解决
    sourceTree 基础使用
    服务器负载均衡是什么意思?
    Spring中的代理模式
    ZooKeeper启动报错 JAVA_HOME is incorrectly set
    @Controller和@RestController的区别?
  • 原文地址:https://www.cnblogs.com/qiyebao/p/4205766.html
Copyright © 2011-2022 走看看