zoukankan      html  css  js  c++  java
  • RabbitMQ消息队列(九)-通过Headers模式分发消息(.Net Core版)

    Headers类型的exchange使用的比较少,以至于官方文档貌似都没提到,它是忽略routingKey的一种路由方式。是使用Headers来匹配的。Headers是一个键值对,可以定义成Hashtable。发送者在发送的时候定义一些键值对,接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。 

    匹配有两种方式all和any。这两种方式是在接收端必须要用键值”x-mactch”来定义。all代表定义的多个键值对都要满足,any代表只要满足一个就可以。fanout,direct,topic exchange的routingKey都需要要字符串形式的,而headers exchange则没有这个要求,因为键值对的值可以是任何类型。 

    那在.Net Core中怎么应用呢? 

    headers也是一种交换机类型,但是在rabbitmq官网中的教程中并没有说到。资料也很少,但是找一找总会有的。

    headers与direct的模式不同,不是使用routingkey去做绑定。而是通过消息headers的键值对匹配

    发布者  -- >  headersexchange  -->  (user:  “小明 ”) binding  --> queue

    也就是说 user: 小明 替代了之前的routingkey。在做绑定的时候有两种匹配方式供选择。x-match (all/any)

    意思就是键值对中所有的项都要匹配与只要有一个匹配就可以。下面就可以动手写代码了

    新建HeadersProduct用来发布新消息:

    using System;
    using System.Collections.Generic;
    using System.Text;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    
    namespace HeadersProduct
    {
        class Program
        {
            static void Main(string[] args)
            {
                String exchangeName = "wytExchangeHeaders";
    
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = "192.168.63.129";
                factory.Port = 5672;
                factory.VirtualHost = "/wyt";
                factory.UserName = "wyt";
                factory.Password = "wyt";
    
                using (IConnection connection=factory.CreateConnection())
                {
                    using (IModel channel=connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: exchangeName, type: "headers", durable: true, autoDelete: false, arguments: null);
    
                        IBasicProperties properties = channel.CreateBasicProperties();
                        properties.Headers = new Dictionary<String, Object>()
                        {
                            {"user","wyt" },
                            {"password","wyt"}
                        };
    
                        Byte[] body = Encoding.UTF8.GetBytes("Hello World");
    
                        channel.BasicPublish(exchange: exchangeName, routingKey: String.Empty, basicProperties: properties, body: body);
                    }
                }
    
                Console.Write("发布成功!");
    
                Console.ReadKey();
            }
        }
    }
    View Code

    新建HeadersConsumerA来以正确的headers接收消息:

    using System;
    using System.Collections.Generic;
    using System.Text;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    
    namespace HeadersConsumerA
    {
        class Program
        {
            static void Main(string[] args)
            {
    
    
    
                String exchangeName = "wytExchangeHeaders";
    
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = "192.168.63.129";
                factory.Port = 5672;
                factory.VirtualHost = "/wyt";
                factory.UserName = "wyt";
                factory.Password = "wyt";
    
                bool flag = true;
                string pattern = "";
                while (flag)
                {
                    Console.WriteLine("请选择headers匹配模式  1(any)/2(all)");
                    pattern = Console.ReadLine();
                    if (pattern == "1" || pattern == "2")
                        flag = false;
                    else
                        Console.Write("请做出正确的选择");
                }
                //根据声明使用的队列
                var headersType = pattern == "1" ? "any" : "all";
    
                using (IConnection connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Headers, durable: true, autoDelete: false, arguments: null);
    
                        String queueName = channel.QueueDeclare().QueueName;
    
                        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: String.Empty, arguments: new Dictionary<String, Object>
                        {
                            {"x-math",headersType },
                            {"user","wyt" },
                            {"password","wyt" }
                        });
    
                        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var msg = Encoding.UTF8.GetString(ea.Body);
    
                            Console.WriteLine($"{msg}");
                        };
    
                        channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
    
    
                        Console.ReadKey();
    
                    }
                }
            }
        }
    }
    View Code

    新建HeadersConsumerB来以错误的headers接收消息:

    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.Text;
    
    namespace HeadersConsumerB
    {
        class Program
        {
            static void Main(string[] args)
            {
                String exchangeName = "wytExchangeHeaders";
    
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = "192.168.63.129";
                factory.Port = 5672;
                factory.VirtualHost = "/wyt";
                factory.UserName = "wyt";
                factory.Password = "wyt";
    
                bool flag = true;
                string pattern = "";
                while (flag)
                {
                    Console.WriteLine("请选择headers匹配模式  1(any)/2(all)");
                    pattern = Console.ReadLine();
                    if (pattern == "1" || pattern == "2")
                        flag = false;
                    else
                        Console.Write("请做出正确的选择");
                }
                //根据声明使用的队列
                var headersType = pattern == "1" ? "any" : "all";
    
                using (IConnection connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Headers, durable: true, autoDelete: false, arguments: null);
    
                        String queueName = channel.QueueDeclare().QueueName;
    
                        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: String.Empty, arguments: new Dictionary<String, Object>
                        {
                            {"x-math",headersType },
                            {"user","xxx" },
                            {"password","xxx" }
                        });
    
                        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var msg = Encoding.UTF8.GetString(ea.Body);
    
                            Console.WriteLine($"{msg}");
                        };
    
                        channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
    
    
                        Console.ReadKey();
    
                    }
                }
    
            }
        }
    }
    View Code

    运行结果

    一个可以接收到消息,另一个由于headers是不匹配的,所有接收不到消息

  • 相关阅读:
    LeetCode Merge Two Sorted Lists 归并排序
    LeetCode Add Binary 两个二进制数相加
    LeetCode Climbing Stairs 爬楼梯
    034 Search for a Range 搜索范围
    033 Search in Rotated Sorted Array 搜索旋转排序数组
    032 Longest Valid Parentheses 最长有效括号
    031 Next Permutation 下一个排列
    030 Substring with Concatenation of All Words 与所有单词相关联的字串
    029 Divide Two Integers 两数相除
    028 Implement strStr() 实现 strStr()
  • 原文地址:https://www.cnblogs.com/wyt007/p/9078647.html
Copyright © 2011-2022 走看看