zoukankan      html  css  js  c++  java
  • [2017-10-25]Abp系列——集成消息队列功能(基于Rebus.Rabbitmq)


    本系列目录:Abp介绍和经验分享-目录

    前言

    由于提交给ABP作者的集成消息队列机制的PR还未Review完成,本篇以Abplus中的代码为基准来介绍ABP集成消息队列机制的方案。

    Why

    为什么需要消息队列机制?

    1. 发布-订阅模式,解耦业务
    2. 非必须强一致性的业务场景,借助消息队列剥离到离线处理
    3. 各种通知,站内通知、邮件通知、手机短信、微信推送

    以上几点,并非互相独立,是几个互相联系的特点。
    开发框架拥有消息队列机制的好处,可以通过以下几个典型场景举例来说明:

    订单支付成功

    当第三方支付平台回调通知订单支付成功时,如果没有消息队列,那么:

    • 我们必须把后续的业务逻辑和修改订单支付状态的代码都写在一起,根据业务的复杂程度,这个响应时间可能会非常长,容易造成超时
    • 或者第三方支付平台短时间重试多次,造成业务逻辑重复执行
    • 也可能业务逻辑比较复杂,后续其他逻辑(通知推送之类)处理异常,导致整个支付成功逻辑全部回滚

    而当我们有消息队列机制支持时:

    • 我们一接收到第三方支付平台回调,立马仅处理订单状态和核心的业务逻辑(比如支付后扣库存),其他业务逻辑通过订阅消息去处理
    • 甚至可以一接收到第三方支付平台回调,立即构建订单支付成功消息放入消息队列,这样对于第三方支付平台可以立即收到处理成功的响应
    • 后续其他类似通知推送的非关键需求,其成败不影响关键逻辑,而且可以一直重试直到成功

    物流信息同步

    做电商网站时,如果订单比较多,每个订单都需要订阅第三方物流信息,类似上面支付,当物流信息(一个订单多条运送路径记录)推送过来时,可能会比较密集(短时间内几千上万个请求),直接写入数据库(需要先比对,只写入增量部分)可能不是一个好方案。
    这个时候,放到消息队列里,由消费端按自己的节奏一条条处理最为保险。

    类似的数据同步方案,对实时性要求不太高,单个处理逻辑比较复杂,短时间内数量较大,都可以考虑排队处理。

    批量业务处理

    复杂业务的批量处理一直是比较头疼的事情(查询条件、事务等),如果有消息队列,只要查询得到所有可能需要处理的对象,放队列排队,转变成单个业务处理,可以避免很多麻烦,并且可以控制处理进度,也不必担心其中可能发生几个异常影响了其他正常处理。而且,转成单个业务处理,可以继续发布事件和消息,进行后续流程和业务的触发,比如邮件通知或短信通知等供应商限制了调用频率的服务。

    高负载时,削峰填谷

    如题,这一点是指将请求进行排队的能力,可以临时增加处理端以提高请求处理能力,此类方案大多涉及UI交互的变更,具体场景不展开。

    这一块,如果UI交互采用异步模式(比如用户提交订单,但不立马告诉用户订单是否创建成功,只反馈创建订单的请求提交成功,订单真正创建成功以其他方式通知用户,再让用户去支付),则属于上面提到过的离线处理,本文介绍的集成方式可以直接支持;

    如果采用同步模式(提交请求,经过较长时间后响应,需要用户等待,处理依然可以通过消息队列排队),则可能需要具体挖掘下RabbitMQ或您采用的其他消息中间件的Request-Response 模式用法,当然也需要注意一下Http请求处理超时的问题。

    其他好处

    总的来说:

    业务逻辑编码方面,思维方式逐渐脱离面向数据 ,可以更加关注事件关注消息,更加面向领域面向对象 ,核心业务逻辑可维护性增强、可扩展性增强。

    系统架构风格方面,支持事件驱动消息驱动可监控性可扩展性,更利于系统演化,后期任何时候可通过订阅消息无侵入式的采集数据,拓展功能。

    Tips:
    消息队列的可靠性,比如RabbitMQ,是经过电信行业检验的,集群、持久化、自动故障转移等特性都支持,而且集群配置和集群升级方案都比较简单方便且成熟。

    RabbitMQ安装请参考RabbitMQ Notes

    How

    前面啰嗦了一大堆,下面进入正文,ABP如何集成消息队列机制。

    如果嫌本文排版不好,也可以直接看提交给ABP作者的集成消息队列机制的PR,PR中的说明和用法示例基本和abplus扩展库中的一致。

    1.增加命名空间

    这里强调一下概念区别,消息(Messages)和事件(Events)是不同的。

    1. 在ABP中,有个EventBus体系,Events和各种Handlers,重点是进程内的业务逻辑解耦,Handlers的代码是可以共享工作单元(UnitOfWork)的,对于允许延后验证的逻辑,是可以随时添加一个单独的Handler去校验并抛出异常,即可以回滚工作单元(事务);
    2. 而Messages体系,在Abp框架内,目前并未集成。Messages的重点是向进程外的系统(外部系统)进行通知,所以有工作单元和事务的时候,一定是事务提交成功后才对外通知消息,否则一旦对外通知了,又发生了事务回滚,再追回消息或者撤销消息就会比较麻烦(消息关联事务或者分布式事务是另一种情况,这里不做介绍)。

    所以,我们首先在Abplus程序集中引入消息发布器的抽象概念:

    IMqMessagePublisher

    namespace Abp.MqMessages
    {
        /// <summary>
        /// 消息发布接口
        /// </summary>
        public interface IMqMessagePublisher : ITransientDependency
        {
            /// <summary>
            /// 发布
            /// </summary>
            /// <param name="mqMessages"></param>
            void Publish(object mqMessages);
    
            /// <summary>
            /// 发布
            /// </summary>
            /// <param name="mqMessages"></param>
            /// <returns></returns>
            Task PublishAsync(object mqMessages);
        }
    }
    

    并且提供空实现:

    NullMqMessagePublisher

    限于篇幅,除简单示意和关键代码外,本文只给github源码链接,不再贴出详细代码。

    2.提供IMqMessagePublisher的RebusRabbitMQ实现

    我们集成RabbitMQ,借用了Rebus框架的实现,Rebus在对接RabbitMQ上做了比较好的抽象和封装,Rebus Wiki

    程序集Abplus.MqMessages.RebusCore,核心实现:

    RebusRabbitMqPublisher

    其中,以反射方式,抓取了一些Session信息:

    private void TryFillSessionInfo(object mqMessages)
    {
        if (AbpSession.UserId.HasValue)
        {
            var operatorUserIdProperty = mqMessages.GetType().GetProperty("OperatorUserId");
            if (operatorUserIdProperty != null && (operatorUserIdProperty.PropertyType == typeof(long?)))
            {
                operatorUserIdProperty.SetValue(mqMessages, AbpSession.UserId);
            }
        }
    
        if (AbpSession.TenantId.HasValue)
        {
            var tenantIdProperty = mqMessages.GetType().GetProperty("TenantId");
            if (tenantIdProperty != null && (tenantIdProperty.PropertyType == typeof(int?)))
            {
                tenantIdProperty.SetValue(mqMessages, AbpSession.TenantId);
            }
        }
    }
    

    程序集Abplus.MqMessages.RebusCore是同时被Abplus.MqMessages.RebusPublisherAbplus.MqMessages.RebusRabbitMqConsumer依赖的,以便消费端依然具有发布消息的能力。

    3.封装发布模块,便于项目使用

    程序集Abplus.MqMessages.RebusPublisher,发布模块:

    RebusRabbitMqPublisherModule

    这个模块,是封装消息队列的配置和启动连接,使用时,在项目启动模块上配好[DependsOn(typeof(RebusRabbitMqPublisherModule))],并引入命名空间Abp.Configuration.Startup,即可配置相关参数,示例如下:

    namespace Sample
    {
        [DependsOn(typeof(RebusRabbitMqPublisherModule))]
        public class SampleRebusRabbitMqPublisherModule : AbpModule
        {
            public override void PreInitialize()
            {
                Configuration.Modules.UseRebusRabbitMqPublisher()
                    .UseLogging(c => c.NLog())
                    .ConnectionTo("amqp://dev:dev@rabbitmq.local.cn/dev_host");
    
                Configuration.BackgroundJobs.IsJobExecutionEnabled = true;
            }
    
            public override void Initialize()
            {
                IocManager.RegisterAssemblyByConvention(Assembly.GetExecutingAssembly());
            }
    
            public override void PostInitialize()
            {
                Abp.Dependency.IocManager.Instance.IocContainer.AddFacility<LoggingFacility>(f => f.UseNLog().WithConfig("nlog.config"));
    
                var workManager = IocManager.Resolve<IBackgroundWorkerManager>();
                workManager.Add(IocManager.Resolve<TestWorker>());// to send TestMqMessage every 3 seconds
            }
        }
    }
    

    只要项目配置好模块依赖,即可在代码中通过构造函数注入或者属性注入使用IMqMessagePublisher接口进行消息发布。

    4.封装消费端模块

    同消息发布模块,消费模块在程序集Abplus.MqMessages.RebusConsumer中,代码见:

    RebusRabbitMqConsumerModule

    使用方式:

    namespace Sample
    {
        [DependsOn(typeof(RebusRabbitMqConsumerModule))]
        public class SampleRebusRabbitMqConsumerModule : AbpModule
        {
            public override void PreInitialize()
            {
                Configuration.Modules.UseRebusRabbitMqConsumer()
                    .UseLogging(c => c.NLog())
                    .ConnectTo("amqp://dev:dev@rabbitmq.local.cn/dev_host")
                    //以当前项目名作为队列名
                    .UseQueue(Assembly.GetExecutingAssembly().GetName().Name)
                    //register assembly whitch has rebus handlers
                    .RegisterHandlerInAssemblys(Assembly.GetExecutingAssembly());
            }
    
            public override void Initialize()
            {
                base.Initialize();
            }
    
            public override void PostInitialize()
            {
                Abp.Dependency.IocManager.Instance.IocContainer.AddFacility<LoggingFacility>(f => f.UseNLog().WithConfig("nlog.config"));
            }
        }
    }
    

    只要项目配置好模块依赖,消费端亦可在代码中通过构造函数注入或者属性注入使用IMqMessagePublisher接口进行消息发布。

    消费端的RebusHanlder示例:

    namespace Sample.Handlers
    {
        public class TestHandler : IHandleMessages<TestMqMessage>
        {
            public ILogger Logger { get; set; }
            public IMqMessagePublisher Publisher { get; set; }
            public TestHandler()
            {
                Publisher = NullMqMessagePublisher.Instance;
            }
    
            public async Task Handle(TestMqMessage message)
            {
                var msg = $"{Logger.GetType()}:{message.Name},{message.Value},{message.Time}";
                Logger.Debug(msg);
                await Publisher.PublishAsync(msg);//send it again!
            }
        }
    }
    

    5.忘了说,消息格式的定义

    消息的定义,不依赖任何框架,也不依赖Abp或者Abplus,因为前面发布接口IMqMessagePublisher定义时采用的类型是object

    void Publish(object mqMessages);
    

    例如:

    namespace Sample.MqMessages
    {
        /// <summary>
        /// Custom MqMessage Definition. No depends on any framework,this class library can be shared as nuget pkg.
        /// </summary>
        public class TestMqMessage
        {
            public string Name { get; set; }
            public string Value { get; set; }
            public DateTime Time { get; set; }
        }
    }
    

    这个消息定义可以单独作为一个程序集,发布到私有nuget服务器中,以便团队共享

    Tips & Extended

    Abplus中还提供了几个消息队列相关的常用实现:

    1. Abplus IMessageTracker 定义消费端处理幂等机制的接口
    2. Abplus.MqMessages EventDataPublishHandlerBase<TEventData, TMqMessage> EventData和MqMessage一对一的泛型版抽象发布Handler,这是EventDataHandler
    3. Abplus.MqMessages AbpMqHandlerBase是包含IMessageTracker属性的消费端MqHandler抽象基类
    4. Abplus.MqMessages.AuditingStore 审计日志发布到消息队列
    5. Abplus.MqMessages.RedisStoreMessageTracker 消费端消费行为的幂等支持,基于Redis存储,Rebus Handler的重试机制和幂等处理

    上述具体实现请参考Abplus代码库

  • 相关阅读:
    day10 Python 形参顺序
    为oracle中的表格增加列和删除列
    为mapcontrol中的图层设置透明度
    最大的愿望 2007-05-10
    动心 2004年后半年
    写在十年 2007-09-15 (写给L之三)
    致vi老大 2011.1
    如潮 2011.2
    自然人——女孩思绪 (2006-09-14 08:21:51)
    朋友(2003年)
  • 原文地址:https://www.cnblogs.com/personball/p/7762931.html
Copyright © 2011-2022 走看看