在本教程中,我们将添加一个功能 - 我们将只能订阅一部分消息。例如,我们只能将重要的错误消息导向日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息
绑定
在前面的例子中,我们已经创建了绑定。你可能会记得像这样的代码
channel.QueueBind(queue: queueName, exchange: "logs", routingKey: "");
绑定是交换和队列之间的关系。这可以简单地理解为:队列对来自这个交换的消息感兴趣。
绑定可以采用额外的routingKey参数。为了避免与BasicPublish参数混淆,我们将其称为 绑定键。这是我们如何创建一个关键的绑定:
channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "black");
绑定键的含义取决于交换类型。我们之前使用的 粉丝交换机,简单地忽略了它的价值。
直接交换
我们之前教程的日志记录系统将所有消息广播给所有消费者。我们希望扩展到允许根据严重性过滤消息。例如,我们可能希望将日志消息写入磁盘的脚本仅接收严重错误,而不会在警告或信息日志消息上浪费磁盘空间。
我们正在使用一个扇出式的交换机,这并没有给我们太大的灵活性 - 它只能够无意识地播放。
我们将使用直接交换。直接交换背后的路由算法很简单 - 消息进入绑定密钥与消息的路由密钥完全匹配的队列 。
为了说明这一点,请考虑以下设置:
在这个设置中,我们可以看到两个队列绑定的直接交换X. 第一个队列用绑定键橙色绑定,第二个队列有两个绑定,一个绑定键为黑色,另一个为绿色。
在这种设置中,通过路由键橙色发布到交换机的消息 将被路由到队列Q1。带有黑色 或绿色的路由键的消息将进入Q2。所有其他消息将被丢弃。
多个绑定
绑定多个队列和绑定键是完全合法的。在我们的例子中,我们可以使用绑定键黑色添加X和Q1之间的绑定。在这种情况下,直接交换就像扇出一样,将消息广播到所有的匹配队列。路由密钥为黑色的消息将传送到 Q1和Q2。
发射日志
我们将把这个模型用于我们的日志系统。取而代之的扇出,我们将消息发送到直接交流。我们将提供日志严重性作为路由键。这样接收脚本将能够选择想要接收的严重性。我们先关注发射日志。
与往常一样,我们需要先创建一个交换:
channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
我们准备发送一条消息:
var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "direct_logs", routingKey: severity, basicProperties: null, body: body);
为了简化事情,我们将假定“严重性”可以是“信息”,“警告”,“错误”之一。
订阅
接收邮件的方式与上一个教程中的一样,除了一个例外 - 我们将为每个我们感兴趣的严重级别创建一个新的绑定。
var queueName = channel.QueueDeclare().QueueName; foreach(var severity in args) { channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: severity); }
把它放在一起
public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "direct_logs", type: "direct"); var severity = "info"; var message = "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "direct_logs", routingKey: severity, basicProperties: null, body: body); Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); }
ReceiveLogsDirect.cs的代码:
public static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "direct_logs", type: "direct"); var queueName = channel.QueueDeclare().QueueName; if(args.Length < 1) { Console.Error.WriteLine("Usage: {0} [info] [warning] [error]", Environment.GetCommandLineArgs()[0]); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); Environment.ExitCode = 1; return; } foreach(var severity in args) { channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: severity); } Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } }
如果只想保存“警告”和“错误”(而不是“信息”),则将消息记录到文件中,只需打开控制台并输入:
cd ReceiveLogsDirect dotnet run warning error > logs_from_rabbit.log
如果您想要查看屏幕上的所有日志消息,请打开一个新终端并执行以下操作:
cd ReceiveLogsDirect dotnet run info warning error # => [*] Waiting for logs. To exit press CTRL+C
而且,例如,要输出错误日志消息,只需键入:
cd EmitLogDirect dotnet run error "Run. Run. Or it will explode." # => [x] Sent 'error':'Run. Run. Or it will explode.'