zoukankan      html  css  js  c++  java
  • RabbitMQ消息队列实现30分钟订单自动取消功能(C#)

    目录:

    功能介绍

    消息队列简介及原理

    代码与实现

    消息队列常见问题

    功能介绍

    一 . 简单介绍一下要做的功能,用户前台下单之后,如果用户未支付,30分钟后订单会自动取消,订单状态和库存变回原来状态和库存,我们的后台使用asp.net core 2.0开发,而asp.net core后台的定时任务 需要添加服务 services.AddHostedService<DeadListener>(); 实现类直接继承IHostedService接口,接口会调用 启动 方法 StartAsync 。好了,不多介绍启动定时任务,(主要介绍的是消息队列RabbitMQ)然后 简单的逻辑就是 用户下单会把一条消息插入生产队列中,当然消息队列的配置是30分钟,30分钟之内如果用户支付,就会调用消费者接口,将消息消费掉,如果30分钟没有支付,超时消息会到死信队列中,然后后台任务会检查到死信队列中的消息,将消息消费掉,过程中会改订单状态等

    消息队列简介及原理

    二 . 再简单介绍一下消息队列吧,我也是第一次用,网上太多帖子了,哈哈

    • ConnectionFactory(连接管理器):应用程序与Rabbit之间建立连接的管理器,程序代码中使用;
    • Channel(信道):消息推送使用的通道;
    • Exchange(交换器):用于接受、分配消息;
    • Queue(队列):用于存储生产者的消息;
    • RoutingKey(路由键):用于把生成者的数据分配到交换器上;
    • BindingKey(绑定键):用于把交换器的消息绑定到队列上;

     

    三. 工作机制

    生产者、消费者和代理

    在了解消息通讯之前首先要了解3个概念:生产者、消费者和代理。

    生产者:消息的创建者,负责创建和推送数据到消息服务器;

    消费者:消息的接收方,用于处理数据和确认消息;

    代理:就是RabbitMQ本身,用于扮演“快递”的角色,本身不生产消息,只是扮演“快递”的角色。

    消息发送原理

    首先你必须连接到Rabbit才能发布和消费消息,那怎么连接和发送消息的呢?

    你的应用程序和Rabbit Server之间会创建一个TCP连接,一旦TCP打开,并通过了认证,认证就是你试图连接Rabbit之前发送的Rabbit服务器连接信息和用户名和密码,有点像程序连接数据库,使用Java有两种连接认证的方式,后面代码会详细介绍,一旦认证通过你的应用程序和Rabbit就创建了一条AMQP信道(Channel)。

    信道是创建在“真实”TCP上的虚拟连接,AMQP命令都是通过信道发送出去的,每个信道都会有一个唯一的ID,不论是发布消息,订阅队列或者介绍消息都是通过信道完成的。

    为什么不通过TCP直接发送命令?

    对于操作系统来说创建和销毁TCP会话是非常昂贵的开销,假设高峰期每秒有成千上万条连接,每个连接都要创建一条TCP会话,这就造成了TCP连接的巨大浪费,而且操作系统每秒能创建的TCP也是有限的,因此很快就会遇到系统瓶颈。

    如果我们每个请求都使用一条TCP连接,既满足了性能的需要,又能确保每个连接的私密性,这就是引入信道概念的原因。

    四.消息持久化

    Rabbit队列和交换器有一个不可告人的秘密,就是默认情况下重启服务器会导致消息丢失,那么怎么保证Rabbit在重启的时候不丢失呢?答案就是消息持久化。

    当你把消息发送到Rabbit服务器的时候,你需要选择你是否要进行持久化,但这并不能保证Rabbit能从崩溃中恢复,想要Rabbit消息能恢复必须满足3个条件:

    1. 投递消息的时候durable设置为true,消息持久化,代码:channel.queueDeclare(x, true, false, false, null),参数2设置为true持久化;
    2. 设置投递模式deliveryMode设置为2(持久),代码:channel.basicPublish(x, x, MessageProperties.PERSISTENT_TEXT_PLAIN,x),参数3设置为存储纯文本到磁盘;
    3. 消息已经到达持久化交换器上;
    4. 消息已经到达持久化的队列;

    持久化工作原理

    Rabbit会将你的持久化消息写入磁盘上的持久化日志文件,等消息被消费之后,Rabbit会把这条消息标识为等待垃圾回收。

    持久化的缺点

    消息持久化的优点显而易见,但缺点也很明显,那就是性能,因为要写入硬盘要比写入内存性能较低很多,从而降低了服务器的吞吐量,尽管使用SSD硬盘可以使事情得到缓解,但他仍然吸干了Rabbit的性能,当消息成千上万条要写入磁盘的时候,性能是很低的。

    所以使用者要根据自己的情况,选择适合自己的方式。

    代码与实现

    1.声明队列的参数说明

    1 //声明队列
    2  channel.QueueDeclare
    3  (
    4      queue: QueueName, //队列名称
    5      durable: false, //队列是否持久化.false:队列在内存中,服务器挂掉后,队列就没了;true:服务器重启后,队列将会重新生成.注意:只是队列持久化,不代表队列中的消息持久化!!!!
    6       exclusive: false, //队列是否专属,专属的范围针对的是连接,也就是说,一个连接下面的多个信道是可见的.对于其他连接是不可见的.连接断开后,该队列会被删除.注意,不是信道断开,是连接断开.并且,就算设置成了持久化,也会删除.
    7      autoDelete: true, //如果所有消费者都断开连接了,是否自动删除.如果还没有消费者从该队列获取过消息或者监听该队列,那么该队列不会删除.只有在有消费者从该队列获取过消息后,该队列才有可能自动删除(当所有消费者都断开连接,不管消息是否获取完)
    8      arguments: null //队列的配置
    9  );
    1 //加载消息队列(订单超时)
    2 //定时任务触发器
    3  services.AddHostedService<DeadListener>();
    4 或者
    5  services.AddTransient<IHostedService, DeadListener>();
    View Code

    2.消息队列(生产者)

     1        /// <summary>
     2         /// 订单超时未处理消息队列(生产者)
     3         /// </summary>
     4         /// <param name="routeKey"></param>
     5         /// <returns></returns>
     6         public Task PublisherOrder(string routeKey)
     7         {
     8             const string routingKeyDead = "queue-dead-routing-jd"; //死信队列路由
     9             var routingKeyDelay = "queue-delay-" + routeKey;//消息队列路由
    10             const string orderQueueName = "zzhelloJd"; //定义消息队列名
    11             const string orderQueueDeadName = "zzhello_dead_Jd"; //定义一个死信消息队列名
    12 
    13             var factory = new ConnectionFactory
    14             {
    15                 UserName = _configuration["RabbitMQConfig:RabbitUserName"],//用户名
    16                 Password = _configuration["RabbitMQConfig:RabbitPassword"],//密码
    17                 HostName = _configuration["RabbitMQConfig:RabbitHost"],//rabbitmq ip
    18             };
    19             using (var connection = factory.CreateConnection())
    20             {
    21                 using (var channel = connection.CreateModel())
    22                 {
    23                     //定义死信交换机
    24                     channel.ExchangeDeclare("exchange-D", ExchangeType.Direct, true, false, null);
    25                     //创建一个名叫"zzhello_dead"的消息队列
    26                     channel.QueueDeclare(orderQueueDeadName, true, false, false, null);
    27                     //将死信队列绑定到死信交换机
    28                     channel.QueueBind(orderQueueDeadName, "exchange-D", routingKeyDead);
    29                     var dic = new Dictionary<string, object>
    30                     {
    31                         {"x-message-ttl", 1800000},//队列上消息过期时间,应小于队列过期时间 60000 1800000
    32                         //{"x-message-ttl", 120000},//队列上消息过期时间,应小于队列过期时间 60000 1800000
    33                         {"x-dead-letter-exchange", "exchange-D"},//过期消息转向路由
    34                         {"x-dead-letter-routing-key", routingKeyDead}//过期消息转向路由相匹配routingkey
    35                     };
    36                     channel.ExchangeDeclare("exchange-L", ExchangeType.Direct, true, false, null);//定义一个Direct类型交换机
    37                     //创建一个名叫"zzhello"的消息队列
    38                     channel.QueueDeclare(orderQueueName, true, false, false, dic);
    39                     //将队列绑定到交换机
    40                     channel.QueueBind(orderQueueName, "exchange-L", routingKeyDelay, dic);
    41                     var body = Encoding.UTF8.GetBytes(routeKey.ToString());
    42                     //向该消息队列发送消息message
    43                     channel.BasicPublish("exchange-L",
    44                             routingKeyDelay,
    45                             null,
    46                             body);
    47                 }
    48             }
    49             return Task.CompletedTask;
    50         }

    3.消息队列(消费者)

     1      /// <summary>
     2         /// 支付成功后处理消费者
     3         /// </summary>
     4         /// <returns></returns>
     5         [Obsolete]
     6         public Task ConsumerOrder(string routeKey)
     7         {
     8             const string orderQueueName = "zzhelloJd"; //定义消息队列名
     9             var routingKeyDelay = "queue-delay-" + routeKey;//消息队列路由
    10             const string routingKeyDead = "queue-dead-routing-jd"; //死信队列路由
    11             var factory = new ConnectionFactory
    12             {
    13                 UserName = _configuration["RabbitMQConfig:RabbitUserName"],//用户名
    14                 Password = _configuration["RabbitMQConfig:RabbitPassword"],//密码
    15                 HostName = _configuration["RabbitMQConfig:RabbitHost"],//rabbitmq ip
    16             };
    17             using (var connection = factory.CreateConnection())
    18             {
    19                 using (var channel = connection.CreateModel())
    20                 {
    21                     var dic = new Dictionary<string, object>
    22                     {
    23                         {"x-message-ttl", 1800000},//队列上消息过期时间,应小于队列过期时间 60000 1800000
    24                         {"x-dead-letter-exchange", "exchange-D"},//过期消息转向路由
    25                         {"x-dead-letter-routing-key", routingKeyDead}//过期消息转向路由相匹配routingkey
    26                     };
    27                     channel.ExchangeDeclare("exchange-L", ExchangeType.Direct, true, false, null);//定义一个Direct类型交换机
    28                     //创建一个名叫"zzhello"的消息队列
    29                     channel.QueueDeclare(orderQueueName, true, false, false, dic);
    30                     //将队列绑定到交换机
    31                     channel.QueueBind(orderQueueName, "exchange-L", routingKeyDelay, dic);
    32                     //回调,当consumer收到消息后会执行该函数
    33                     //var consumer = new EventingBasicConsumer(channel);
    34                     //consumer.Received += (model, ea) =>
    35                     //{
    36                     //    var body = ea.Body;
    37                     //    var message = Encoding.UTF8.GetString(body);
    38                     //};
    39 
    40                     ////消费队列"hello"中的消息
    41                     //channel.BasicConsume(queue: name,
    42                     //                     autoAck: true,
    43                     //                     consumer: consumer);
    44 
    45                     var consumer = new QueueingBasicConsumer(channel);
    46                     //消费队列,并设置应答模式为程序主动应答
    47                     channel.BasicConsume(orderQueueName, false, consumer);
    48 
    49                     //阻塞函数,获取队列中的消息
    50                     var ea = consumer.Queue.Dequeue();
    51                     var bytes = ea.Body;
    52                     var str = Encoding.UTF8.GetString(bytes);
    53                     Console.WriteLine("队列消息:" + str);
    54                     //回复确认
    55                     channel.BasicAck(ea.DeliveryTag, false);
    56 
    57                 }
    58             }
    59             return Task.CompletedTask;
    60         }

    4.消费死信队列

     1     public class DeadListener : RabbitListener
     2     {
     3 
     4         #region Fileds
     5 
     6         // 因为Process函数是委托回调,直接将其他Service注入的话两者不在一个scope,
     7         // 这里要调用其他的Service实例只能用IServiceProvider CreateScope后获取实例对象
     8         private readonly IServiceProvider _services;
     9         private readonly ILogger<RabbitListener> _logger;
    10 
    11         #endregion
    12 
    13 
    14         #region Ctors
    15 
    16         public DeadListener(IServiceProvider services, IConfiguration configuration, ILogger<RabbitListener> logger) : base(configuration)
    17         {
    18             RouteKey = "queue-dead-routing-jd";
    19             QueueName = "zzhello_dead_Jd";
    20             _logger = logger;
    21             _services = services;
    22         }
    23 
    24         #endregion
    25 
    26 
    27         #region Methods
    28 
    29         protected override bool Process(string message)
    30         {
    31             var taskMessage = message;
    32             if (taskMessage == null)
    33             {
    34                 // 返回false 的时候回直接驳回此消息,表示处理不了
    35                 return false;
    36             }
    37             try
    38             {
    39                 using (var scope = _services.CreateScope())
    40                 {
    41                     var xxxService = scope.ServiceProvider.GetRequiredService<IOrderService>();
    42                     //_logger.LogInformation($"开始更新订单状态:UpdateOrderCancel,message:{message}");
    43                     //LoggerHelper.Write($"开始更新订单状态:UpdateOrderCancel,message:{message}");
    44                     var re= xxxService.UpdateOrderCancel(Guid.Parse(taskMessage)).Result;
    45                     //_logger.LogInformation($"结束更新订单状态:UpdateOrderCancel,message:{message},result:{re}");
    46                     //LoggerHelper.Write($"结束更新订单状态:UpdateOrderCancel,message:{message},result:{re}");
    47                     if (re)
    48                     {
    49                         return true;
    50                     }
    51                     else
    52                     {
    53                         return false;
    54                     }
    55                 }
    56             }
    57             catch (Exception ex)
    58             {
    59                 _logger.LogInformation($"Process fail,error:{ex.Message},stackTrace:{ex.StackTrace},message:{message}");
    60                 _logger.LogError(-1, ex, "Process fail");
    61                 LoggerHelper.Write($"DeadListener 自动更新订单状态报错,错误提示  :{ex}");
    62                 return false;
    63             }
    64         }
    65         #endregion
    66     }
     1     public class RabbitListener : IHostedService
     2     {
     3         private readonly IConnection _connection;
     4         private readonly IModel _channel;
     5 
     6         protected RabbitListener(IConfiguration configuration)
     7         {
     8             try
     9             {
    10                 var factory = new ConnectionFactory
    11                 {
    12                     // 这是我这边的配置,自己改成自己用就好
    13                     UserName = configuration["RabbitMQConfig:RabbitUserName"],//用户名
    14                     Password = configuration["RabbitMQConfig:RabbitPassword"],//密码
    15                     HostName = configuration["RabbitMQConfig:RabbitHost"]//rabbitmq ip
    16                     //Port = options.Value.RabbitPort,
    17                 };
    18                 _connection = factory.CreateConnection();
    19                 _channel = _connection.CreateModel();
    20             }
    21             catch (Exception ex)
    22             {
    23                 Console.WriteLine($"RabbitListener init error,ex:{ex.Message}");
    24             }
    25         }
    26 
    27         public Task StartAsync(CancellationToken cancellationToken)
    28         {
    29             Register();
    30             return Task.CompletedTask;
    31         }
    32         protected string RouteKey;
    33         protected string QueueName;
    34 
    35         // 处理消息的方法
    36         protected virtual bool Process(string message)
    37         {
    38             throw new NotImplementedException();
    39         }
    40 
    41         // 注册消费者监听在这里
    42         private void Register()
    43         {
    44             Console.WriteLine($"RabbitListener register,routeKey:{RouteKey}");
    45             // channel.ExchangeDeclare(exchange: "exchange-D", type: "topic");
    46             _channel.ExchangeDeclare("exchange-D", ExchangeType.Direct, true, false, null);
    47             _channel.QueueDeclare(QueueName, true, false, false, null);
    48             _channel.QueueBind(QueueName, "exchange-D", RouteKey);
    49 
    50             //启用QoS,每次预取10条,避免消费不过来导致消息堆积在本地缓存
    51             _channel.BasicQos(0, 10, false);
    52             var consumer = new EventingBasicConsumer(_channel);
    53             consumer.Received += (model, ea) =>
    54             {
    55                 var body = ea.Body;
    56                 var message = Encoding.UTF8.GetString(body);
    57                 var result = Process(message);
    58                 if (result)
    59                 {
    60                     _channel.BasicAck(ea.DeliveryTag, false);//启用手动ack机制后,没有及时ack导致的队列异常(Unacked过多)
    61                 }
    62                 else
    63                 {
    64                     _channel.BasicNack(ea.DeliveryTag, false, true);// 启用nack+重入队 机制后,导致的死循环(Ready过多)
    65                 }
    66 
    67             };
    68             _channel.BasicConsume(queue: QueueName, consumer: consumer);
    69         }
    70 
    71         public void DeRegister()
    72         {
    73             _connection.Close();
    74         }
    75 
    76 
    77         public Task StopAsync(CancellationToken cancellationToken)
    78         {
    79             _connection.Close();
    80             return Task.CompletedTask;
    81         }
    82     }

    消息队列常见问题

      在这里我先说一下我遇到的问题吧!不知道什么原因会产生异常消息,也就是业务失败产生的unasked消息,这个问题该如何处理

      处理方式是启用nack+重入队 机制后,但是这种方式会 导致的死循环(Ready过多),所以要启用Qos和ack机制后,没有及时ack导致的队列堵塞

      启用QoS,每次预取5条消息,避免消息处理不过来,全部堆积在本地缓存里

      channel.BasicQos(0, 5, false);

      开启QoS,当RabbitMQ的队列达到5条Unacked消息时,不会再推送消息给Consumer;

      这样问题就解决了!!!!!

      其他常见问题参考     https://www.cnblogs.com/sw008/p/11054331.html

  • 相关阅读:
    C/C++产生随机数
    BNUOJ34973Liserious战队
    oracle ebs 12.20 安装成功其过程失败日记及总结(1)
    HDU 2544 最短路 SPFA 邻接表 模板
    GridView编辑删除操作
    Hibernate_10_继承的例子_单表
    String不变性
    Mac在结构quick cocos2d-x编译环境
    Hash散列算法 Time33算法
    南京地图南京全套的卫星地图下载 百度高清卫星地图包括道路、标签信息叠加
  • 原文地址:https://www.cnblogs.com/zhao987/p/12532295.html
Copyright © 2011-2022 走看看