zoukankan      html  css  js  c++  java
  • .NET Core RabbitMQ探索(2)——RabbitMQ的Exchange

      实际上,RabbitMQ的生产者并不会直接把消息发送给队列,甚至生产者都不知道消息是否会被发送给一个队列。对于生产者而言,它们只能把消息发送到Exchange,一个Exchange所完成的工作相当简单,一方面,它从生产者那里接收消息;另一方面它将消息存入队列中。一个Exchange需要准确的知道它要如何处理它接收到的消息,例如,它需要把消息转发到特定的队列,还是进行广播处理,或者直接将它丢弃。可以通过exchange type来定义Exchange处理消息的规则。
      整个框架结构图如图所示。

      Exchange types有以下几种:direct、topic、headers和fanout。如果我们没有定义Exchange,那么系统就会默认使用一个默认的Exchange,名为:"",就像我们入门篇里的一样,它会自己创建一个""的默认Exchange,然后将消息转发给特定routingKey的队列。

    • Direct Exchange

      使用direct exchange时,会将exchange与特定的队列进行绑定,转发时由routingkey进行队列的匹配,如图所示。

      在direct类型的exchange中,只有这两个routingkey完全相同,exchange才会选择对应的binding进行消息路由,代码示例如下所示:

    1. 首先我们需要将exchange和queue进行binding
    channel.QueueBind(queue: "create_pdf_queue",
                        exchange: "pdf_events",
                        routingKey: "pdf_create",
                        arguments: null);

      绑定时需要设置:队列名、exchange名和它们的routingkey。

    1. 在发送消息到exchange时会设置对应的routingkey
    channel.BasicPublish(exchange: "pdf_events",
                            routingKey: "pdf_create",
                            basicProperties: properties,
                            body: body);

      生产者发布消息时,需要设置exchange名和routingKey,如果exchange名和routingKey都与上述绑定的完全一致,那么该exchange就会将这条消息路由到队列。

    • Topic Exchange

      此类exchange与direct类似,唯一不同的地方是,direct类型要求routingKey完全一致,而这里可以可以使用通配符进行模糊匹配,符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“JiangYuZhou.#”能够匹配到“JiangYuZhou.pets.cat”,但是“JiangYuZhou.*” 只会匹配到“JiangYuZhou.money”。
      所以,Topic Exchange 使用非常灵活,topic exchange如图所示。

      例如,我们首先声明一个topic exchange,它的名称为"agreements":

    // Topic类型的exchange, 名称 agreements
        channel.ExchangeDeclare(exchange: "agreements",
                                type: ExchangeType.Topic,
                                durable: true,
                                autoDelete: false,
                                arguments: null);

      然后,我们声明三个队列,它们分别如下:

    // 创建berlin_agreements队列
        channel.QueueDeclare(queue: "berlin_agreements",
                                durable: true,
                                exclusive: false,
                                autoDelete: false,
                                arguments: null);
    
        //创建 all_agreements 队列
        channel.QueueDeclare(queue: "all_agreements",
                                durable: true,
                                exclusive: false,
                                autoDelete: false,
                                arguments: null);
    
        //创建 headstore_agreements 队列
        channel.QueueDeclare(queue: "headstore_agreements",
                                durable: true,
                                exclusive: false,
                                autoDelete: false,
                                arguments: null);

      最后,我们将agreements exchange分别与上面的三个队列以不同通配符的routingKey进行绑定:

    //绑定 agreements --> berlin_agreements 使用routingkey:agreements.eu.berlin.#
        channel.QueueBind(queue: "berlin_agreements",
                            exchange: "agreements",
                            routingKey: "agreements.eu.berlin.#",
                            arguments: null);
    
        //绑定 agreements --> all_agreements 使用routingkey:agreements.#
        channel.QueueBind(queue: "all_agreements",
                            exchange: "agreements",
                            routingKey: "agreements.#",
                            arguments: null);
    
        //绑定 agreements --> headstore_agreements 使用routingkey:agreements.eu.*.headstore
        channel.QueueBind(queue: "headstore_agreements",
                            exchange: "agreements",
                            routingKey: "agreements.eu.*.headstore",
                            arguments: null);

      这时我们如果发送下列消息:

     var message = "hello world";
     var body = Encoding.UTF8.GetBytes(message);
     var properties = channel.CreateBasicProperties();
     properties.Persistent = true;
    
    channel.BasicPublish(exchange: "agreements",
        routingKey: "agreements.eu.berlin",
        basicProperties: properties,
        body: body);

      该消息设置的exchange为"agreements",routingKey为"agreements.eu.berlin",所以它可以匹配上面的"agreements.eu.berlin.#"和"agreements.#",消息被转发到了"berlin_agreements"和"all_agreements"队列。

    • Fanout Exchange

      该exchange无需对routingKey进行匹配操作,而是很简单的直接将消息路由到所有绑定的队列中,如图所示。

    • Header Exchange

      此类型的路由规是根据header来判断的,首先需要以键值对的形式设置header的参数,在绑定exchange的时候将header以arguments的形式传递进去,传递参数时,键为"x-match"的header可以设置它的值为all或any,其中,all表示只有当发布的消息匹配该header中除"x-match"以外的所有值时,消息才会被转发到该队列;any表示当发布的消息匹配该header种除"x-match"外的任意值时,该消息会被转发到匹配队列。

    代码操练

      最后我们以header exchange为例,演示我们的Exchange。首先我们创建四个项目,其中一个作为生产者,另作三个均作为消费者,并且使用:

    dotnet add package RabbitMQ.Client

      给四个项目均安装上RabbitMQ的.NET包,并进行restore,项目结构如图所示:

      开始编写Send端的代码,其中,RabbitMQ还是使用我们在上一章种使用的Docker中RabbitMQ,程序如下:

    using System;
    using System.Collections.Generic;
    using System.Text;
    using RabbitMQ.Client;
    
    namespace Send
    {
        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory() { HostName = "148.70.210.208" };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        //声明Headers类型的exchange,名称为agreements
                        channel.ExchangeDeclare(exchange: "agreements",
                            type: ExchangeType.Headers,
                            autoDelete: false,
                            arguments: null);
    
                        //创建队列queue.A
                        channel.QueueDeclare(queue: "queue.A",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);
    
                        //创建队列queue.B
                        channel.QueueDeclare(queue: "queue.B",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);
    
                        //创建队列queue.C
                        channel.QueueDeclare(queue: "queue.C",
                            durable: true,
                            exclusive: false,
                            autoDelete: false,
                            arguments: null);
    
                        //绑定agreements=>queue.A,使用arguments(format=pdf、type=report、x-match=all)
                        //只有当header中同时满足format=pdf、type=report时,消息才会被转发到队列A
                        Dictionary<string, object> aHeader = new Dictionary<string, object>();
                        aHeader.Add("format", "pdf");
                        aHeader.Add("type", "report");
                        aHeader.Add("x-match", "all");
                        channel.QueueBind(queue: "queue.A",
                            exchange: "agreements",
                            routingKey: string.Empty,
                            arguments: aHeader);
    
                        //绑定agreements=>queue.B,使用arguments(format=pdf、type=log、x-match=any)
                        //当header中满足format=pdf或type=log任意一个时,消息就会被转发到队列B
                        Dictionary<string, object> bHeader = new Dictionary<string, object>();
                        bHeader.Add("format", "pdf");
                        bHeader.Add("type", "log");
                        bHeader.Add("x-match", "any");
                        channel.QueueBind(queue: "queue.B",
                            exchange: "agreements",
                            routingKey: string.Empty,
                            arguments: bHeader);
    
                        //绑定agreements=>queue.C,使用arguments(format=zip、type=report、x-match=all)
                        //当header中同时满足format=zip和type=report时,消息会被转发到队列C
                        Dictionary<string, object> cHeader = new Dictionary<string, object>();
                        cHeader.Add("format", "zip");
                        cHeader.Add("type", "report");
                        cHeader.Add("x-match", "all");
                        channel.QueueBind(queue: "queue.C",
                            exchange: "agreements",
                            routingKey: string.Empty,
                            arguments: cHeader);
    
                        string message1 = "hello world From 1";
                        var body = Encoding.UTF8.GetBytes(message1);
                        var properties1 = channel.CreateBasicProperties();
                        properties1.Persistent = true;
                        Dictionary<string, object> mHeader1 = new Dictionary<string, object>();
                        mHeader1.Add("format", "pdf");
                        mHeader1.Add("type", "report");
                        properties1.Headers = mHeader1;
    
                        //这条消息会被转发到queue.A和queue.B
                        //queue.A 的binding (format=pdf, type=report, x-match=all)
                        //queue.B 的binding (format=pdf, type=log, x-match=any)
                        channel.BasicPublish(exchange: "agreements",
                                        routingKey: string.Empty,
                                        basicProperties: properties1,
                                        body: body);
    
                        string message2 = "hello world From 2";
                        body = Encoding.UTF8.GetBytes(message2);
                        var properties2 = channel.CreateBasicProperties();
                        properties2.Persistent = true;
                        Dictionary<string, object> mHeader2 = new Dictionary<string, object>();
                        mHeader2.Add("type", "log");
                        properties2.Headers = mHeader2;
    
                        //这条消息会被转发到queue.B 
                        //queue.B 的binding (format = pdf, type = log, x-match = any)
                        channel.BasicPublish(exchange: "agreements",
                                        routingKey: string.Empty,
                                        basicProperties: properties2,
                                        body: body);
    
                        string message3 = "hello world From 3";
                        body = Encoding.UTF8.GetBytes(message3);
                        var properties3 = channel.CreateBasicProperties();
                        properties3.Persistent = true;
                        Dictionary<string, object> mHeader3 = new Dictionary<string, object>();
                        mHeader3.Add("format", "zip");
                        properties3.Headers = mHeader3;
    
                        //这条消息不会被路由
                        //队列C要求同时满足两个条件,这里只满足了一个,没有匹配的队列
                        channel.BasicPublish(exchange: "agreements",
                                        routingKey: string.Empty,
                                        basicProperties: properties3,
                                        body: body);
                    }
                }
            }
        }
    }

      运行程序后,可以看到,queue.A中匹配了三条消息、queue.B中匹配了两条、queue.C中没有匹配到任何消息。

      可以看到,队列A中匹配了一条信息,即Message 1,队列B中匹配了两条信息,即Message 1和Message2,队列C中没有匹配信息,符合我们程序的编写,下面用接收端进行接收。
      接收端分别写了三个程序,分别接收队列A、B、C的消息,它们除了绑定队列名称不同外,其余全部相同,下面是绑定队列A的接收程序:

    using System;
    using System.Text;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    
    namespace Recieve1
    {
        class Program
        {
            static void Main(string[] args)
            {
                var factory = new ConnectionFactory() { HostName = "148.70.210.208" };
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        //注意要与发送端的声明一致
                        channel.ExchangeDeclare(exchange: "agreements",
                            type: ExchangeType.Headers,
                            autoDelete: false,
                            arguments: null);
    
                        //绑定了queue.C和agreements Exchange
                        channel.QueueBind(queue: "queue.A",
                            exchange: "agreements",
                            routingKey: string.Empty);
    
                        Console.WriteLine("Waiting for messages");
    
                        var consumer = new EventingBasicConsumer(channel);
    
                        //绑定接收完成事件
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body);
                            Console.WriteLine($"Recieve Message:{message}");
                        };
    
                        channel.BasicConsume(queue: "queue.A",
                            autoAck: true,
                            consumer: consumer);
    
                        Console.WriteLine("Press [enter] to exit");
                        Console.ReadLine();
                    }
                }
            }
        }
    }

      最后,我们分别运行这三个接收程序:

      符合程序设计。

      参考:JulyLuo——https://www.cnblogs.com/julyluo/p/6265775.html

  • 相关阅读:
    DNNClassifier 深度神经网络 分类器
    浏览器对MP4视频 帧宽度 高度的兼容性
    UnicodeEncodeError:'latin-1' codec can't encode character
    文件夹下 文件计数
    the largest value you actually can transmit between the client and server is determined by the amount of available memory and the size of the communications buffers.
    the “identity” of an object
    广告特征 用户特征
    如果一个维度全覆盖,则有效维度应该对该维度全覆盖
    a high-level neural networks AP
    使用 LDA 挖掘的用户喜好主题
  • 原文地址:https://www.cnblogs.com/cquptjyz/p/10888923.html
Copyright © 2011-2022 走看看