zoukankan      html  css  js  c++  java
  • C#队列学习笔记:RabbitMQ延迟队列

        一、引言

        日常生活中,很多的APP都有延迟队列的影子。比如在手机淘宝上,经常遇到APP派发的限时消费红包,一般有几个小时或24小时不等。假如在红包倒计时的过程中,没有消费掉红包的话,红包会自动失效。假如上述行为使用RabbitMQ延时队列来理解的话,就是在你收到限时消费红包的时候,手机淘宝会自动发一条延时消息到队列中以供消费。在规定时间内,则可正常消费,否则依TTL自动失效。

        在RabbitMQ中,有两种方式来实现延时队列:一种是基于队列方式,另外一种是基于消息方式。

        二、示例

        2.1、发送端(生产端)

        新建一个控制台项目Send,并添加一个类RabbitMQConfig。

        class RabbitMQConfig
        {
            public static string Host { get; set; }
    
            public static string VirtualHost { get; set; }
    
            public static string UserName { get; set; }
    
            public static string Password { get; set; }
    
            public static int Port { get; set; }
    
            static RabbitMQConfig()
            {
                Host = "192.168.2.242";
                VirtualHost = "/";
                UserName = "hello";
                Password = "world";
                Port = 5672;
            }
        }
    RabbitMQConfig.cs
        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("C# RabbitMQ实现延迟队列有以下两种方式:");
                Console.WriteLine("1、基于队列方式实现延迟队列,请按1开始生产。");
                Console.WriteLine("2、基于消息方式实现延迟队列,请按2开始生产。");
    
                string chooseChar = Console.ReadLine();
                if (chooseChar == "1")
                {
                    DelayMessagePublishByQueueExpires();
                }
                else if (chooseChar == "2")
                {
                    DelayMessagePublishByMessageTTL();
                }
                Console.ReadLine();
            }
    
            /// <summary>
            /// 基于队列方式实现延迟队列
            /// 将队列中所有消息的TTL(Time To Live,即过期时间)设置为一样
            /// </summary>
            private static void DelayMessagePublishByQueueExpires()
            {
                const string MessagePrefix = "message_";
                const int PublishMessageCount = 6;
                const int QuequeExpirySeconds = 1000 * 30;
                const int MessageExpirySeconds = 1000 * 10;
    
                var factory = new ConnectionFactory()
                {
                    HostName = RabbitMQConfig.Host,
                    Port = RabbitMQConfig.Port,
                    VirtualHost = RabbitMQConfig.VirtualHost,
                    UserName = RabbitMQConfig.UserName,
                    Password = RabbitMQConfig.Password,
                    Protocol = Protocols.DefaultProtocol
                };
    
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        //当同时指定了queue和message的TTL值,则两者中较小的那个才会起作用。
                        Dictionary<string, object> dict = new Dictionary<string, object>
                        {
                            { "x-expires", QuequeExpirySeconds },//队列过期时间
                            { "x-message-ttl", MessageExpirySeconds },//消息过期时间
                            { "x-dead-letter-exchange", "dead exchange 1" },//过期消息转向路由
                            { "x-dead-letter-routing-key", "dead routing key 1" }//过期消息转向路由的routing key
                        };
    
                        //声明队列
                        channel.QueueDeclare(queue: "delay1", durable: true, exclusive: false, autoDelete: false, arguments: dict);
    
    
                        //向该消息队列发送消息message
                        for (int i = 0; i < PublishMessageCount; i++)
                        {
                            var message = MessagePrefix + i.ToString();
                            var body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish(exchange: "", routingKey: "delay1", basicProperties: null, body: body);
                            Thread.Sleep(1000 * 2);
                            Console.WriteLine($"{DateTime.Now.ToString()} Send {message} MessageExpirySeconds {MessageExpirySeconds / 1000}");
                        }
                    }
                }
            }
    
            /// <summary>
            /// 基于消息方式实现延迟队列
            /// 对队列中消息进行单独设置,每条消息的TTL可以不同。
            /// </summary>
            private static void DelayMessagePublishByMessageTTL()
            {
                const string MessagePrefix = "message_";
                const int PublishMessageCount = 6;
                int MessageExpirySeconds = 0;
    
                var factory = new ConnectionFactory()
                {
                    HostName = RabbitMQConfig.Host,
                    Port = RabbitMQConfig.Port,
                    VirtualHost = RabbitMQConfig.VirtualHost,
                    UserName = RabbitMQConfig.UserName,
                    Password = RabbitMQConfig.Password,
                    Protocol = Protocols.DefaultProtocol
                };
    
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        Dictionary<string, object> dict = new Dictionary<string, object>
                        {
                            { "x-dead-letter-exchange", "dead exchange 2" },//过期消息转向路由
                            { "x-dead-letter-routing-key", "dead routing key 2" }//过期消息转向路由的routing key
                        };
    
                        //声明队列
                        channel.QueueDeclare(queue: "delay2", durable: true, exclusive: false, autoDelete: false, arguments: dict);
    
                        //向该消息队列发送消息message
                        Random random = new Random();
                        for (int i = 0; i < PublishMessageCount; i++)
                        {
                            MessageExpirySeconds = i * 1000;
                            var properties = channel.CreateBasicProperties();
                            properties.Expiration = MessageExpirySeconds.ToString();
                            var message = MessagePrefix + i.ToString();
                            var body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish(exchange: "", routingKey: "delay2", basicProperties: properties, body: body);
                            Console.WriteLine($"{DateTime.Now.ToString()} Send {message} MessageExpirySeconds {MessageExpirySeconds / 1000}");
                        }
                    }
                }
            }
        }
    Program.cs

        2.2、接收端(消费端)

        新建一个控制台项目Receive,按住Alt键,将发送端RabbitMQConfig类拖一个快捷方式到Receive项目中。

        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("C# RabbitMQ实现延迟队列有以下两种方式:");
                Console.WriteLine("1、基于队列方式实现延迟队列,请按1开始消费。");
                Console.WriteLine("2、基于消息方式实现延迟队列,请按2开始消费。");
    
                string chooseChar = Console.ReadLine();
                if (chooseChar == "1")
                {
                    DelayMessageConsumeByQueueExpires();
                }
                else if (chooseChar == "2")
                {
                    DelayMessageConsumeByMessageTTL();
                }
                Console.ReadLine();
            }
    
            public static void DelayMessageConsumeByQueueExpires()
            {
                var factory = new ConnectionFactory()
                {
                    HostName = RabbitMQConfig.Host,
                    Port = RabbitMQConfig.Port,
                    VirtualHost = RabbitMQConfig.VirtualHost,
                    UserName = RabbitMQConfig.UserName,
                    Password = RabbitMQConfig.Password,
                    Protocol = Protocols.DefaultProtocol
                };
    
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: "dead exchange 1", type: "direct");
                        string name = channel.QueueDeclare().QueueName;
                        channel.QueueBind(queue: name, exchange: "dead exchange 1", routingKey: "dead routing key 1");
    
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var message = Encoding.UTF8.GetString(ea.Body);
                            Console.WriteLine($"{DateTime.Now.ToString()} Received {message}");
                        };
                        channel.BasicConsume(queue: name, noAck: true, consumer: consumer);
                        Console.ReadKey();
                    }
                }
            }
    
            public static void DelayMessageConsumeByMessageTTL()
            {
                var factory = new ConnectionFactory()
                {
                    HostName = RabbitMQConfig.Host,
                    Port = RabbitMQConfig.Port,
                    VirtualHost = RabbitMQConfig.VirtualHost,
                    UserName = RabbitMQConfig.UserName,
                    Password = RabbitMQConfig.Password,
                    Protocol = Protocols.DefaultProtocol
                };
    
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.ExchangeDeclare(exchange: "dead exchange 2", type: "direct");
                        string name = channel.QueueDeclare().QueueName;
                        channel.QueueBind(queue: name, exchange: "dead exchange 2", routingKey: "dead routing key 2");
    
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var message = Encoding.UTF8.GetString(ea.Body);
                            Console.WriteLine($"{DateTime.Now.ToString()} Received {message}");
                        };
                        channel.BasicConsume(queue: name, noAck: true, consumer: consumer);
                        Console.ReadKey();
                    }
                }
            }
        }
    Program.cs

        2.3、运行结果

    -----------------------------------------------------------------------------------------------------------

  • 相关阅读:
    Sublime Text 3 Build 3143 可用License
    npm安装cnpm报错
    使用proxy来简单的实现一个观察者
    时间倒计时提醒
    JavaScript设计模式
    异步方法(promise版)出错自调用
    co模块源码学习笔记
    go new() 和 make() 的区别
    广度优先搜索算法
    并发和并行有什么区别?(转)
  • 原文地址:https://www.cnblogs.com/atomy/p/12678074.html
Copyright © 2011-2022 走看看