zoukankan      html  css  js  c++  java
  • RabbitMQ消息队列(八)-通过Topic主题模式分发消息(.Net Core版)

    前两章我们讲了RabbitMQ的direct模式和fanout模式,本章介绍topic主题模式的应用。如果对direct模式下通过routingkey来匹配消息的模式已经有一定了解那fanout也很好理解。简单的可以理解成direct是通过routingkey精准匹配的,而topic是通过routingkey来模糊匹配。 
    在topic模式下支持两个特殊字符的匹配。

    * (星号) 代表任意 一个单词
    # (井号) 0个或者多个单词

    注意:上面说的是单词不是字符。

    如下图所示,RabbitMQ direct模式通过RoutingKey来精准匹配,RoutingKey为red的投递到Queue1,RoutingKey为black和white的投递到Queue2。 

    我们可以假设一个场景,我们要做一个日志模块来收集处理不同的日志,日志区分包含三个维度的标准:模块、日志紧急程度、日志重要程度。模块分为:red、black、white;紧急程度分为:critical、normal;把重要程度分为:medium、low、high在RoutingKey字段中我们把这三个维度通过两个“.“连接起来。 
    现在我们需要对black模块,紧急程度为critical,重要程度为high的日志分配到队列1打印到屏幕;对所以模块重要程度为high的日志和white紧急程度为critical的日志发送到队列2持久化到硬盘。如下示例:

    • RoutingKey为“black.critical.high”的日志会投递到queue1和queue2,。

    • RoutingKey为“red.critical.high”的日志会只投递到queue2。

    • RoutingKey为“white.critical.high”的日志会投递到queue2,并且虽然queue2的两个匹配规则都符合但只会向queue2投递一份。

     新建TopicProduct用来发布三种routingkey的消息。

    using System;
    using System.Text;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    
    namespace TopicProduct
    {
        class Program
        {
            static void Main(string[] args)
            {
                String exchangeName = "wytExchangeTopic";
                String routeKeyName1 = "black.critical.high";
                String routeKeyName2 = "red.critical.high";
                String routeKeyName3 = "white.critical.high";
                
                String message1 = "black-critical-high!";
                String message2 = "red-critical-high!";
                String message3 = "white-critical-high!";
    
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = "192.168.63.129";
                factory.Port = 5672;
                factory.VirtualHost = "/wyt";
                factory.UserName = "wyt";
                factory.Password = "wyt";
    
                using (IConnection connection=factory.CreateConnection())
                {
                    using (IModel channel=connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: exchangeName, type: "topic", durable: true, autoDelete: false, arguments: null);
    
                        IBasicProperties properties = channel.CreateBasicProperties();
                        properties.Persistent = true;
    
                        Byte[] body1 = Encoding.UTF8.GetBytes(message1);
                        Byte[] body2 = Encoding.UTF8.GetBytes(message2);
                        Byte[] body3 = Encoding.UTF8.GetBytes(message3);
    
                        //消息推送
                        channel.BasicPublish(exchange: exchangeName, routingKey:routeKeyName1,basicProperties: properties, body: body1);
                        channel.BasicPublish(exchange: exchangeName, routingKey: routeKeyName2, basicProperties: properties, body: body2);
                        channel.BasicPublish(exchange: exchangeName, routingKey: routeKeyName3, basicProperties: properties, body: body3);
    
                        Console.WriteLine(" [x] Sent {0}", message1);
                        Console.WriteLine(" [x] Sent {0}", message2);
                        Console.WriteLine(" [x] Sent {0}", message3);
                    }
                }
    
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
    View Code

    新建TopicCustomerA接收一种消息

    using System;
    using System.Text;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    
    namespace TopicCustomerA
    {
        class Program
        {
            static void Main(string[] args)
            {
                String exchangeName = "wytExchangeTopic";
                String routeKeyName1 = "black.critical.high";
    
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = "192.168.63.129";
                factory.Port = 5672;
                factory.VirtualHost = "/wyt";
                factory.UserName = "wyt";
                factory.Password = "wyt";
    
                using (IConnection connection=factory.CreateConnection())
                {
                    using (IModel channel=connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: exchangeName, type: "topic", durable: true, autoDelete: false, arguments: null);
    
                        String queueName = channel.QueueDeclare().QueueName;
    
                        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKeyName1, arguments: null);
    
                        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body);
                            var routingKey = ea.RoutingKey;
                            Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
    
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        };
    
                        channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    
                        Console.WriteLine(" Press [enter] to exit.");
                        Console.ReadLine();
                    }
                }
            }
        }
    }
    View Code

    新建TopicCustomerB接收两种消息

    using System;
    using System.Text;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    
    namespace TopicCustomerB
    {
        class Program
        {
            static void Main(string[] args)
            {
                String exchangeName = "wytExchangeTopic";
                String routeKeyName1 = "red.critical.*";
                String routeKeyName2 = "white.critical.*";
    
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = "192.168.63.129";
                factory.Port = 5672;
                factory.VirtualHost = "/wyt";
                factory.UserName = "wyt";
                factory.Password = "wyt";
    
                using (IConnection connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: exchangeName, type: "topic", durable: true, autoDelete: false, arguments: null);
    
                        String queueName = channel.QueueDeclare().QueueName;
    
                        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKeyName1, arguments: null);
                        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKeyName2, arguments: null);
    
                        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body);
                            var routingKey = ea.RoutingKey;
                            Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
    
                            channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                        };
    
                        channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
    
                        Console.WriteLine(" Press [enter] to exit.");
                        Console.ReadLine();
                    }
                }
            }
        }
    }
    View Code

    先运行TopicCustomerA和TopicCustomerB保持订阅状态。然后执行TopicProduct发布消息。TopicCustomerA和TopicCustomerB收到的消息如下:

    如上截图,验证了我们之前的结论。

    另外还有一些特殊情况例如:

    如果binding_key 是 “#” - 它会接收所有的Message,不管routing_key是什么,就像是fanout 
    exchange。
    如果 “*” and “#” 没有被使用,那么topic exchange就变成了direct exchange。
  • 相关阅读:
    使用nodejs消费SAP Cloud for Customer上的Web service
    如何在SAP Cloud for Customer自定义BO中创建访问控制
    SAP云平台运行环境Cloud Foundry和Neo的区别
    SAP成都研究院马洪波:提升学习力,增强竞争力,收获一生乐趣
    SAP Netweaver的负载均衡消息服务器 vs CloudFoundry的App Router
    写在Github被微软收购之际
    在SAP云平台的CloudFoundry环境下消费ABAP On-Premise OData服务
    Java实现 LeetCode 517 超级洗衣机
    Java实现 LeetCode 517 超级洗衣机
    Java实现 LeetCode 517 超级洗衣机
  • 原文地址:https://www.cnblogs.com/wyt007/p/9078177.html
Copyright © 2011-2022 走看看