zoukankan      html  css  js  c++  java
  • .Net Core 基于CAP框架的事件总线

    .Net Core 基于CAP框架的事件总线


    CAP 是一个在分布式系统中(SOA,MicroService)实现事件总线及最终一致性(分布式事务)的一个开源的 C# 库,她具有轻量级,高性能,易使用等特点。

    github:https://github.com/dotnetcore/CAP

    doc:http://cap.dotnetcore.xyz

    CAP是一款优秀的框架,但是CAP在消息订阅的处理类必须使用[CapSubscribe]特性来绑定,本人觉得不是很科学.

    好在2.5.1版本开放了接口IConsumerServiceSelector为public,提供了扩展的基础,以下将基于CAP,扩展成简洁的事件总线组件.

     使用:

       /// <summary>
        /// 修改用户名事件
        /// </summary>
        public class UserNameUpdateEvent : IEvent
        {
            public int Id { get; set; }
    
            public string OldName { get; set; }
    
            public string NewName { get; set; }
        }
    
        /// <summary>
        /// 发布事件,你可以在任意地方注入IEventPublisher,进行事件发布
        /// </summary>
        public class PublishEvent : Guc.Kernel.Dependency.ITransient
        {
            public PublishEvent(IEventPublisher eventPublisher)
            {
                EventPublisher = eventPublisher;
            }
    
            IEventPublisher EventPublisher { get; }
    
            public void Publish()
            {
                EventPublisher.Publish(new UserNameUpdateEvent
                {
                    Id = 1,
                    OldName = "老王1",
                    NewName = "老王2"
                });
            }
        }
    
        /// <summary>
        /// 事件处理,你可以注入任何需要的类型
        /// </summary>
        public class UserNameUpdateEventHandler : IEventHandler<UserNameUpdateEvent>
        {
            public UserNameUpdateEventHandler(IUserStore userStore, ILogger<UserNameUpdateEventHandler> logger)
            {
                UserStore = userStore;
                Logger = logger;
            }
    
            IUserStore UserStore { get; }
            ILogger<UserNameUpdateEventHandler> Logger { get; }
    
            /// <summary>
            /// 执行事件处理
            /// </summary>
            /// <param name="event">事件对象</param>
            /// <returns></returns>
            public async Task Execute(UserNameUpdateEvent @event)
            {
                Logger.LogInformation($"修改用户名:{@event.OldName}->{@event.NewName}");
                await Task.CompletedTask;
                UserStore.Update(new UserModel { Id = @event.Id, Name = @event.NewName });
            }
        }

    约定事件类型的FullName为事件名称,如例中的:UserNameUpdateEvent的类型FullName:Guc.Sample.UserNameUpdateEvent;

    约定事件处理类型的FullName为GroupName,如例中的:UserNameUpdateEventHandler 的类型FullName:Guc.Sample.UserNameUpdateEventHandler 

    如果使用的RabbitMQ作为消息的传输,则Guc.Sample.UserNameUpdateEvent为路由的Key,Guc.Sample.UserNameUpdateEventHandler 为队列名称.

    使用:

       services.AddGucKernel()
                    .AddEventBus(capOptions =>
                    {
                        // CAP相关的配置
    
                    });        

    扩展代码,github:https://github.com/280780363/guc/blob/master/src/Guc.EventBus/GucConsumerServiceSelector.cs:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Reflection;
    using System.Text.RegularExpressions;
    using Microsoft.Extensions.DependencyInjection;
    using System.Collections.Concurrent;
    using DotNetCore.CAP;
    using Guc.Kernel.Utils;
    using Microsoft.Extensions.Options;
    
    namespace Guc.EventBus
    {
        class GucConsumerServiceSelector : IConsumerServiceSelector
        {
            private readonly CapOptions _capOptions;
            private readonly IServiceProvider _serviceProvider;
    
            /// <summary>
            /// since this class be designed as a Singleton service,the following two list must be thread safe!!!
            /// </summary>
            private readonly ConcurrentDictionary<string, List<RegexExecuteDescriptor<ConsumerExecutorDescriptor>>> _asteriskList;
            private readonly ConcurrentDictionary<string, List<RegexExecuteDescriptor<ConsumerExecutorDescriptor>>> _poundList;
    
            /// <summary>
            /// Creates a new <see cref="DefaultConsumerServiceSelector" />.
            /// </summary>
            public GucConsumerServiceSelector(IServiceProvider serviceProvider)
            {
                _serviceProvider = serviceProvider;
                _capOptions = serviceProvider.GetService<IOptions<CapOptions>>().Value;
    
                _asteriskList = new ConcurrentDictionary<string, List<RegexExecuteDescriptor<ConsumerExecutorDescriptor>>>();
                _poundList = new ConcurrentDictionary<string, List<RegexExecuteDescriptor<ConsumerExecutorDescriptor>>>();
            }
    
            public IReadOnlyList<ConsumerExecutorDescriptor> SelectCandidates()
            {
                var executorDescriptorList = new List<ConsumerExecutorDescriptor>();
    
                executorDescriptorList.AddRange(FindConsumersFromInterfaceTypes(_serviceProvider));
                return executorDescriptorList;
            }
    
            public ConsumerExecutorDescriptor SelectBestCandidate(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor)
            {
                var result = MatchUsingName(key, executeDescriptor);
                if (result != null)
                {
                    return result;
                }
    
                //[*] match with regex, i.e.  foo.*.abc
                result = MatchAsteriskUsingRegex(key, executeDescriptor);
                if (result != null)
                {
                    return result;
                }
    
                //[#] match regex, i.e. foo.#
                result = MatchPoundUsingRegex(key, executeDescriptor);
                return result;
            }
    
            private IEnumerable<ConsumerExecutorDescriptor> FindConsumersFromInterfaceTypes(
                IServiceProvider provider)
            {
                var executorDescriptorList = new List<ConsumerExecutorDescriptor>();
    
                using (var scoped = provider.CreateScope())
                {
                    var scopedProvider = scoped.ServiceProvider;
                    var consumerServices = scopedProvider.GetServices<ICapSubscribe>();
                    foreach (var service in consumerServices)
                    {
                        var typeInfo = service.GetType().GetTypeInfo();
    
                        // 必须是非抽象类
                        if (!typeInfo.IsClass || typeInfo.IsAbstract)
                            continue;
    
                        // 继承自IEventHandler<>
                        if (!typeInfo.IsChildTypeOfGenericType(typeof(IEventHandler<>)))
                            continue;
    
                        executorDescriptorList.AddRange(GetTopicAttributesDescription(typeInfo));
                    }
    
                    return executorDescriptorList;
                }
            }
    
            private List<string> GetEventNamesFromTypeInfo(TypeInfo typeInfo)
            {
                List<string> names = new List<string>();
                foreach (var item in typeInfo.ImplementedInterfaces)
                {
                    var @interface = item.GetTypeInfo();
                    if (!@interface.IsGenericType)
                        continue;
                    if (@interface.GenericTypeArguments.Length != 1)
                        continue;
    
                    var eventType = @interface.GenericTypeArguments[0].GetTypeInfo();
                    if (!eventType.IsChildTypeOf<IEvent>())
                        continue;
    
                    names.Add(eventType.FullName);
                }
                return names;
            }
    
            private IEnumerable<ConsumerExecutorDescriptor> GetTopicAttributesDescription(TypeInfo typeInfo)
            {
                var names = GetEventNamesFromTypeInfo(typeInfo);
                if (names.IsNullOrEmpty())
                    return new ConsumerExecutorDescriptor[] { };
    
                List<ConsumerExecutorDescriptor> results = new List<ConsumerExecutorDescriptor>();
                var methods = typeInfo.GetMethods();
                foreach (var eventName in names)
                {
                    var method = methods.FirstOrDefault(r => r.Name == "Execute"
                                                            && r.GetParameters().Length == 1
                                                            && r.GetParameters()[0].ParameterType.FullName == eventName
                                                            && r.GetParameters()[0].ParameterType.IsChildTypeOf<IEvent>());
                    if (method == null)
                        continue;
    
                    results.Add(new ConsumerExecutorDescriptor
                    {
                        Attribute = new CapSubscribeAttribute(eventName) { Group = typeInfo.FullName + "." + _capOptions.Version },
                        ImplTypeInfo = typeInfo,
                        MethodInfo = method
                    });
                }
    
                return results;
            }
    
            private ConsumerExecutorDescriptor MatchUsingName(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor)
            {
                return executeDescriptor.FirstOrDefault(x => x.Attribute.Name == key);
            }
    
            private ConsumerExecutorDescriptor MatchAsteriskUsingRegex(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor)
            {
                var group = executeDescriptor.First().Attribute.Group;
                if (!_asteriskList.TryGetValue(group, out var tmpList))
                {
                    tmpList = executeDescriptor.Where(x => x.Attribute.Name.IndexOf('*') >= 0)
                        .Select(x => new RegexExecuteDescriptor<ConsumerExecutorDescriptor>
                        {
                            Name = ("^" + x.Attribute.Name + "$").Replace("*", "[0-9_a-zA-Z]+").Replace(".", "\."),
                            Descriptor = x
                        }).ToList();
                    _asteriskList.TryAdd(group, tmpList);
                }
    
                foreach (var red in tmpList)
                {
                    if (Regex.IsMatch(key, red.Name, RegexOptions.Singleline))
                    {
                        return red.Descriptor;
                    }
                }
    
                return null;
            }
    
            private ConsumerExecutorDescriptor MatchPoundUsingRegex(string key, IReadOnlyList<ConsumerExecutorDescriptor> executeDescriptor)
            {
                var group = executeDescriptor.First().Attribute.Group;
                if (!_poundList.TryGetValue(group, out var tmpList))
                {
                    tmpList = executeDescriptor
                        .Where(x => x.Attribute.Name.IndexOf('#') >= 0)
                        .Select(x => new RegexExecuteDescriptor<ConsumerExecutorDescriptor>
                        {
                            Name = ("^" + x.Attribute.Name + "$").Replace("#", "[0-9_a-zA-Z\.]+"),
                            Descriptor = x
                        }).ToList();
                    _poundList.TryAdd(group, tmpList);
                }
    
                foreach (var red in tmpList)
                {
                    if (Regex.IsMatch(key, red.Name, RegexOptions.Singleline))
                    {
                        return red.Descriptor;
                    }
                }
    
                return null;
            }
    
    
            private class RegexExecuteDescriptor<T>
            {
                public string Name { get; set; }
    
                public T Descriptor { get; set; }
            }
        }
    }
    View Code
  • 相关阅读:
    FireFox/Chrome不支持在前台js读后台Attributes.Add("DefaultValue")的属性
    IIS Web怪问题: Access is denied due to invalid credentials.
    转:OWC学习笔记电子表格(Spreadsheet)风格属性设置
    最全的CSS浏览器兼容问题整理(IE6.0、IE7.0 与 FireFox)
    转:[网站安全]避免在站点中通过URL参数做重定向
    [SQL Server 2005 BI]在.NET中创建SQL 2005 KPI
    即使asp:TreeView有几万个节点,也让IE不死的解决方法
    Visual Totals in MDX and Role Security
    css hack
    让Updatepanel中的控件触发整个页面Postback
  • 原文地址:https://www.cnblogs.com/gucaocao/p/11527570.html
Copyright © 2011-2022 走看看