zoukankan      html  css  js  c++  java
  • 2.6.8 Masstransit异常处理和总结

    一、异常处理

    1.1异常与重试

     Exception

    public class SubmitOrderConsumer :
        IConsumer<SubmitOrder>
    {
        public Task Consume(ConsumeContext<SubmitOrder> context)
        {
            throw new Exception("Very bad things happened");
        }
    }
    

    UseMessageRetry  

    var sessionFactory = CreateSessionFactory();
     
    var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        cfg.Host("rabbitmq://localhost/");
     
        cfg.ReceiveEndpoint("submit-order", e =>
        {
            e.UseMessageRetry(r => r.Immediate(5));
     
            e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
        });
    });

    1.2重试配置

     

    // 立即重试:一共连续重试10次
    ep.UseMessageRetry(r => r.Immediate(10));
     
    // 间隔重试:一共重试10次,每次间隔10秒
    ep.UseMessageRetry(r => r.Interval(10, TimeSpan.FromSeconds(10)));
     
    // 多个间隔重试:5秒后第一次,5+10秒后第二次,5+10+15秒后第三次
    ep.UseMessageRetry(r => r.Intervals(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(15)));
     
    // 指数级间隔重试:共10次,每次间隔:当前重试次数 * 60秒
    ep.UseMessageRetry(r => r.Exponential(10, TimeSpan.FromSeconds(60), TimeSpan.FromHours(24), TimeSpan.FromSeconds(60)));
     
    // 每次叠加50秒
    ep.UseMessageRetry(r => r.Incremental(10, TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(50)));

    1.3重试条件

    e.UseMessageRetry(r => 
    {
        r.Handle<ArgumentNullException>();
        r.Ignore(typeof(InvalidOperationException), typeof(InvalidCastException));
        r.Ignore<ArgumentException>(t => t.ParamName == "orderTotal");
    }); 

    过滤某些异常类型不进行重试

    1.4重新投递信息

    cfg.ReceiveEndpoint("submit-order", e =>
    {
        e.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
        e.UseMessageRetry(r => r.Immediate(5));
        e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
    });

    消息冲队列移除之后,在一定时间之后重新投入消息队列。需要配置调度模块(scheduling)

    1.5信箱

    cfg.ReceiveEndpoint("submit-order", e =>
    {
        e.UseScheduledRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
        e.UseMessageRetry(r => r.Immediate(5));
        e.UseInMemoryOutbox();
     
        e.Consumer(() => new SubmitOrderConsumer(sessionFactory));
    });

    有些消息是在 consume 方法中发送或发布的,如果在发送之后 consume 中产生了异常,那原来发出去的消息就需要撤回,如果使用信箱之后,在 consume 中要发布/发送的消息就会先暂存在内存中直到 consume 方法成功之后才真正发出去

    二、其他

    2.1Fault

    public interface Fault<T>
        where T : class
    {
        Guid FaultId { get; }
        Guid? FaultedMessageId { get; }
        DateTime Timestamp { get; }
        ExceptionInfo[] Exceptions { get; }
        HostInfo Host { get; }
        T Message { get; }
    }

    Fault 消息在异常的时候会发布出来

    2.2Consuming Faults

    public class DashboardFaultConsumer :
        IConsumer<Fault<SubmitOrder>>
    {
        public async Task Consume(ConsumeContext<Fault<SubmitOrder>> context)
        {
            // update the dashboard
        }
    }

    Fault 消息也是可以进行订阅的

    2.3Error Pipe

    cfg.ReceiveEndpoint("input-queue", ec =>
    {
        ec.DiscardFaultedMessages();
    });

    默认情况下错误的消息会被投递到了 _error 队列,可以配置直接抛弃错误信息

    2.4Dead-Letter Pipe

    cfg.ReceiveEndpoint("input-queue", ec =>
    {
        ec.DiscardSkippedMessages();
    });

    死信队列:没有消费者的消息会被移到 _skipped 队列,但可以配置为不移到 _skipped 队列

    三、高级功能

    • 持久化
    • Saga 事件串
    • 调度
    • Courier 最终一致性
    • 监控
  • 相关阅读:
    首篇
    typedef 的几种用法
    ftp 命令
    (zt)STL中的map与hash_map
    (zt)关于UDP网络游戏服务器的一些探讨
    (zt)UDP编程的时候,一次发送多少bytes好?
    (zt)界面技术概述
    (zt)这是对目前大部分平台都适用的内存对齐规则的定义
    (zt)高性能I/O设计模式Reactor和Proactor
    (zt)ACE高效PROACTOR编程框架一ClientHandle
  • 原文地址:https://www.cnblogs.com/duyao/p/14329270.html
Copyright © 2011-2022 走看看