zoukankan      html  css  js  c++  java
  • Rebus消息总线

    这里主要讲一下我基于Rebus写的一个ABP框架的模块


     
    目录结构

    对于Rebus网上的资料很少,其实我对于服务总线也不是很理解 。。个人理解的就是像ABP中的EventBus那样的,但是集成了一些消息队列像MSMQ,RabbitMQ等。

    废话不多说,下面主要讲下几个主要的文件

     
    RebusRabbitMqModule

    这个呢就是ABP的模块写法,详细的可以去ABP官网看下,这里主要的代码是在这里

                var moduleConfig = IocManager.Resolve<IRebusRabbitMqModuleConfig>();
    
                if (moduleConfig.Enabled)
                {
                    var rebusConfig = Configure.With(new CastleWindsorContainerAdapter(IocManager.IocContainer));
    
                    if (moduleConfig.LoggingConfigurer != null)
                    {
                        //配置Rebus用哪种工具来记录日志,我这里用的Log4net
                        rebusConfig.Logging(moduleConfig.LoggingConfigurer);
                    }
    
                    rebusConfig.Serialization(moduleConfig.SerializerConfigurer);
    
                    if (moduleConfig.OptionsConfigurer != null)
                    {
                        //自定义配置
                        rebusConfig.Options(moduleConfig.OptionsConfigurer);
                    }
    
                    rebusConfig.Options(c =>
                    {
                        c.SetMaxParallelism(moduleConfig.MaxParallelism);
                        c.SetNumberOfWorkers(moduleConfig.NumberOfWorkers);
                    });
    
                    if (moduleConfig.MessageAuditingEnabled)
                    {
                        //消息审计队列名称
                        rebusConfig.Options(o => o.EnableMessageAuditing(moduleConfig.MessageAuditingQueueName));
                    }
    
                    var mqMessageTypes = new List<Type>();
                    //通过反射取到所有继承IHandleMessages的类进行消息订阅
                    foreach (var assembly in moduleConfig.AssemblysIncludeRebusMqMessageHandlers)
                    {
                        IocManager.IocContainer.AutoRegisterHandlersFromAssembly(assembly);
    
                        mqMessageTypes.AddRange(assembly.GetTypes()
                            .Where(t => t.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == typeof(IHandleMessages<>)))
                            .SelectMany(t => t.GetInterfaces())
                            .Distinct()
                            .SelectMany(t => t.GetGenericArguments())
                            .Distinct());
                    }
                    //这个就是配置使用RabbitMq进行消息通信的方法,具体的去看Rebus上的文档
                    _bus = rebusConfig.Transport(c => c.UseRabbitMq(moduleConfig.ConnectString, moduleConfig.QueueName)).Start();
    
                    //Subscribe messages
                    mqMessageTypes = mqMessageTypes.Distinct().ToList();
    
                    foreach (var mqMessageType in mqMessageTypes)
                    {
                        _bus.Subscribe(mqMessageType);
                    }
                }
    

    模块的使用

    找到你需要引用模块的地方,如下图


     
    添加依赖

    然后进行对应的配置


     
    image.png

    里面的RabbitMqUrl是你本地RabbitMq的访问地址,比如我的是amqp://guest:guest@127.0.0.1:5672/

    接下来就是如何使用进行消息的发送和处理

    像其它的EventBus一样,需要先建立EventData

     public class Test
        {
            public string Name { get; set; }
        }
    

    然后是EventHander

    public class TestHandler : EventDataConsumerHandlerBase<Test>
        {
            public override Task Handle(Test message)
            {
                //这里就是写你需要对message进行怎样的处理
                return base.Handle(message);
            }
        }
    

    最后就是发布,我这里是在AppService里面进行消息的发送

     #region 构造函数
            private readonly IRepository<BaseItem> _baseItemRepository;
            private readonly ICacheManager _cacheManager;
            private readonly IMqMessagePublisher _iMqMessagePublisher;//依赖注入
            public BaseItemAppService(IRepository<BaseItem> baseItemRepository, ICacheManager cacheManager, IMqMessagePublisher iMqMessagePublisher) : base(baseItemRepository)
            {
                this._cacheManager = cacheManager;
                this._baseItemRepository = baseItemRepository;
                _iMqMessagePublisher = iMqMessagePublisher;
            }
            #endregion
    
            #region 增删改查
    
            protected override IQueryable<BaseItem> CreateFilteredQuery(BaseItemSearchDto input)
            {
                //消息的发布
                _iMqMessagePublisher.Publish(new Test { Name = "123" });
                return base.CreateFilteredQuery(input)
                        .WhereIf(input.DisplayName.IsNotNullOrEmpty(), m => m.DisplayName.Contains(input.DisplayName))
                        .WhereIf(input.TypeId != null, m => m.TypeId == input.TypeId);
            }
           #endregion
    

    这样就可以了

    我讲的不是很详细(我表达能力不行),大家可以去直接看我的代码,我就做了简单的封装不是很麻烦。大家可能会问我集成这个之后的业务场景是什么?我就把我用到的讲下吧:

    • 订单的处理:同一时间可能有很多个订单,这时可以把他发布到队列中一个一个推送进行处理,而且未标记完成的订单,就算你程序报错了,他也会自动重新推送好像默认是5次
    • 扫码入库的时候:仓库管理员扫码入库的速度是很快的,但后台程序需要一个比对操作比较耗时,这时候也可以把扫的码加入到队列中慢慢处理。

    Rebus我研究不多,有些隐藏的好东西我可能暂时没加入,希望能多提点提点。。还有服务总线是啥?

    最后附上几个地址:



    作者:邵佳楠
    链接:https://www.jianshu.com/p/5961bc5e556d
    来源:简书
    简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
  • 相关阅读:
    java注解,通过反射解析注解,模仿hibernate,获取sql语句。
    Eclipse/Myeclipse中查看和调试JDK源代码的方法
    TCP为什么会出现 RST
    《浅谈F5健康检查常用的几种方式》—那些你应该知道的知识(二)
    负载均衡服务TCP端口健康检查成功,为什么在后端业务日志中出现网络连接异常信息?
    haproxy的丰富特性简介
    健康检查概述
    firewall防火墙常用操作
    gitlab修改默认端口
    vim脚本判断操作系统
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/11069707.html
Copyright © 2011-2022 走看看