zoukankan      html  css  js  c++  java
  • C#队列学习笔记:RabbitMQ优先级队列

        一、引言

        在具体业务中可能会遇到一些要提前处理的消息,比如普通客户的消息按先进先出的顺序处理,Vip客户的消息要提前处理。在RabbitMQ中,消息优先级的实现方式是:在声明queue时设置队列的x-max-priority属性,然后在publish消息时,设置消息的优先级即可。

        RabbitMQ优先级队列注意事项:

        1)RabbitMQ3.5以后才支持优先级队列。

        2)只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效。

        3)优先级取值范围在0~9之间,数值越大则优先级越高。

        二、示例

        2.1、发送端(生产端)

        新建一个控制台项目Send,并添加一个类RabbitMQConfig。

        class RabbitMQConfig
        {
            public static string Host { get; set; }
    
            public static string VirtualHost { get; set; }
    
            public static string UserName { get; set; }
    
            public static string Password { get; set; }
    
            public static int Port { get; set; }
    
            static RabbitMQConfig()
            {
                Host = "192.168.2.242";
                VirtualHost = "/";
                UserName = "hello";
                Password = "world";
                Port = 5672;
            }
        }
    RabbitMQConfig.cs
        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("按任意键开始生产。");
                Console.ReadLine();
                PriorityMessagePublish();
                Console.ReadLine();
            }
    
            private static void PriorityMessagePublish()
            {
                const string MessagePrefix = "message_";
                const int PublishMessageCount = 6;
                byte messagePriority = 0;
    
                var factory = new ConnectionFactory()
                {
                    HostName = RabbitMQConfig.Host,
                    Port = RabbitMQConfig.Port,
                    VirtualHost = RabbitMQConfig.VirtualHost,
                    UserName = RabbitMQConfig.UserName,
                    Password = RabbitMQConfig.Password,
                    Protocol = Protocols.DefaultProtocol
                };
    
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        //设置队列优先级,取值范围在0~255之间。
                        Dictionary<string, object> dict = new Dictionary<string, object>
                        {
                            { "x-max-priority", 255 }
                        };
    
                        //声明队列
                        channel.QueueDeclare(queue: "priority", durable: true, exclusive: false, autoDelete: false, arguments: dict);
    
    
                        //向该消息队列发送消息message
                        Random random = new Random();
                        for (int i = 0; i < PublishMessageCount; i++)
                        {
                            var properties = channel.CreateBasicProperties();
                            messagePriority = (byte)random.Next(0, 9);
                            properties.Priority = messagePriority;//设置消息优先级,取值范围在0~9之间。
                            var message = MessagePrefix + i.ToString();
                            var body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish(exchange: "", routingKey: "priority", basicProperties: properties, body: body);
                            Console.WriteLine($"{DateTime.Now.ToString()} Send {message} , Priority {messagePriority}");
                        }
                    }
                }
            }
        }
    Program.cs

        2.2、接收端(消费端)

        新建一个控制台项目Receive,按住Alt键,将发送端RabbitMQConfig类拖一个快捷方式到Receive项目中。

        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("按任意键开始消费。");
                Console.ReadLine();
                PriorityMessageSubscribe();
            }
    
            public static void PriorityMessageSubscribe()
            {
                var factory = new ConnectionFactory()
                {
                    HostName = RabbitMQConfig.Host,
                    UserName = RabbitMQConfig.UserName,
                    Password = RabbitMQConfig.Password
                };
    
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += async (model, ea) =>
                        {
                            await Task.Run(() =>
                            {
                                var message = Encoding.UTF8.GetString(ea.Body);
                                Thread.Sleep(1000 * 2);
                                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);//手动消息确认
                                Console.WriteLine($"{DateTime.Now.ToString()} Received {message}");
                            });
                        };
                        channel.BasicConsume(queue: "priority", noAck: false, consumer: consumer);//需要启用消息响应,否则priority无效。
                        Console.ReadKey();
                    }
                }
            }
        }
    Program.cs

        2.3、运行结果

        从消费情况可以看出,message_2及message_3由于priority优先级最高都是7,所以它们会被最早消费,而message_5的priority是0,所以最后才被消费。

  • 相关阅读:
    x64 平台开发 Mapxtreme 编译错误
    hdu 4305 Lightning
    Ural 1627 Join(生成树计数)
    poj 2104 Kth Number(可持久化线段树)
    ural 1651 Shortest Subchain
    hdu 4351 Digital root
    hdu 3221 Bruteforce Algorithm
    poj 2892 Tunnel Warfare (Splay Tree instead of Segment Tree)
    hdu 4031 Attack(BIT)
    LightOJ 1277 Looking for a Subsequence
  • 原文地址:https://www.cnblogs.com/atomy/p/12672396.html
Copyright © 2011-2022 走看看