zoukankan      html  css  js  c++  java
  • RabbitMQ 官方NET教程(三)【发布/订阅】

    上一篇博客中,我们实现了工作队列,并且我们的工作队列中的一个任务只会发给一个工作者,除非某个工作者未完成任务意外被杀死,会转发给另外的工作者。在这部分中,我们会做一些完全不同的事情 - 我们会向多个消费者传递信息。这种模式被称为“发布/订阅”。

    为了说明这个模式,我们要建立一个简单的日志记录系统。它将包括两个程序 - 第一个将发出日志消息,第二个将接收并打印它们。

    在我们的日志系统中,每一个运行的接收者程序都会收到日志。这样我们就可以实现一个接收者将接收到的数据写到硬盘上,与此同时,另一个接收者把接收到的消息展现在屏幕上。

    本质上来说,就是已发布的日志消息将被广播到所有接收者。

    转发器(Exchanges)

    前面的博客中我们主要的介绍都是发送者发送消息给队列,接收者从队列接收消息。下面我们会引入Exchanges,展示RabbitMQ的完整的消息模型。

    让我们快速了解我们在以前的教程中介绍的内容:

    生产者是发送消息的用户应用程序。
    队列是存储消息的缓冲区。
    消费者是接收消息的用户应用程序。
    

    RabbitMQ中的消息传递模型的核心思想是,生产者从不将任何消息直接发送到队列。实际上,生产者通常甚至不知道是否将消息传递到任何队列。

    相反,生产者只能将信息发送到exchange。交换是一件非常简单的事情。一方面,它收到来自生产者的消息,另一方将它们推送到队列。交换机必须准确知道接收到的消息如何处理。应该追加到特定队列吗?应该追加到很多队列吗?或者应该丢弃。其规则由exchange 类型定义。
    这里写图片描述
    有几种交换类型可用: direct, topic, headers 和fanout。 我们将重点关注最后一个 - fanout。 我们创建一个这种类型的exchange,并称它为logs

    channel.ExchangeDeclare("logs", "fanout");
    

    fanout交换器非常简单。 正如您可以从名称猜出,它只是将所有收到的消息广播到所有知道的队列。 这正是我们需要的记录器。

    列出交换机

    要列出服务器上的交换机,您可以运行有用的rabbitmqctl:

    sudo rabbitmqctl list_exchanges
    

    在这个列表中会有一些amq.*交换和默认(未命名)交换。 这些是默认创建的,但是不太可能需要使用它们。

    默认交换

    在本教程的前面部分,我们对交换没有任何了解,但仍然能够将消息发送到队列。 这是可能的,因为我们使用默认交换,我们通过空字符串("")标识。

    回想一下我们之前发布的消息:

    var message = GetMessage(args);
    var body = Encoding.UTF8.GetBytes(message);
    channel.BasicPublish(exchange: "",
                             routingKey: "hello",
                             basicProperties: null,
                             body: body);
    

    第一个参数是交换的名称。 空字符串表示默认或无名交换:消息通过路由Key指定的名称路由到队列(如果存在)。

    现在,我们可以发布到我们命名的交换机:

    var message = GetMessage(args);
    var body = Encoding.UTF8.GetBytes(message);
    channel.BasicPublish(exchange: "logs",
                         routingKey: "",
                         basicProperties: null,
                         body: body);
    

    临时队列(Temporary queues)

    前面我们使用的是具有指定名称的队列(记得hellotask_queue?)。 能够命名队列对我们而言至关重要 - 我们需要将工作者指向同一个队列。 当您想要在生产者和消费者之间共享队列时,给队列一个名字很重要。

    但是我们的日志系统我们并不关心队列的名称。 我们希望接收到所有的日志消息,而不仅仅是它们的一部分。 而且我们也只对当前正在传递的数据的感兴趣。 要解决我们的需,要做两件事情。

    首先,每当我们连接到Rabbit,我们需要一个新的空的队列。 为此,我们可以创建一个具有随机名称的队列,或者甚至更好 - 让服务器为我们选择一个随机队列名称。

    其次,一旦消费者与Rabbit断开,队列应该被自动删除。

    在.NET客户端中,当我们没有为queueDeclare()提供参数时,我们创建了一个不可持续的,独占的,自动删除的队列,其中包含一个生成的名称:

    var queueName = channel.QueueDeclare().QueueName;
    

    此时,queueName包含一个随机队列名称。 例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg。

    绑定(Bindings)

    这里写图片描述

    我们已经创建了一个扇出交换和一个队列。 现在我们需要告诉交换机发送消息到我们的队列。 交换和队列之间的关系称为绑定(binding)。

    channel.QueueBind(queue: queueName,
                      exchange: "logs",
                      routingKey: "");
    

    从现在开始,logs 交换器将追加消息到我们的队列。

    列出绑定

    你可以列出现有的绑定:

    rabbitmqctl list_bindings
    

    完整的例子

    这里写图片描述
    发出日志消息的生产者程序与上一个教程并没有太大的区别。 最重要的变化是我们现在想将消息发布到我们的logs交换器,而不是无名的。 发送时需要提供一个routingKey,但是对于fanout 交换来说,它的值被忽略。 这里是EmitLog.cs文件的代码:

    using System;
    using RabbitMQ.Client;
    using System.Text;
    
    class EmitLog
    {
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: "fanout");
    
                var message = GetMessage(args);
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "logs",
                                     routingKey: "",
                                     basicProperties: null,
                                     body: body);
                Console.WriteLine(" [x] Sent {0}", message);
            }
    
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    
        private static string GetMessage(string[] args)
        {
            return ((args.Length > 0)
                   ? string.Join(" ", args)
                   : "info: Hello World!");
        }
    }
    

    如你所见,建立连接后,我们声明交换。 此步骤是必须的,因为禁止发布到不存在的交换机。

    如果没有任何队列绑定到交换机,消息将丢失,但是对我们来说没关系; 如果没有消费者正在收听,我们可以放心地放弃信息。

    ReceiveLogs.cs的代码:

    using System;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System.Text;
    
    class ReceiveLogs
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: "fanout");
    
                var queueName = channel.QueueDeclare().QueueName;
                channel.QueueBind(queue: queueName,
                                  exchange: "logs",
                                  routingKey: "");
    
                Console.WriteLine(" [*] Waiting for logs.");
    
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] {0}", message);
                };
                channel.BasicConsume(queue: queueName,
                                     noAck: true,
                                     consumer: consumer);
    
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
    

    按照教程一的安装说明生成EmitLogsReceiveLogs项目。

    如果要将日志保存到文件,只需打开控制台并键入:

    cd ReceiveLogs
    dotnet run > logs_from_rabbit.log
    

    如果您希望在屏幕上看到日志,则产生一个新的终端并运行:

    cd ReceiveLogs
    dotnet run
    

    当然,要发射日志类型:

    cd EmitLog
    dotnet run
    

    使用rabbitmqctl list_bindings,您可以验证代码是否按照我们想要的方式创建绑定和队列。 运行两个ReceiveLogs.cs程序时,您应该看到如下所示:

    sudo rabbitmqctl list_bindings
    # => Listing bindings ...
    # => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
    # => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
    # => ...done.
    

    对结果的解释很简单:来自交换机logs 的数据转到具有服务器分配名称的两个队列。 这正是我们的意图。

  • 相关阅读:
    TableExport导出失败问题
    gitlab备份、恢复、升级
    读书笔记一【加密——替换法】
    读书笔记一【加密——换位法】
    解决Kettle ETL数据乱码
    SQL中exsit和in
    Centos下搭建邮件服务器
    2018总结及2019计划
    mac安装gcc
    Vue.js学习 Item11 – 组件与组件间的通信
  • 原文地址:https://www.cnblogs.com/Wulex/p/6965059.html
Copyright © 2011-2022 走看看