zoukankan      html  css  js  c++  java
  • 源码解析-Abp vNext丨分布式事件总线DistributedEventBus

    前言

    上一节咱们讲了LocalEventBus,本节来讲本地事件总线(DistributedEventBus),采用的RabbitMQ进行实现。

    Volo.Abp.EventBus.RabbitMQ模块内部代码并不多,RabbitMQ的操作都集中在Volo.Abp.RabbitMQ这个包中。

    正文

    我们从模块定义开始看,项目启动的时候分别读取了appsetting.json的配置参数和调用了RabbitMqDistributedEventBusInitialize函数。

        public class AbpEventBusRabbitMqModule : AbpModule
        {
            public override void ConfigureServices(ServiceConfigurationContext context)
            {
                var configuration = context.Services.GetConfiguration();
    
                Configure<AbpRabbitMqEventBusOptions>(configuration.GetSection("RabbitMQ:EventBus"));
            }
    
            public override void OnApplicationInitialization(ApplicationInitializationContext context)
            {
                context
                    .ServiceProvider
                    .GetRequiredService<RabbitMqDistributedEventBus>()
                    .Initialize();
            }
        }
    

    Initialize函数中我们根据 MessageConsumerFactory.Create向内部进行查阅可以看到最终调用方法为RabbitMqMessageConsumer.TryCreateChannelAsync并且在其内部我们可以看到下面代码,这里定义了消费的回调函数。反推Initialize方法其实是在启动一个消费者。

          public void Initialize()
            {
                Consumer = MessageConsumerFactory.Create(
                    new ExchangeDeclareConfiguration(
                        AbpRabbitMqEventBusOptions.ExchangeName,
                        type: "direct",
                        durable: true
                    ),
                    new QueueDeclareConfiguration(
                        AbpRabbitMqEventBusOptions.ClientName,
                        durable: true,
                        exclusive: false,
                        autoDelete: false
                    ),
                    AbpRabbitMqEventBusOptions.ConnectionName
                );
    
                Consumer.OnMessageReceived(ProcessEventAsync);
    
                SubscribeHandlers(AbpDistributedEventBusOptions.Handlers);
            }
    
    
     var consumer = new AsyncEventingBasicConsumer(Channel);
                    consumer.Received += HandleIncomingMessageAsync;
    

    继续向下看Consumer.OnMessageReceived(ProcessEventAsync);该方法向一个并发安全集合输入一个委托事件,并该事件会在上面的HandleIncomingMessageAsync会调中触发故确定为消费者的执行逻辑,而ProcessEventAsync其实还是走了我们在讲LocalEventBus哪一套,寻找Handler执行函数。

    SubscribeHandlers还是上节讲的基类的函数,这里要注意内部调用的Subscribe该方法中的 Consumer.BindAsync会根据为消费者Bind路由,这样才能触发事件处理函数。

    
           public override IDisposable Subscribe(Type eventType, IEventHandlerFactory factory)
            {
                var handlerFactories = GetOrCreateHandlerFactories(eventType);
    
                if (factory.IsInFactories(handlerFactories))
                {
                    return NullDisposable.Instance;
                }
    
                handlerFactories.Add(factory);
    
                if (handlerFactories.Count == 1) //TODO: Multi-threading!
                {
                    Consumer.BindAsync(EventNameAttribute.GetNameOrDefault(eventType));
                }
    
                return new EventHandlerFactoryUnregistrar(this, eventType, factory);
            }
    
    

    看完了事件消费者我们来看看事件发布,直接看PublishAsync函数就完事了,整个函数非常简单,都是RabbitMQ的操作语法,这里的路由Key是在EventNameAttribute.GetNameOrDefault(eventType);函数中通过读取ETO上指定注解Name来指定的。

    protected Task PublishAsync(
                string eventName,
                byte[] body,
                IBasicProperties properties,
                Dictionary<string, object> headersArguments = null,
                Guid? eventId = null)
            {
                using (var channel = ConnectionPool.Get(AbpRabbitMqEventBusOptions.ConnectionName).CreateModel())
                {
                    channel.ExchangeDeclare(
                        AbpRabbitMqEventBusOptions.ExchangeName,
                        "direct",
                        durable: true
                    );
    
                    if (properties == null)
                    {
                        properties = channel.CreateBasicProperties();
                        properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent;
                    }
    
                    if (properties.MessageId.IsNullOrEmpty())
                    {
                        properties.MessageId = (eventId ?? GuidGenerator.Create()).ToString("N");
                    }
    
                    SetEventMessageHeaders(properties, headersArguments);
    
                    channel.BasicPublish(
                        exchange: AbpRabbitMqEventBusOptions.ExchangeName,
                        routingKey: eventName,
                        mandatory: true,
                        basicProperties: properties,
                        body: body
                    );
                }
    
                return Task.CompletedTask;
            }
    

    解析

    整个分布式事件的实现其实非常简单,在事件发生时发布者只需要定义好路由名称和消息内容发送RabbitMQ中,而消费者则是在项目运行的时候的通过调用Initialize就启动起来了。

    这里我们也同样根据整个原理自己实现一下这个流程。

    Dppt.EventBus分别定义IDistributedEventBus、DistributedEventBusOptions、IDistributedEventHandler分别用于采用分布式事件总线调用、配置选项用于存储处理程序Handler、定义分布式处理程序抽象。

    新建Dppt.EventBus.RabbitMQ类库先简单对RabbitMQ进行一个简单的封装

    public class RabbitMqConnections : IRabbitMqConnections
        {
            private readonly IConnectionFactory _connectionFactory;
            private readonly ILogger<RabbitMqConnections> _logger;
            IConnection _connection;
            bool _disposed;
            public RabbitMqConnections(IConnectionFactory connectionFactory, ILogger<RabbitMqConnections> logger)
            {
                _connectionFactory = connectionFactory;
                _logger = logger;
            }
    
    
            public bool IsConnected
            {
                get
                {
                    return _connection != null && _connection.IsOpen && !_disposed;
                }
            }
    
            public void TryConnect() {
    
                _connection = _connectionFactory.CreateConnection();
    
            }
    
    
            public IModel CreateModel()
            {
                if (!IsConnected)
                {
                    throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
                }
    
                return _connection.CreateModel();
            }
    
    
            public void Dispose()
            {
                if (_disposed) return;
    
                _disposed = true;
    
                try
                {
                    _connection.Dispose();
                }
                catch (IOException ex)
                {
                    _logger.LogCritical(ex.ToString());
                }
            }
    
        }
    

    然后我们分别定义ExchangeDeclareConfiguration、QueueDeclareConfiguration用于记录配置信息。

    开始处理RabbitMqEventBus处理程序首先是发布事件,大体代码如下就是往RabbitMQ里面丢消息。

            /// <summary>
            /// rabbmitmq 连接服务
            /// </summary>
            public readonly IRabbitMqConnections _rabbitMqConnections;
    
    
    public Task PublishAsync<TEvent>(TEvent eventData)
            {
                var eventName = EventNameAttribute.GetNameOrDefault(typeof(TEvent));
                var body = JsonSerializer.Serialize(eventData);
                return PublishAsync(eventName, body, null, null);
            }
    
            public Task PublishAsync(string eventName, string body, IBasicProperties properties, Dictionary<string, object> headersArguments = null, Guid? eventId = null)
            {
    
                if (!_rabbitMqConnections.IsConnected)
                {
                    _rabbitMqConnections.TryConnect();
                }
                using (var channel = _rabbitMqConnections.CreateModel())
                {
                    // durable 设置队列持久化  
                    channel.ExchangeDeclare(RabbitMqEventBusOptions.ExchangeName, "direct", durable: true);
    
                    if (properties == null)
                    {
                        properties = channel.CreateBasicProperties();
                        // 设置消息持久化
                        properties.DeliveryMode = 2;
                    }
    
                    if (properties.MessageId.IsNullOrEmpty())
                    {
                        // 消息的唯一性标识
                        properties.MessageId = (eventId ?? Guid.NewGuid()).ToString("N");
                    }
    
                    SetEventMessageHeaders(properties, headersArguments);
    
                    channel.BasicPublish(
                       exchange: RabbitMqEventBusOptions.ExchangeName,
                       routingKey: eventName,
                       mandatory: true,
                       basicProperties: properties,
                       body: Encoding.UTF8.GetBytes(body)
                   );
    
                }
    
                return Task.CompletedTask;
            }
    
          private void SetEventMessageHeaders(IBasicProperties properties, Dictionary<string, object> headersArguments)
            {
                if (headersArguments == null)
                {
                    return;
                }
    
                properties.Headers ??= new Dictionary<string, object>();
    
                foreach (var header in headersArguments)
                {
                    properties.Headers[header.Key] = header.Value;
                }
            }
    
    
    

    然后就是消费者的处理,我们同样定义Initialize函数,并简化部分封装代码,完成消费者启动。

     public void Initialize()
            {
    
                Exchange = new ExchangeDeclareConfiguration(RabbitMqEventBusOptions.ExchangeName,"direct",true);
                Queue = new QueueDeclareConfiguration(RabbitMqEventBusOptions.ClientName, true, false, false);
    
                // 启动一个消费者
                if (!_rabbitMqConnections.IsConnected)
                {
                    _rabbitMqConnections.TryConnect();
                }
    
                try
                {
    
                    Channel = _rabbitMqConnections.CreateModel();
    
    
    
                    Channel.ExchangeDeclare(
                      exchange: Exchange.ExchangeName,
                      type: Exchange.Type,
                      durable: Exchange.Durable,
                      autoDelete: Exchange.AutoDelete,
                      arguments: Exchange.Arguments
                  );
    
    
                    Channel.QueueDeclare(
                       queue: Queue.QueueName,
                       durable: Queue.Durable,
                       exclusive: Queue.Exclusive,
                       autoDelete: Queue.AutoDelete,
                       arguments: Queue.Arguments
                   );
    
                    var consumer = new AsyncEventingBasicConsumer(Channel);
                    consumer.Received += HandleIncomingMessageAsync;
    
                    Channel.BasicConsume(
                        queue: Queue.QueueName,
                        autoAck: false,
                        consumer: consumer
                    );
    
                    SubscribeHandlers(DistributedEventBusOptions.Handlers);
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Error:" + ex.Message);
                }
            }
    

    参数配置这边主要是读取AppSetting信息和索要Handler

     public static class DpptEventBusRabbitMqRegistrar
        {
            public static void AddDpptEventBusRabbitMq(this IServiceCollection services, IConfiguration configuration, List<Type> types)
            {
         
                services.AddSingleton<IRabbitMqConnections>(sp =>
                {
                    var logger = sp.GetRequiredService<ILogger<RabbitMqConnections>>();
    
                    var factory = new ConnectionFactory()
                    {
                        HostName = configuration["RabbitMQ:EventBusConnection"],
                        VirtualHost = configuration["RabbitMQ:EventBusVirtualHost"],
                        DispatchConsumersAsync = true,
                        AutomaticRecoveryEnabled = true
                };
    
                    if (!string.IsNullOrEmpty(configuration["RabbitMQ:EventBusUserName"]))
                    {
                        factory.UserName = configuration["RabbitMQ:EventBusUserName"];
                    }
    
                    if (!string.IsNullOrEmpty(configuration["RabbitMQ:EventBusPassword"]))
                    {
                        factory.Password = configuration["RabbitMQ:EventBusPassword"];
                    }
    
                    return new RabbitMqConnections(factory, logger);
                });
    
                var distributedHandlers = types;
                foreach (var item in distributedHandlers)
                {
                    services.AddSingleton(item);
                }
    
                services.Configure<DistributedEventBusOptions>(options =>
                {
                    options.Handlers.AddIfNotContains(distributedHandlers);
                });
    
                services.Configure<DpptRabbitMqEventBusOptions>(options => {
    
                    options.ExchangeName = configuration["RabbitMQ:EventBus:ExchangeName"];
                    options.ClientName = configuration["RabbitMQ:EventBus:ClientName"];
                });
    
                services.AddSingleton<IDistributedEventBus, RabbitMqDistributedEventBus>();
    
              
            }
        }
    

    测试

    新建一个空项目,进行插件注册,然后创建ETO和Handler进行测试。

    64

    测试结果放在下面了。

    62

    63

    结语

    本次挑选了一个比较简单的示例来讲,整个EventBus我应该分成3篇 下一篇我来讲分布式事务。

    最后欢迎各位读者关注我的博客, https://github.com/MrChuJiu/Dppt/tree/master/src 欢迎大家Star

    另外这里有个社区地址(https://github.com/MrChuJiu/Dppt/discussions),如果大家有技术点希望我提前档期可以写在这里,希望本项目助力我们一起成长

  • 相关阅读:
    ARM中断(一)
    窗口置顶小工具
    volatile关键字
    IIC总线
    ARM中断(三)
    BootLoader —— S3C2440
    视频那些事
    [轉]簡單的顯示隱藏實現
    Apache Service Monitor Start按鈕变灰 的解决方法
    [转载]经典java转码程序,实现native2ascii命令行的功能
  • 原文地址:https://www.cnblogs.com/MrChuJiu/p/15491549.html
Copyright © 2011-2022 走看看