zoukankan      html  css  js  c++  java
  • RabbitMQ 官方NET教程(四)【路由选择】

    在上一个教程中,我们构建了一个简单的日志记录系统。 我们能够广播日志消息给所有你的接收者。

    在本教程中,我们将为其添加一个功能 - 我们将让日志接收者可以仅订阅一部分消息。 例如,我们将能够仅将关键的错误消息写入到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

    绑定(Bindings)

    在以前的例子中,我们已经使用过绑定。类似下面的代码:

    channel.QueueBind(queue: queueName,
                      exchange: "logs",
                      routingKey: "");
    

    绑定表示转发器与队列之间的关系。我们也可以简单的认为:队列对该转发器上的消息感兴趣。

    绑定可以附带一个额外的参数routingKey。 为了避免与BasicPublish参数混淆,我们将其称为binding key。 这就是我们如何用一个键创建一个绑定:

    channel.QueueBind(queue: queueName,
                      exchange: "direct_logs",
                      routingKey: "black");
    

    绑定键的意义依赖于转发器的类型。对于fanout类型,忽略此参数。

    直接转发(Direct exchange)

    我们从上一个教程的日志记录系统向所有消费者广播所有消息。 我们希望将其扩展为允许基于其严重性进行过滤日志消息。 例如,我们可能希望将日志消息写入磁盘的脚本仅接收严重错误,而不会在警告或信息日志消息上浪费磁盘空间。

    我们正在使用一个fanout的交换机,它不给我们很大的灵活性 - 它只能无意识地转发。

    我们会使用direct 转发器。 direct类型的转发器背后的路由算法很简单 - 消息传递到binding key与消息的routing key完全匹配的队列。

    为了说明,请考虑以下设置:
    这里写图片描述
    在这个设置中,我们可以看到direct 类型的转发器X与两个队列绑定。 第一个队列与绑定键orange绑定,第二个队列与转发器间有两个绑定,一个与绑定键black绑定,另一个与green绑定键绑定。

    在这样的设置中,发布附带一个选择键(routing key) orange的消息至交换机,将被导向到队列Q1。 消息附带一个选择键 (routing key)black或者green将会被导向到Q2。 所有其他消息将被丢弃。

    多重绑定(multiple bindings)

    这里写图片描述
    使用相同的绑定键绑定多个队列是完全合法的。 在我们的示例中,我们可以在XQ1之间添加绑定键black。 在这种情况下,direct交换将表现得像fanout,并将消息广播到所有匹配的队列。 附带选择键black的消息将传送到Q1和Q2。

    发送日志(Emittinglogs)

    我们将此模型用于日志记录系统。我们将消息发送到direct类型的转发器而不是fanout类型。这样的话, 接收程序可以根据严重性来选择接收。 我们首先关注发送日志的代码:

    一如以往,我们需要先创建一个转发器:

    channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
    

    然后我们准备发送一条消息:

    var body = Encoding.UTF8.GetBytes(message);
    channel.BasicPublish(exchange: "direct_logs",
                         routingKey: severity,
                         basicProperties: null,
                         body: body);
    

    为了简化代码,我们假定severityinfowarningerror中的一个。

    订阅

    接收消息将像上一个教程类似,只有一点不同 - 我们将为每个我们感兴趣的严重性类型的日志创建一个新的绑定。

    var queueName = channel.QueueDeclare().QueueName;
    
    foreach(var severity in args)
    {
        channel.QueueBind(queue: queueName,
                          exchange: "direct_logs",
                          routingKey: severity);
    }
    

    完整的实例

    这里写图片描述

    EmitLogDirect.cs 类的代码:

    using System;
    using System.Linq;
    using RabbitMQ.Client;
    using System.Text;
    
    class EmitLogDirect
    {
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "direct_logs",
                                        type: "direct");
    
                var severity = (args.Length > 0) ? args[0] : "info";
                var message = (args.Length > 1)
                              ? string.Join(" ", args.Skip( 1 ).ToArray())
                              : "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "direct_logs",
                                     routingKey: severity,
                                     basicProperties: null,
                                     body: body);
                Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
            }
    
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
    

    ReceiveLogsDirect.cs的代码:

    using System;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System.Text;
    
    class ReceiveLogsDirect
    {
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "direct_logs",
                                        type: "direct");
                var queueName = channel.QueueDeclare().QueueName;
    
                if(args.Length < 1)
                {
                    Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
                                            Environment.GetCommandLineArgs()[0]);
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                    Environment.ExitCode = 1;
                    return;
                }
    
                foreach(var severity in args)
                {
                    channel.QueueBind(queue: queueName,
                                      exchange: "direct_logs",
                                      routingKey: severity);
                }
    
                Console.WriteLine(" [*] Waiting for messages.");
    
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    var routingKey = ea.RoutingKey;
                    Console.WriteLine(" [x] Received '{0}':'{1}'",
                                      routingKey, message);
                };
                channel.BasicConsume(queue: queueName,
                                     noAck: true,
                                     consumer: consumer);
    
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
    

    如果您只想将warningerror(而不是info)保存到文件中,只需打开控制台并键入:

    cd ReceiveLogsDirect
    dotnet run warning error > logs_from_rabbit.log
    

    如果您想查看屏幕上的所有日志消息,请打开一个新终端,然后执行以下操作:

    cd ReceiveLogsDirect
    dotnet run info warning error
    # => [*] Waiting for logs. To exit press CTRL+C
    

    而且,例如,要发出error日志消息,只需键入:

    cd EmitLogDirect
    dotnet run error "Run. Run. Or it will explode."
    # => [x] Sent 'error':'Run. Run. Or it will explode.'
    
  • 相关阅读:
    Door Frames CodeForces
    POJ 3090 Visible Lattice Points (ZOJ 2777)
    从斐波那契到矩阵快速幂
    Recursive sequence (矩阵快速幂)2016ACM/ICPC亚洲区沈阳站
    c++ 类实现 AVL树容器(包含迭代器)
    c++ 链表类的实现(包含迭代器)
    HDU
    【几何+模拟】二次元变换 计蒜客
    【bfs+链式向前星】防御僵尸(defend)计蒜客
    deque in Python
  • 原文地址:https://www.cnblogs.com/Wulex/p/6965063.html
Copyright © 2011-2022 走看看