zoukankan      html  css  js  c++  java
  • RabbitMQ消息队列(九)-通过Headers模式分发消息(.Net Core版)

    Headers类型的exchange使用的比较少,以至于官方文档貌似都没提到,它是忽略routingKey的一种路由方式。是使用Headers来匹配的。Headers是一个键值对,可以定义成Hashtable。发送者在发送的时候定义一些键值对,接收者也可以再绑定时候传入一些键值对,两者匹配的话,则对应的队列就可以收到消息。 

    匹配有两种方式all和any。这两种方式是在接收端必须要用键值”x-mactch”来定义。all代表定义的多个键值对都要满足,any代表只要满足一个就可以。fanout,direct,topic exchange的routingKey都需要要字符串形式的,而headers exchange则没有这个要求,因为键值对的值可以是任何类型。 

    那在.Net Core中怎么应用呢? 

    headers也是一种交换机类型,但是在rabbitmq官网中的教程中并没有说到。资料也很少,但是找一找总会有的。

    headers与direct的模式不同,不是使用routingkey去做绑定。而是通过消息headers的键值对匹配

    发布者  -- >  headersexchange  -->  (user:  “小明 ”) binding  --> queue

    也就是说 user: 小明 替代了之前的routingkey。在做绑定的时候有两种匹配方式供选择。x-match (all/any)

    意思就是键值对中所有的项都要匹配与只要有一个匹配就可以。下面就可以动手写代码了

    新建HeadersProduct用来发布新消息:

    using System;
    using System.Collections.Generic;
    using System.Text;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    
    namespace HeadersProduct
    {
        class Program
        {
            static void Main(string[] args)
            {
                String exchangeName = "wytExchangeHeaders";
    
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = "192.168.63.129";
                factory.Port = 5672;
                factory.VirtualHost = "/wyt";
                factory.UserName = "wyt";
                factory.Password = "wyt";
    
                using (IConnection connection=factory.CreateConnection())
                {
                    using (IModel channel=connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: exchangeName, type: "headers", durable: true, autoDelete: false, arguments: null);
    
                        IBasicProperties properties = channel.CreateBasicProperties();
                        properties.Headers = new Dictionary<String, Object>()
                        {
                            {"user","wyt" },
                            {"password","wyt"}
                        };
    
                        Byte[] body = Encoding.UTF8.GetBytes("Hello World");
    
                        channel.BasicPublish(exchange: exchangeName, routingKey: String.Empty, basicProperties: properties, body: body);
                    }
                }
    
                Console.Write("发布成功!");
    
                Console.ReadKey();
            }
        }
    }
    View Code

    新建HeadersConsumerA来以正确的headers接收消息:

    using System;
    using System.Collections.Generic;
    using System.Text;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    
    namespace HeadersConsumerA
    {
        class Program
        {
            static void Main(string[] args)
            {
    
    
    
                String exchangeName = "wytExchangeHeaders";
    
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = "192.168.63.129";
                factory.Port = 5672;
                factory.VirtualHost = "/wyt";
                factory.UserName = "wyt";
                factory.Password = "wyt";
    
                bool flag = true;
                string pattern = "";
                while (flag)
                {
                    Console.WriteLine("请选择headers匹配模式  1(any)/2(all)");
                    pattern = Console.ReadLine();
                    if (pattern == "1" || pattern == "2")
                        flag = false;
                    else
                        Console.Write("请做出正确的选择");
                }
                //根据声明使用的队列
                var headersType = pattern == "1" ? "any" : "all";
    
                using (IConnection connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Headers, durable: true, autoDelete: false, arguments: null);
    
                        String queueName = channel.QueueDeclare().QueueName;
    
                        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: String.Empty, arguments: new Dictionary<String, Object>
                        {
                            {"x-math",headersType },
                            {"user","wyt" },
                            {"password","wyt" }
                        });
    
                        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var msg = Encoding.UTF8.GetString(ea.Body);
    
                            Console.WriteLine($"{msg}");
                        };
    
                        channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
    
    
                        Console.ReadKey();
    
                    }
                }
            }
        }
    }
    View Code

    新建HeadersConsumerB来以错误的headers接收消息:

    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Collections.Generic;
    using System.Text;
    
    namespace HeadersConsumerB
    {
        class Program
        {
            static void Main(string[] args)
            {
                String exchangeName = "wytExchangeHeaders";
    
                ConnectionFactory factory = new ConnectionFactory();
                factory.HostName = "192.168.63.129";
                factory.Port = 5672;
                factory.VirtualHost = "/wyt";
                factory.UserName = "wyt";
                factory.Password = "wyt";
    
                bool flag = true;
                string pattern = "";
                while (flag)
                {
                    Console.WriteLine("请选择headers匹配模式  1(any)/2(all)");
                    pattern = Console.ReadLine();
                    if (pattern == "1" || pattern == "2")
                        flag = false;
                    else
                        Console.Write("请做出正确的选择");
                }
                //根据声明使用的队列
                var headersType = pattern == "1" ? "any" : "all";
    
                using (IConnection connection = factory.CreateConnection())
                {
                    using (IModel channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Headers, durable: true, autoDelete: false, arguments: null);
    
                        String queueName = channel.QueueDeclare().QueueName;
    
                        channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: String.Empty, arguments: new Dictionary<String, Object>
                        {
                            {"x-math",headersType },
                            {"user","xxx" },
                            {"password","xxx" }
                        });
    
                        EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var msg = Encoding.UTF8.GetString(ea.Body);
    
                            Console.WriteLine($"{msg}");
                        };
    
                        channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
    
    
                        Console.ReadKey();
    
                    }
                }
    
            }
        }
    }
    View Code

    运行结果

    一个可以接收到消息,另一个由于headers是不匹配的,所有接收不到消息

  • 相关阅读:
    請問各位大大,我要將listview顯示的縮圖加入到listview2,請問該如何做呢
    一个可设置窗口透明属性的控件,可让窗口透明、半透明
    laravel he stream or file "..laravel-2019-02-14.log" could not be opened: failed to open stream: Permission denied
    每日学习-20190721
    linux centos无法删除网站根目录下的.user.ini解决办法
    laravel在使用Composer安装插件时要求输入授权用户名密码解决办法
    Centos7 日志查看工具
    Centos7 Putty SSH密钥登录
    阿里云Centos7用putty ssh链接掉线
    阿里云 centos 无法执行moodle cron
  • 原文地址:https://www.cnblogs.com/wyt007/p/9078647.html
Copyright © 2011-2022 走看看