zoukankan      html  css  js  c++  java
  • EasyNetQ(RabbitMQ)在处理消息时,如果抛出异常,继续不断发送到订阅队列,不断处理(也就是不自动确认消息已到达)

    默认情况下,EasyNetQ的消息处理过程中,如果throw exception,那么,依然是认为消息已经送达,不会再次推送,为了让RabbitMQ再次推送,可以这么实现:

            public sealed class AlwaysRequeueErrorStrategy : IConsumerErrorStrategy
            {
                public void Dispose()
                {
                }
    
                public AckStrategy HandleConsumerError(ConsumerExecutionContext context, Exception exception)
                {
                    return AckStrategies.NackWithRequeue;
                }
    
                public AckStrategy HandleConsumerCancelled(ConsumerExecutionContext context)
                {
                    return AckStrategies.NackWithRequeue;
                }
            }
    
            [TestMethod]
            public void test()
            {
                var hostName = _configuration["RabbitMQ:HostName"];
                var mqport = _configuration["RabbitMQ:Port"];
                var userName = _configuration["RabbitMQ:UserName"];
                var password = _configuration["RabbitMQ:Password"];
    
                var connectionConfiguration = new ConnectionConfiguration
                {
    
                    Hosts = new List<HostConfiguration>
                    {
                        new HostConfiguration
                        {
                            Host = hostName,
                            Port = Convert.ToUInt16(mqport)
                        }
                    },
                    Port = Convert.ToUInt16(mqport),
                    VirtualHost = "/",
                    UserName = userName,
                    Password = password
                };
                var _bus = RabbitHutch.CreateBus(connectionConfiguration , x=> {
                    x.Register<IConsumerErrorStrategy, AlwaysRequeueErrorStrategy>();
                });
    
                var exchange = _bus.Advanced.ExchangeDeclare("ExchangeNAME_test", ExchangeType.Topic);
                
                var nickNameQu = _bus.Advanced.QueueDeclare("queuename2222");
                int count = 0;
                _bus.Advanced.Bind(exchange, nickNameQu, "RouterKey1");
                _bus.Advanced.Consume(nickNameQu, (data, properties, info) =>
                {
                    count++;
                    if (count < 2)
                        throw new Exception("my error");
                     var msg = Encoding.UTF8.GetString(data);
                });
    
                Task.Run(() => {
                    _bus.PublishAsync(new UpdateAvaterMessage() {  Avater = "abcc22" }, "RouterKey1");
                });
               
                Thread.Sleep(100000);
            }

    原理是重写IConsumerErrorStrategy

    (如果不是使用EasyNetQ,传统的RabbitMQ库是用手动ack实现)

    另外,如果消息处理过程中,服务器死机,这种情况消息是会重发的,不需要担心

  • 相关阅读:
    SuperMap关联外部数据库
    617Tips
    第一次上课
    文本字段也可做自定义专题图
    Oracle笔记一
    Oracle笔记二
    复制追加数据集失败
    夏季恋语
    最浪漫的事
    爱了你那么多年
  • 原文地址:https://www.cnblogs.com/IWings/p/14681745.html
Copyright © 2011-2022 走看看