zoukankan      html  css  js  c++  java
  • RabbitMQ学习之Routing(4)

    上一节,是广播日志message到很多的receivers.

    这节,我们讲订阅其中的一个子集。例如,我们想可以把危机的error message导到log file。而仍然可以打印所有的log messages到控制台。

    这里使用到Direct exchage

    Direct exchange

    在使用fanout exchange时,没有很多的灵活性,它只是广播。

    这节,我们将使用direct exchange . 在direct exchange背后的路由算法是简单的,即message会发送到一个binding key 正好匹配messagerouting keyqueue.

    如图

    我们可以看到,有两个queue绑定到exchange了。第一个queue是和binding keyorange的绑定的。并且第二个有两个bindings.一个是black,另一个是green.

    带有routing key 为orange的发送到exchangemessage将会发送到queue Q1

    routing keyblack greenmessages将会发送到Q2. 其他的messages会被丢弃。

    Multiple binding(多重绑定)

    如图,多重绑定,即一个binding key为black绑定到两个queue.

    Emitting logs

    我们会把日志严重级别(log severity)作为routing key. 那样,接收脚本将会选择它想要接收的严重级别进行接收。

    channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
    var body = Encoding.UTF8.GetBytes(message);
    channel.BasicPublish(exchange: "direct_logs",
                         routingKey: severity,
                         basicProperties: null,
                         body: body);

    严重级别(Severity

    info, warning, error

    Subscribing

    接收程序跟之前大致一样,除了一个例外,我们将会为我们感兴趣的严重级别(serverity)创建一个新的binding. 

    var queueName = channel.QueueDeclare().QueueName;
    foreach(var severity in args)
    {
        channel.QueueBind(queue: queueName,
                          exchange: "direct_logs",
                          routingKey: severity);
    }

    代码

    EmitLogDirect.cs

    using System;using System.Linq;using RabbitMQ.Client;using System.Text;
    class EmitLogDirect
    {
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "direct_logs", //声明direct类型exchange
                                        type: "direct");
    
                var severity = (args.Length > 0) ? args[0] : "info";
                var message = (args.Length > 1)
                              ? string.Join(" ", args.Skip( 1 ).ToArray())
                              : "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "direct_logs", //发送routingkey 为severity的message
                                     routingKey: severity,
                                     basicProperties: null,
                                     body: body);
                Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
            }
      }
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
    }

    运行示例:

    cd EmitLogDirect
    dotnet run error "Run. Run. Or it will explode."# => [x] Sent 'error':'Run. Run. Or it will explode.'

    ReceiveLogsDirect.cs

    using System;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;
    class ReceiveLogsDirect
    {
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "direct_logs",  //声明direct类型exchange
                                        type: "direct");
                var queueName = channel.QueueDeclare().QueueName; //声明带随机queue name的queue
    
                if(args.Length < 1)
                {
                    Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
                                            Environment.GetCommandLineArgs()[0]);
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                    Environment.ExitCode = 1;
                    return;
                }
    
                foreach(var severity in args)
                {
                    channel.QueueBind(queue: queueName,  //绑定queue和exchange和特定值的routingkey
                                      exchange: "direct_logs",
                                      routingKey: severity);
                }
    
                Console.WriteLine(" [*] Waiting for messages.");
    
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    var routingKey = ea.RoutingKey; //接收的message的routing key
                    Console.WriteLine(" [x] Received '{0}':'{1}'",
                                      routingKey, message);
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);
    
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }

    使用示例:

    cd ReceiveLogsDirect
    dotnet run warning error > logs_from_rabbit.log

    示例2

    cd ReceiveLogsDirect
    dotnet run info warning error# => [*] Waiting for logs. To exit press CTRL+C

     参考网址:

    https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html

  • 相关阅读:
    创建一个新的进程os.fork
    进程的特征
    进程的状态
    多进程概念
    IO多路复用
    Objective-C 和 C++中指针的格式和.方法 和内存分配
    生活需要奋斗的目标
    iOS 关于UITableView的dequeueReusableCellWithIdentifier
    哈哈,发现了刚毕业时发布的求职帖子
    iOS 和Android中的基本日期处理
  • 原文地址:https://www.cnblogs.com/Vincent-yuan/p/10941033.html
Copyright © 2011-2022 走看看