zoukankan      html  css  js  c++  java
  • RabbitMQ学习之Publish/Subscribe(3)

     上一个教程中,我们创建了一个work queue. 其中的每个task都会被精确的传送到一个worker. 这节,我们将会讲把一个message传送到多个consumers. 这种模式叫做publish/subscribe(发布/订阅).

    为了说明这种模式,我们将创建一个简单的日志系统(logging system. 它由两个程序组成,一个是发送日志message并且另一个接收。

    最重要的,发布的日志message将会被广播到所有的receivers

    Exchangs

    前面我们讲的包含下面的:producer,queue,consumer

    它的主要思想是producer绝不直接发送任何messagequeue. 很多情况下,producer甚至不知道一个message是否会被发送到任何queue.

    如图,它会直接发送messages到一个exchange. 而对于exchange,一方面它接收来自producermessage,另一方面它把这些message推送到queues. 至于,messages是否会被发送一个特定的queue或者发送到很多queue或者丢弃,这些规则都由exchange type定义。

    Exchange type: direct , topic , headers , fanout.

    我们这节主要讲fanout,它会控制广播。

    channel.ExchangeDeclare("logs", "fanout");

    对于fanout exchange ,它会广播它收到的所有的messages 到它知道的所有的queue.

    Listing exchanges

    对于列出服务器上的exchanges , 你可以使用rabbitmqctl

    sudo rabbitmqctl list_exchanges
    The default exchange

    在前面的教程中,我们不知道exchanges,但是我们仍然可以发送messages queues. 因为我们使用到了一个默认的exchange(a default exchange).这个默认的exchange是被空字符串(“”)定义。

    回想下,我们之前怎样发送message

     var message = GetMessage(args);
        var body = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(exchange: "",  //默认的exchange
                             routingKey: "hello",
                             basicProperties: null,
                             body: body);

    此时,messages会根据指定的routingKey被路由到queue.

    现在,我们可以发布到指定的exchange.

    var message = GetMessage(args);var body = Encoding.UTF8.GetBytes(message);
    channel.BasicPublish(exchange: "logs",
                         routingKey: "",
                         basicProperties: null,
                         body: body);

    Temporary queues

    之前我们使用过很多指定名称的queues(例如hellotask_queue). 可以命名一个queue是很重要的,我们可以指定workers到同一个queue。 而且使你可以在多个producersconsumers之前共享这个queue. 

    We’re also interested only in currently flowing messages not in the old ones. 我们想要最新的message而不是仅仅之前的。

    这需要解决两个事情。

    1. 首先,无论什么时候我们连接Rabbit,我们需要一个新的,空的queue。为了达到这个目的,我们可以创建一个带随机名称的queue。更好的办法,我们可以让服务器给我们选择一个随机的queue名称。
    2. 第二,一旦我们断开与consumer的连接,这个queue应该被自动删除。 

    .NET客户端中,我们使用下面的语句创建一个带随机名称的queue (when we supply no parameters to QueueDeclare() we create a non-durable, exclusive, autodelete queue with a generated name)

    var queueName = channel.QueueDeclare().QueueName;

    Bindings

    我们已经创建好了exchangequeue,它们之间的关系我们叫做binding. 用来告诉exchange发送messagesqueue. 

    channel.QueueBind(queue: queueName,  //绑定
                      exchange: "logs",
                      routingKey: "");

    现在,在logs exchange上会把messages发到我们的queue

    Listing bindings
    rabbitmqctl list_bindings

    代码

    这种fanout exchanges ,在发送时,会忽视routingKey的值。

    EmitLog.cs(发送)

    using System;using RabbitMQ.Client;using System.Text;
    class EmitLog
    {
        public static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: "fanout");  //声明exchange
    
                var message = GetMessage(args);
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "logs",  //发送到logs exchange
                                     routingKey: "",
                                     basicProperties: null,
                                     body: body);
                Console.WriteLine(" [x] Sent {0}", message);
            }
    
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    
        private static string GetMessage(string[] args)
        {
            return ((args.Length > 0)
                   ? string.Join(" ", args)
                   : "info: Hello World!");
        }
    }

    不允许发送到一个不存在的exchange.

    如果没有queue绑定到exchangemessages将会丢失。如果没有consumer正在监听,我们可以安全的丢弃这些message.

    ReceiveLogs.cs

    using System;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;
    class ReceiveLogs
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "logs", type: "fanout"); //声明exchange
    
                var queueName = channel.QueueDeclare().QueueName;  //获得随机queue name
                channel.QueueBind(queue: queueName,  //定义queue和exchange的关系
                                  exchange: "logs",
                                  routingKey: "");
    
                Console.WriteLine(" [*] Waiting for logs.");
    
                var consumer = new EventingBasicConsumer(channel);  //回调
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" [x] {0}", message);
                };
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);
    
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }

    参考网址:

    https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html

  • 相关阅读:
    【题解】洛谷 P3942 将军令【20201017 CSP 模拟赛】【贪心】
    ASP.NET上传文件的三种基本方法
    Android 最火的快速开发框架XUtils
    asp.net 上传文件到一般处理程序中
    Android版本:使用findViewById()用字符串/在一个循环
    android 调用系统图库查看指定路径的图片
    Android中实现日期时间选择器(DatePicker和TimePicker)
    Android自定义ListView的Item无法响应OnItemClick的解决办法
    Android开发配置,消除SDK更新时的“https://dl-ssl.google.com refused”异常
    mysql 修改密码
  • 原文地址:https://www.cnblogs.com/Vincent-yuan/p/10940726.html
Copyright © 2011-2022 走看看