zoukankan      html  css  js  c++  java
  • 基于asp.net core 从零搭建自己的业务框架(三)

    前言

    根据业务处理部分,单体马上就能得知错误与否,快速做出处理,而分布式系统,会因为各种原因,无法如同单体一样立刻处理,所以这个时候需要 处理异常 的,做 补偿转移人工干预

    当然也可以直接在消费端做重试/限流和熔断,但是个人理解,不建议,处理失败的转移到低优先顺序的队列,由专门处理失败消费的部分来处理问题,在实际操作中,可以是单独的服务器,不影响业务流程线,处理失败则发送消息,人为干预。


    消费/执行错误的处理流程

    无论的消费订阅还是Rpc远程调用,一旦处理失败,都应当有 转移错误补偿以及人工干预 的异常处理

    Masstransit默认是有重试的,所以可以根据获取到的重试次数和错误信息,选择是 转移 还是直接 人工干预补偿则一般应用于支付相关服务,失败则立刻退款/人工处理,这种失败了,重试还是失败


    实例编写

    基于发布订阅的失败处理

            private async Task Execute(ConsumeContext context, ExceptionInfo exceptionInfo)
            {
                await Console.Out.WriteLineAsync($"Type:{exceptionInfo.ExceptionType} Message:{exceptionInfo.Message}");
    
                if (exceptionInfo.InnerException != null)
                {
                    await Execute(context, exceptionInfo.InnerException);
                }
            }
    
            public async Task Consume(ConsumeContext<Fault<PayOrderEvent>> context)
            {
                var retryCount = context.GetRetryCount();
    
                if (retryCount == 5)
                {
                    if (EndpointConvention.TryGetDestinationAddress<PayOrderEvent>(out Uri endPointUrl))
                    {
                        await context.Forward(endPointUrl);
                    }
                }
                else
                {
                    var exceptions = context.Message.Exceptions;
    
                    foreach (var exceptionInfo in exceptions)
                    {
                        await Execute(context, exceptionInfo);
                    }
                }
            }

    这是一段很简单的业务逻辑,PayOrderEvent 的消息消费失败后,则会进入这个专门处理 PayOrderEvent 消费失败的方法

    当重试处理第三次,获取错误信息,尝试处理错误信息,当重试第五次则转移队列

    基于Rpc的消息异常处理

    以上这段代码,只适用于发布订阅,Masstransit的Rpc模式则 无法捕获异常,只能在代码段上做try catch处理,或者引入AspectCore做捕获到异常

    基于Rpc模式的异常捕获

    基于代码而言try catch足以,但是出于项目管理的角度,应该采用AOP(Aspect-Oriented Programming)做代码的职责分离,预留出足够多的通用层,这里AOP部分采用Lemon的作品 AspectCore

    AOP特性部分实现如下

        public class RpcConsumerAttribute : AbstractInterceptorAttribute
        {
            public override async Task Invoke(AspectContext context, AspectDelegate next)
            {
                try
                {
                    await next(context);
                }
                catch
                {
                    var _arg = context.Parameters[0];
    
                    if (_arg is ConsumeContext consume)
                    {
                        var consumeType = consume.GetType().GetGenericArguments()[1];
                        var property = consume.GetType().GetProperty("Message").GetReflector();
                        var arg = property.GetValue(consume);
                        var argType = arg.GetType();
    
                        var faultType = typeof(FaultMessage<>).MakeGenericType(argType);
                        var constructor = faultType.GetConstructor(new[] { argType });
                        var constructorInfo = constructor.GetReflector();
                        var instance = constructorInfo.Invoke(arg);
    
                        var busControl = context.ServiceProvider.GetRequiredService<IBusControl>();
                        await busControl.Publish(instance);
                    }
                }
            }
        }

    简单的try catch,在异常处理部分使用反射获取出未正常执行的实体,然后发布 FaultMessage<Message> ,其中 Message 是未正常处理的实体类型

    其中反射里携带的 GetReflector 取自于Lemon的 AspectCore.Extensions.Reflection ,原理是构建Emit代码,构建委托,然后换成下来,从单纯的反射,变成调用委托执行反射代码,避免的传统的反射调用的嵌套反射执行

        public class FaultMessage<TEntity>
            where TEntity:class
        {
            public TEntity Entity { get; }
    
            public FaultMessage(TEntity entity)
            {
                Entity = entity;
            }
        }

    处理业务部分代码如下

            [RpcConsumer]
    public virtual async Task Consume(ConsumeContext<PayOrderEvent> context) { //... 业务代码段不再赘述 } public Task Consume(ConsumeContext<FaultMessage<PayOrderEvent>> context) { var message = context.Message; var sourceMessage = message.Entity; return Task.CompletedTask; }

    调用部分遵循Rpc调用即可

                    var entity = new PayOrderEvent
                    {
                        SourceId = 1,
                        TargetId = 2,
                        Money = 2000
                    };
                    var response = await busControl.Request<PayOrderEvent, PayOrderResponse>(entity);

    消费 PayOrderEvent 的 部分,只要正常的Ef Core原封不动的写数据即可,执行两次,造成 主键冲突,则被 RpcConsumerAttribute 捕获异常,触发发布到 FaultMessage<PayOrderEvent> 的消费端即可


    后话

    引入 AspectCore 的这部分,踩了一些雷,感谢社区的 茶姨以及柠檬的帮助 深感个人实力非常薄弱,很多东西未能深入理解,学到用时方恨少

    打个小广告

    如果有技术交流可以加NCC的群 24791014、436035237,我在群里,有任何关于asp.net core/Masstransit的问题或者建议都可以与我交流,非常欢迎

    示例代码:

    https://github.com/htrlq/Crud.Sample

  • 相关阅读:
    stenciljs 学习四 组件装饰器
    stenciljs 学习三 组件生命周期
    stenciljs 学习二 pwa 简单应用开发
    stenciljs ionic 团队开发的方便web 组件框架
    stenciljs 学习一 web 组件开发
    使用npm init快速创建web 应用
    adnanh webhook 框架 hook rule
    adnanh webhook 框架 hook 定义
    adnanh webhook 框架request values 说明
    adnanh webhook 框架execute-command 以及参数传递处理
  • 原文地址:https://www.cnblogs.com/NCoreCoder/p/12585502.html
Copyright © 2011-2022 走看看