zoukankan      html  css  js  c++  java
  • MQ消费失败,自动重试思路

    在遇到与第三方系统做对接时,MQ无疑是非常好的解决方案(解耦、异步)。但是如果引入MQ组件,随之要考虑的问题就变多了,如何保证MQ消息能够正常被业务消费。所以引入MQ消费失败情况下,自动重试功能是非常重要的。这里不过细讲MQ有哪些原因会导致失败。

    MQ重试,网上有方案一般采用的是,本地消息表+定时任务,不清楚的可以自行了解下。

    我这里提供一种另外的思路,供大家参考。方案实现在RabbitMQ(安装延迟队列插件)+.NET CORE 3.1

    设计思路为:

    内置一个专门做重试的队列,这个队列是一个延迟队列,当业务队列消费失败时,将原始消息投递至重试队列,并设置延迟时间,当延迟时间到达后。重试队列消费会自动将消息重新投递会业务队列,如此便可以实现消息的重试,而且可以根据重试次数来自定义重试时间,比如像微信支付回调一样(第一次延迟3S,第二次延迟10S,第三次延迟60S),上面方案当然要保证MQ消费采用ACK机制。

    那么如何让重试队列知道原来的业务队列是哪个,我们定义业务队列时,可以通过MQ的消息头内置一些信息:队列类型(业务队列也有可能是延迟队列)、重试次数(默认为 0)、交换机名称、路由键。业务队列消费失败时,将消息投递至重试队列时,则可以把业务队列的消息头传递至重试队列,那么重试队列消费,重新将消息发送给业务队列时,则可以知道业务队列所需要的所有参数(需要将重试次数+1)。

    下面结合代码讲下具体实现:

    我们先看看业务队列发送消息时,如何定义

    IBasicProperties properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //初始化,需要内置一些消费异常,自动重试参数 
                    if (headers == null)
                    {
                        headers = new Dictionary<string, object>();
                    }
                    //ttlSecond 有值表示消息将投递到延迟队列
                    //因为可以自建延迟队列,ttlSecond是业务标识 
                    if (ttlSecond.HasValue)
                    {
                        if (!headers.ContainsKey("x-delay"))
                        {
                            headers.Add("x-delay", ttlSecond * 1000);
                        }
                        else
                        {
                            headers["x-delay"] = ttlSecond * 1000;
                        }
                        //queueType = 1表示延迟队列 
                        //框架内部重试机制需要此参数,因为重新投递到原始队列时,需要区分普通队列还是延迟队列
                        if (!headers.ContainsKey("queueType"))
                        {
                            headers.Add("queueType", 1);
                        }
                    }
                    else
                    {
                        //queueType = 0表示普通队列
                        if (!headers.ContainsKey("queueType"))
                        {
                            headers.Add("queueType", 0);
                        }
                    }
                    //重试次数
                    if (!headers.ContainsKey("retryCount"))
                    {
                        headers.Add("retryCount", 0);
                    }
                    //原始交换机名称
                    if (!headers.ContainsKey("retryExchangeName"))
                    {
                        headers.Add("retryExchangeName", exchangeName);
                    }
                    //原始路由键
                    if (!headers.ContainsKey("retryRoutingKey"))
                    {
                        headers.Add("retryRoutingKey", routingKey);
                    }
                    properties.Headers = headers;
                    channel.BasicPublish(exchangeName, routingKey, properties, Encoding.UTF8.GetBytes(message));

     这里会内置上面描述的重试队列需要的参数

    再来看看业务队列消费如何处理,这里因为会自动重试,所以保证业务队列每次都是消费成功的(MQ才会将消息从队列中删除)

           //每次消费一条
                channel.BasicQos(0, 1, false);
    
                //定义消费者
                EventingBasicConsumer eventingBasicConsumer = new EventingBasicConsumer(channel);
                eventingBasicConsumer.Received += async (sender, basicConsumer) =>
                {
                    string body = Encoding.UTF8.GetString(basicConsumer.Body.ToArray());
                    Deadletter deadletter = null;
                    try
                    {
                        string errorMsg = await action(body);
                        if (!errorMsg.IsNullOrWhiteSpace())
                        {
                            deadletter = new Deadletter() { Body = body, ErrorMsg = errorMsg };
                            _logger.LogError($"业务队列消费异常(已知),消息头:{JsonUtils.Serialize(basicConsumer.BasicProperties.Headers)}{Environment.NewLine}原始消息:{body}{Environment.NewLine}错误:{errorMsg}");
                        }
                    }
                    catch (Exception ex)
                    {
                        deadletter = new Deadletter() { Body = body, ErrorMsg = ex.Message };
                        _logger.LogError(ex, $"业务队列消费异常(未知),消息头:{JsonUtils.Serialize(basicConsumer.BasicProperties.Headers)}{Environment.NewLine}原始消息:{body}");
                    }
                    //必定应答,不管消费成功还是失败
                    channel.BasicAck(basicConsumer.DeliveryTag, false);
                    //消费失败,投递消息至重试队列
                    if (deadletter != null)
                    {
                        PublishRetry(deadletter, basicConsumer.BasicProperties.Headers);
                    }
                };

     我们再看看PublishRetry重试队列的推送方法如何实现

    IBasicProperties properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //x-delay为延迟队列的延迟时间
                    //如果第一次进行重试,请求头中是不存在延迟时间的,需要新增
                    //因为可以进行多次重试,所以第二次时,就会存在延迟时间
                    //但因为可以自建用于业务的延迟队列,所以自建的延迟队列,第一次重试也会存在x-delay,但是如果自建的延迟队列失败进行重试时,不能还使用自身的延迟时间,所以需要重新设置为系统默认的失败重试时间
                    if (!headers.ContainsKey("x-delay"))
                    {
                        headers.Add("x-delay", 0);
                    } 
                    //重试次数
                    int retryCount = Convert.ToInt32(headers["retryCount"]);
                    //可以根据重试次数,实现上面说描述的微信回调的重试时间变长效果
                    headers["x-delay"] = retryCount * 1000;
                    properties.Headers = headers;
                    channel.BasicPublish(RETRY_EXCHANGE_NAME, string.Empty, properties, Encoding.UTF8.GetBytes(JsonUtils.Serialize(deadletter)));

    重试队列的消费者实现

    channel.BasicQos(0, 1, false); 
                EventingBasicConsumer eventingBasicConsumer = new EventingBasicConsumer(channel);
                eventingBasicConsumer.Received += async (sender, basicConsumer) =>
                {
                    string message = Encoding.UTF8.GetString(basicConsumer.Body.ToArray());
                    Deadletter deadletter = JsonUtils.Deserialize<Deadletter>(message); 
                    IDictionary<string, object> headers = basicConsumer.BasicProperties.Headers;
                    //请求头中肯定会有如下参数,因为在框架代码中已经内置
                    //重试次数
                    int retryCount = Convert.ToInt32(headers["retryCount"]);
                    //原队列类型,如果原队列本身为延迟队列,重试投递的时候,必须也要为延迟队列,只是不需要延迟时间,投递回原队列后,会立马重新消费
                    int queueType = Convert.ToInt32(headers["queueType"]);
                    //原队列名称
                    string retryExchangeName = Encoding.UTF8.GetString((byte[])headers["retryExchangeName"]);
                    //原路由键
                    string retryRoutingKey = Encoding.UTF8.GetString((byte[])headers["retryRoutingKey"]);
                    if (retryCount <= 10)
                    {
                        headers["retryCount"] = retryCount + 1;
                        //原有队列为普通队列,重新投递时,也需要投递为普通队列类型
                        if (queueType == 0)
                        {
                            PublishMessage(retryExchangeName, retryRoutingKey, deadletter.Body, basicConsumer.BasicProperties.Headers);
                        }
                        //原有队列为延迟队列,重新投递时,也需要投递为延迟队列类型
                        else
                        {
                            PublishMessage(retryExchangeName, retryRoutingKey, deadletter.Body, basicConsumer.BasicProperties.Headers, 0);
                        }
                    }
                    //超过重试最大次数不再处理,交由外部委托来处理死信
                    else
                    {
                        await deadLetterTask(retryExchangeName, deadletter.Body, deadletter.ErrorMsg);
                    }
                    //应答
                    channel.BasicAck(basicConsumer.DeliveryTag, false);
                };
                //开启监听
                channel.BasicConsume(RETRY_QUEUE_NAME, false, eventingBasicConsumer);

    然后在系统中,内置重试队列消费者

    //注册框架内自动重试
                _rabbitMQClient.SubscribeRetry(async (exchangeName, message, errorMsg) =>
                {
                    string content = $"原始交换机名称:{exchangeName}{Environment.NewLine}" +
                                 $"原始消息内容:{message}{Environment.NewLine}" +
                                 $"错误消息:{errorMsg}";
    
                    await PushWeChatMessage(content);
                });

     上述为我们MQ实现自动重试的一种方案,当然中间包括每次如果消费失败都可以发送通知,来通知业务人员关注消费失败的情况。可以自定义最大重试次数、重试间隔时间、死信的处理,这里仅仅是MQ重试机制的一种思路而已,大家如果有更好的方案,欢迎多多沟通。

    作者:LyIng.Net
    出处:http://www.cnblogs.com/jiangbiao/
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

  • 相关阅读:
    用pygame实现打飞机游戏-3-显示飞机和控制飞机移动
    用pygame实现打飞机游戏-2-检测键盘
    最好听的钢琴曲排行榜 世界上最好听的钢琴曲
    使用gulp构建nodejs,你只需要记住5个函数
    Linux删除文件夹命令
    前端构建工具gulpjs的使用介绍及技巧
    HTML5 LocalStorage 本地存储
    jquery新窗口打开链接
    Sublime text 3 如何格式化HTML代码
    jquery滚动条加载数据
  • 原文地址:https://www.cnblogs.com/jiangbiao/p/15748007.html
Copyright © 2011-2022 走看看