zoukankan      html  css  js  c++  java
  • eShopOnContainers学习系列(三):RabbitMQ消息总线实践

    今天研究了下eShopOnContainers里的RabbitMQ的使用,在项目里是以封装成消息总线的方式使用的,但是仍然是以其发布、订阅两个方法作为基础封装的,我们今天就来实际使用一下。

    为了简单起见,就在同一个API项目里实现发布订阅。

    新建API项目 RabbitMQ_Bus_Test ,类库 EventBus、EventBusRabbitMQ,这两个类库中将会实现消息总线最主要的方法、发布订阅。

    在EventBus中新增消息事件类:IntegrationEvent,这个类在事件里是作为一个消息父类,所有的消息类都需要继承这个类,在实际项目里按需修改。

    public class IntegrationEvent
        {
            public IntegrationEvent()
            {
                Id = Guid.NewGuid();
                CreationDate = DateTime.UtcNow;
            }
    
            [JsonConstructor]
            public IntegrationEvent(Guid id, DateTime createDate)
            {
                Id = id;
                CreationDate = createDate;
            }
    
            [JsonProperty]
            public Guid Id { get; private set; }
    
            [JsonProperty]
            public DateTime CreationDate { get; private set; }
        }

    新增泛型接口类:IIntegrationEventHandler.cs,通过消息队列发送的消息需要有处理程序,而这个接口则是作为一个约束类存在,所有的处理程序都需要继承这个泛型接口类:

        public interface IIntegrationEventHandler<in TIntegrationEvent> : IIntegrationEventHandler
                where TIntegrationEvent : IntegrationEvent
        {
            Task Handle(TIntegrationEvent @event);
        }
    
        public interface IIntegrationEventHandler
        {
        }

    新增事件订阅管理接口:IEventBusSubscriptionsManager,从类名就可以看出这个是专门用来管理消息处理方法的,我们这里主要看AddSubscription这个订阅方法,它的作用是绑定消息类和消息处理程序,即哪一个消息被哪一个方法消费,当然这里有两个约束,消息类需要继承IntegrationEvent,处理类需要继承IIntegrationEventHandler:

    public interface IEventBusSubscriptionsManager
        {
            bool IsEmpty { get; }
            event EventHandler<string> OnEventRemoved;
            void AddDynamicSubscription<TH>(string eventName)
                where TH : IDynamicIntegrationEventHandler;
    
            void AddSubscription<T, TH>()
               where T : IntegrationEvent
               where TH : IIntegrationEventHandler<T>;
    
            void RemoveSubscription<T, TH>()
                where TH : IIntegrationEventHandler<T>
                where T : IntegrationEvent;
    
            void RemoveDynamicSubscription<TH>(string eventName)
                where TH : IDynamicIntegrationEventHandler;
    
            bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent;
            bool HasSubscriptionsForEvent(string eventName);
            Type GetEventTypeByName(string eventName);
            void Clear();
            IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent;
            IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName);
            string GetEventKey<T>();
        }

    接着新增它的实现类InMemoryEventBusSubscriptionsManager.cs,从类名可以看出订阅的管理都是在内存中处理的,在生产项目里我们可以使用Redis来进行存储:

    public partial class InMemoryEventBusSubscriptionsManager: IEventBusSubscriptionsManager
        {
            private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;
            private readonly List<Type> _eventTypes;
    
            public event EventHandler<string> OnEventRemoved;
    
            public InMemoryEventBusSubscriptionsManager()
            {
                _handlers = new Dictionary<string, List<SubscriptionInfo>>();
                _eventTypes = new List<Type>();
            }
    
            public bool IsEmpty => !_handlers.Keys.Any();
            public void Clear() => _handlers.Clear();
    
            public bool HasSubscriptionsForEvent(string eventName) => _handlers.ContainsKey(eventName);
    
            public void AddDynamicSubscription<TH>(string eventName)
                where TH : IDynamicIntegrationEventHandler
            {
                DoAddSubscription(typeof(TH), eventName, isDynamic: true);
            }
    
            public void AddSubscription<T,TH>()
                where T:IntegrationEvent
                where TH:IIntegrationEventHandler<T>
            {
                var eventName = GetEventKey<T>();
    
                DoAddSubscription(typeof(TH), eventName, isDynamic: false);
    
                if(!_eventTypes.Contains(typeof(T)))
                {
                    _eventTypes.Add(typeof(T));
                }
            }
    
            public void DoAddSubscription(Type handlerType,string eventName,bool isDynamic)
            {
                if(!HasSubscriptionsForEvent(eventName))
                {
                    _handlers.Add(eventName, new List<SubscriptionInfo>());
                }
    
                if(_handlers[eventName].Any(s=>s.HandlerType==handlerType))
                {
                    throw new ArgumentException($"Handler Type {handlerType.Name} already registered for '{eventName}'", nameof(handlerType));
                }
    
                if(isDynamic)
                {
                    _handlers[eventName].Add(SubscriptionInfo.Dynamic(handlerType));
                }
                else
                {
                    _handlers[eventName].Add(SubscriptionInfo.Typed(handlerType));
                }
            }
    
            public void RemoveDynamicSubscription<TH>(string eventName)
                where TH: IDynamicIntegrationEventHandler
            {
                var handlerToRemove = FindDynamicSubscriptionToRemove<TH>(eventName);
                
            }
    
            private SubscriptionInfo FindDynamicSubscriptionToRemove<TH>(string eventName)
                where TH:IDynamicIntegrationEventHandler
            {
                return DoFindSubscriptionToRemove(eventName, typeof(TH));
            }
    
    
            private SubscriptionInfo DoFindSubscriptionToRemove(string eventName,Type handlerType)
            {
                if(!HasSubscriptionsForEvent(eventName))
                {
                    return null;
                }
    
                return _handlers[eventName].SingleOrDefault(s => s.HandlerType == handlerType);
            }
    
            public void DoRemoveHandler(string eventName, SubscriptionInfo subsToRemove)
            {
                if(subsToRemove!=null)
                {
                    _handlers[eventName].Remove(subsToRemove);
                    if(!_handlers[eventName].Any())
                    {
                        _handlers.Remove(eventName);
                        var eventType = _eventTypes.SingleOrDefault(e => e.Name == eventName);
                        if(eventType!=null)
                        {
                            _eventTypes.Remove(eventType);
                        }
                        RaiseOnEventRemoved(eventName);
                    }
                }
            }
    
            public void RaiseOnEventRemoved(string eventName)
            {
                var handler = OnEventRemoved;
                if(handler!=null)
                {
                    OnEventRemoved(this, eventName);
                }
            }
    
            public void RemoveSubscription<T, TH>()
                where T : IntegrationEvent
                where TH : IIntegrationEventHandler<T>
            {
                var handlerToRemove = FindSubscriptionToRemove<T, TH>();
                var eventName = GetEventKey<T>();
                DoRemoveHandler(eventName, handlerToRemove);
            }
    
            private SubscriptionInfo FindSubscriptionToRemove<T,TH>()
                where T:IntegrationEvent
                where TH:IIntegrationEventHandler<T>
            {
                var eventName = GetEventKey<T>();
                return DoFindSubscriptionToRemove(eventName, typeof(TH));
            }
    
            public Type GetEventTypeByName(string eventName) => _eventTypes.SingleOrDefault(t => t.Name == eventName);
    
            public IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent
            {
                var key = GetEventKey<T>();
                return GetHandlersForEvent(key);
            }
    
            public IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName) => _handlers[eventName];
    
            public string GetEventKey<T>()
            {
                return typeof(T).Name;
            }
    
            public bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent
            {
                var key = GetEventKey<T>();
                return HasSubscriptionsForEvent(key);
            }
        }

    这个实现类里有许多关于订阅事件的处理方法,我们本次需要关注的有两个地方,这里使用字典Dictionary<string, List<SubscriptionInfo>>存储消息类和消息处理类,这里有个事件资源类订阅信息类,存储两个属性,一个是否是动态的,另一个是处理程序类型:

    public class SubscriptionInfo
        {
            public bool IsDynamic { get; }
            public Type HandlerType { get; }
    
            private SubscriptionInfo(bool isDynamic, Type handlerType)
            {
                IsDynamic = isDynamic;
                HandlerType = handlerType;
            }
    
            public static SubscriptionInfo Dynamic(Type handlerType)
            {
                return new SubscriptionInfo(true, handlerType);
            }
    
            public static SubscriptionInfo Typed(Type handlerType)
            {
                return new SubscriptionInfo(false, handlerType);
            }
        }

    简单描述一下消息的处理流程,在程序初次加载时会将消息类和对应的处理类进行绑定,当消息发送的时候从根据发送的消息类从字典里查找对应的处理类,通过反射进行加载调用。

    OK,关于消息资源方面的方法就这些,下面我们看下关于RabbitMQ的封装,新增事件总线接口,这里面有两个最重要的方法,发布和订阅:

    public interface IEventBus
        {
            void Publish(IntegrationEvent @event);
    
            void Subscribe<T, TH>()
                where T : IntegrationEvent
                where TH : IIntegrationEventHandler<T>;
    
            void SubscribeDynamic<TH>(string eventName)
                where TH : IDynamicIntegrationEventHandler;
    
            void UnsubscribeDynamic<TH>(string eventName)
                where TH : IDynamicIntegrationEventHandler;
    
            void Unsubscribe<T, TH>()
                where TH : IIntegrationEventHandler<T>
                where T : IntegrationEvent;
        }

    看一下它的实现类EventBusRabbitMQ.cs,这里我们只看最重要的两个方法Publish和Subscribe:

    public class EventBusRabbitMQ : IEventBus, IDisposable
        {
            const string BROKER_NAME = "eshop_event_bus";
    
            private readonly IRabbitMQPersistentConnection _persistentConnection;
            private readonly ILogger<EventBusRabbitMQ> _logger;
            private readonly IEventBusSubscriptionsManager _subsManager;
            private readonly ILifetimeScope _autofac;
            private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus";
            private readonly int _retryCount;
    
            private IModel _consumerChannel;
            private string _queueName;
    
    
            public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger,
        ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5)
            {
                _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
                _logger = logger ?? throw new ArgumentNullException(nameof(logger));
                _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
                _queueName = queueName;
                _consumerChannel = CreateConsumerChannel();
                _autofac = autofac;
                _retryCount = retryCount;
                _subsManager.OnEventRemoved += SubsManager_OnEventRemoved;
            }
    
            private void SubsManager_OnEventRemoved(object sender, string eventName)
            {
                if (!_persistentConnection.IsConnected)
                {
                    _persistentConnection.TryConnect();
                }
    
                using (var channel = _persistentConnection.CreateModel())
                {
                    channel.QueueUnbind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName);
    
                    if (_subsManager.IsEmpty)
                    {
                        _queueName = string.Empty;
                        _consumerChannel.Close();
                    }
                }
            }
    
            public void Publish(IntegrationEvent @event)
            {
                if (!_persistentConnection.IsConnected)
                {
                    _persistentConnection.TryConnect();
                }
    
                var policy = RetryPolicy.Handle<BrokerUnreachableException>()
                    .Or<SocketException>()
                    .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
                    {
                        _logger.LogWarning(ex.ToString());
                    });
    
                using (var channel = _persistentConnection.CreateModel())
                {
                    var eventName = @event.GetType()
                        .Name;
    
                    channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");
    
                    var message = JsonConvert.SerializeObject(@event);
                    var body = Encoding.UTF8.GetBytes(message);
    
                    try
                    {
                        policy.Execute(() =>
                        {
                            var properties = channel.CreateBasicProperties();
                            properties.DeliveryMode = 2; // persistent
                            channel.BasicPublish(exchange: BROKER_NAME, routingKey: eventName, mandatory: false, basicProperties: properties, body: body);
                        });
                    }
                    catch(Exception ex)
                    {
                        Console.WriteLine(ex.Message);
                    }
                }
            }
    
            /// <summary>
            /// 订阅动态
            /// </summary>
            public void SubscribeDynamic<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler
            {
                DoInternalSubscription(eventName);
                _subsManager.AddDynamicSubscription<TH>(eventName);
            }
    
    
            /// <summary>
            /// 订阅
            /// </summary>
            /// <typeparam name="T"></typeparam>
            /// <typeparam name="TH"></typeparam>
            public void Subscribe<T, TH>()
                where T : IntegrationEvent
                where TH : IIntegrationEventHandler<T>
            {
                var eventName = _subsManager.GetEventKey<T>();
                DoInternalSubscription(eventName);
                _subsManager.AddSubscription<T, TH>();
            }
    
           /// <summary>
            /// 进行内部订阅
            /// </summary>
            /// <param name="eventName"></param>
            private void DoInternalSubscription(string eventName)
            {
                var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
                if(!containsKey)
                {
                    if(!_persistentConnection.IsConnected)
                    {
                        _persistentConnection.TryConnect();
                    }
    
                    using (var channel = _persistentConnection.CreateModel())
                    {
                        channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName);
                    }
                }
            }
    
            /// <summary>
            /// 取消订阅
            /// </summary>
            public void Unsubscribe<T,TH>()
                where TH:IIntegrationEventHandler<T>
                where T:IntegrationEvent
            {
                _subsManager.RemoveSubscription<T, TH>();
            }
    
            /// <summary>
            /// 取消订阅动态
            /// </summary>
            public void UnsubscribeDynamic<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler
            {
                _subsManager.RemoveDynamicSubscription<TH>(eventName);
            }
    
            /// <summary>
            /// 释放资源
            /// </summary>
            public void Dispose()
            {
                if(_consumerChannel!=null)
                {
                    _consumerChannel.Dispose();
                }
    
                _subsManager.Clear();
            }
    
              /// <summary>
            /// 创建消费者通道
            /// </summary>
            private IModel CreateConsumerChannel()
            {
                if (!_persistentConnection.IsConnected)
                {
                    _persistentConnection.TryConnect();
                }
    
                var channel = _persistentConnection.CreateModel();
    
                channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");
    
                channel.QueueDeclare(queue: _queueName
                    , durable: true,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);
    
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += async (model, ea) =>
                {
                    var eventName = ea.RoutingKey;
                    var message = Encoding.UTF8.GetString(ea.Body);
    
                    await ProcessEvent(eventName, message);
    
                    channel.BasicAck(ea.DeliveryTag, multiple: false);
                };
                channel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer);
    
                channel.CallbackException += (sender, ea) =>
                {
                    _consumerChannel.Dispose();
                    _consumerChannel = CreateConsumerChannel();
                };
    
                return channel;
            }
    
    /// <summary>
            /// 流程事件
            /// </summary>
            private async Task ProcessEvent(string eventName, string message)
            {
                if (_subsManager.HasSubscriptionsForEvent(eventName))
                {
                    using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
                    {
                        var subscriptions = _subsManager.GetHandlersForEvent(eventName);
                        foreach (var subscription in subscriptions)
                        {
                            if (subscription.IsDynamic)
                            {
                                var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
                                if (handler == null) continue;
                                dynamic eventData = JObject.Parse(message);
                                await handler.Handle(eventData);
                            }
                            else
                            {
                                var handler = scope.ResolveOptional(subscription.HandlerType);
                                if (handler == null) continue;
                                var eventType = _subsManager.GetEventTypeByName(eventName);
                                var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                                var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                                await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                            }
                        }
                    }
                }
            }
        }

    首先是发布流程,当调用发布方法的时候首先会检查RabbitMQ是否连接,关于RabbitMQ连接的操作,项目里又封装了一个单独和接口和实现,下面贴一下代码:

        public interface IRabbitMQPersistentConnection:IDisposable
        {
            bool IsConnected { get; }
            bool TryConnect();
            IModel CreateModel();
        }
     public class DefaultRabbitMQPersistentConnection:IRabbitMQPersistentConnection
        {
            private readonly IConnectionFactory _connectionFactory;
            private readonly ILogger<DefaultRabbitMQPersistentConnection> _logger;
            private readonly int _retryCount;
            IConnection _connection;
            bool _disposed;
    
            object sync_root = new object();
    
            public DefaultRabbitMQPersistentConnection(IConnectionFactory connectionFactory,ILogger<DefaultRabbitMQPersistentConnection> logger,int retryCount=5)
            {
                _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
                _logger = logger ?? throw new ArgumentNullException(nameof(logger));
                _retryCount = retryCount;
            }
    
            public bool IsConnected
            {
                get
                {
                    return _connection != null && _connection.IsOpen && !_disposed;
                }
            }
    
            public IModel CreateModel()
            {
                if(!IsConnected)
                {
                    throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
                }
    
                return _connection.CreateModel();
            }
    
    
            public void Dispose()
            {
                if (_disposed) return;
    
                _disposed = true;
    
                try
                {
                    _connection.Dispose();
                }
                catch(IOException ex)
                {
                    _logger.LogCritical(ex.ToString());
                }
            }
    
    
            public bool TryConnect()
            {
                _logger.LogInformation("RabbitMQ Client is trying to connect");
    
                lock(sync_root)
                {
                    var policy = RetryPolicy.Handle<SocketException>()
                        .Or<BrokerUnreachableException>()
                        .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
                        {
                            _logger.LogWarning(ex.ToString());
                        });
    
                    policy.Execute(() =>
                    {
                        _connection = _connectionFactory
                              .CreateConnection();
                    });
    
                    if(IsConnected)
                    {
                        _connection.ConnectionShutdown += OnConnectionShutdown;
                        _connection.CallbackException += OnCallbackException;
                        _connection.ConnectionBlocked += OnConnectionBlocked;
    
                        _logger.LogInformation($"RabbitMQ persistent connection acquired a connection {_connection.Endpoint.HostName} and is subscribed to failure events");
    
                        return true;
                    }
                    else
                    {
                        _logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened");
    
                        return false;
                    }
                }
            }
    
    
            private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
            {
                if (_disposed) return;
    
                _logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect...");
    
                TryConnect();
            }
    
            void OnCallbackException(object sender, CallbackExceptionEventArgs e)
            {
                if (_disposed) return;
    
                _logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect...");
    
                TryConnect();
            }
    
            void OnConnectionShutdown(object sender,ShutdownEventArgs reason)
            {
                if (_disposed) return;
    
                _logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect...");
    
                TryConnect();
            }
        }

    首在程序加载的时候会创建一个消费者通道,同时给通道的Received委托注册了一个消费方法,这个消费方法的关键点是调用了ProcessEvent方法,完成后就发送一个ack确认:

    private IModel CreateConsumerChannel()
            {
                if (!_persistentConnection.IsConnected)
                {
                    _persistentConnection.TryConnect();
                }
    
                var channel = _persistentConnection.CreateModel();
    
                channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");
    
                channel.QueueDeclare(queue: _queueName
                    , durable: true,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);
    
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += async (model, ea) =>
                {
                    var eventName = ea.RoutingKey;
                    var message = Encoding.UTF8.GetString(ea.Body);
    
                    await ProcessEvent(eventName, message);
    
                    channel.BasicAck(ea.DeliveryTag, multiple: false);
                };
                channel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer);
    
                channel.CallbackException += (sender, ea) =>
                {
                    _consumerChannel.Dispose();
                    _consumerChannel = CreateConsumerChannel();
                };
    
                return channel;
            }

    我们看一下ProcessEvent方法,这个方法是专门用来消费MQ的,前面我们看过,通过字典记录了每个消息类对应的消费类Handler,这个方法的逻辑就是从字典里根据消息类查找对应的消费Handler并通过反射调用:

    private async Task ProcessEvent(string eventName, string message)
            {
                if (_subsManager.HasSubscriptionsForEvent(eventName))
                {
                    using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
                    {
                        var subscriptions = _subsManager.GetHandlersForEvent(eventName);
                        foreach (var subscription in subscriptions)
                        {
                            if (subscription.IsDynamic)
                            {
                                var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
                                if (handler == null) continue;
                                dynamic eventData = JObject.Parse(message);
                                await handler.Handle(eventData);
                            }
                            else
                            {
                                var handler = scope.ResolveOptional(subscription.HandlerType);
                                if (handler == null) continue;
                                var eventType = _subsManager.GetEventTypeByName(eventName);
                                var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                                var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                                await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                            }
                        }
                    }
                }
            }

    方法看起来一堆变量名有点乱,其实梳理一下就会发现很简单,方法需要传入一个消息类名eventName和消息内容message,首先会根据eventName判断字典里是注册了当前消息体,如果没有,则直接跳过,并返回ack确认,相当于丢弃该消息。如果字典里存在,则从字典中取出这些处理方法,通过反射加载调用。

    OK,那问题来了,字典里的内容是什么时候存进去的呢,那就是再调用订阅方法的时候存进去的,这个订阅方法是在启动类里调用的,我们看一下:

            public void Configure(IApplicationBuilder app, IHostingEnvironment env)
            {
                ConfigureEventBus(app);
    
                if (env.IsDevelopment())
                {
                    app.UseDeveloperExceptionPage();
                }
    
                app.UseMvcWithDefaultRoute();
            }
            private void ConfigureEventBus(IApplicationBuilder app)
            {
                var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
                eventBus.Subscribe<PublisherIntegrationEvent, OrderStatusChangedValidationIntegrationEventHandle>();
            }

    这个是在Startup.cs的Configure方法里调用的,这里就是通过调用订阅方法Subscribe绑定消息类和处理类:

    public void Subscribe<T, TH>()
                where T : IntegrationEvent
                where TH : IIntegrationEventHandler<T>
            {
                var eventName = _subsManager.GetEventKey<T>();
                DoInternalSubscription(eventName);
                _subsManager.AddSubscription<T, TH>();
            }

    可以看到这里调用了两个方法,DoInternalSubscription方法是绑定routingKey到指定的交换器和队列:

    private void DoInternalSubscription(string eventName)
            {
                var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
                if(!containsKey)
                {
                    if(!_persistentConnection.IsConnected)
                    {
                        _persistentConnection.TryConnect();
                    }
    
                    using (var channel = _persistentConnection.CreateModel())
                    {
                        channel.QueueBind(queue: _queueName, exchange: BROKER_NAME, routingKey: eventName);
                    }
                }
            }

    而AddSubscription方法则是将消息类和处理类添加到字典中:

    public void AddSubscription<T,TH>()
                where T:IntegrationEvent
                where TH:IIntegrationEventHandler<T>
            {
                var eventName = GetEventKey<T>();
    
                DoAddSubscription(typeof(TH), eventName, isDynamic: false);
    
                if(!_eventTypes.Contains(typeof(T)))
                {
                    _eventTypes.Add(typeof(T));
                }
            }
    public void DoAddSubscription(Type handlerType,string eventName,bool isDynamic)
            {
                if(!HasSubscriptionsForEvent(eventName))
                {
                    _handlers.Add(eventName, new List<SubscriptionInfo>());
                }
    
                if(_handlers[eventName].Any(s=>s.HandlerType==handlerType))
                {
                    throw new ArgumentException($"Handler Type {handlerType.Name} already registered for '{eventName}'", nameof(handlerType));
                }
    
                if(isDynamic)
                {
                    _handlers[eventName].Add(SubscriptionInfo.Dynamic(handlerType));
                }
                else
                {
                    _handlers[eventName].Add(SubscriptionInfo.Typed(handlerType));
                }
            }

    这里将消息类名和处理类的类型Type加到了字典里,后面会通过反射来进行调用。

    OK,到这里主要的发布订阅方法就梳理完成了,写的有点乱,不过不要紧!!!后面有时间我再重新排版一下!!!!我们现在在新建的项目的试一下,我再ValueController里调用了发布方法,通过MQ发送一个订单号OrderId:

            [HttpGet]
            public IEnumerable<string> Get()
            {
                var @event = new PublisherIntegrationEvent(5);
                _eventBus.Publish(@event);
                _logger.LogError("发送MQ成功!");
    
                return new string[] { "value1", "value2" };
            }

    消息体PublisherIntegrationEvent是这样的:

        public class PublisherIntegrationEvent : IntegrationEvent
        {
            public int OrderId { get; }
    
            public PublisherIntegrationEvent(int orderId) => OrderId = orderId;
        }

    我在启动类里注册了对当前消息类的处理类OrderStatusChangedValidationIntegrationEventHandle:

    public class OrderStatusChangedValidationIntegrationEventHandle: IIntegrationEventHandler<PublisherIntegrationEvent>
        {
            private readonly ILogger<OrderStatusChangedValidationIntegrationEventHandle> _logger = null;
            public OrderStatusChangedValidationIntegrationEventHandle(ILogger<OrderStatusChangedValidationIntegrationEventHandle> logger)
            {
                _logger = logger;
            }
            public async Task Handle(PublisherIntegrationEvent @event)
            {
                _logger.LogError("收到MQ" + @event.OrderId);
            }
        }

    这里发布和订阅都在一个API里,直接启动:

    OK,eShopOnContainers的梳理就到这里,接下来会在.NET Core商城系列里加入消息总线,但是会改一下,使用数据库+Redis存储消息的信息。

    在我公司项目里现在也是使用RabbitMQ来实现服务与服务之间的异步通信,但是方式肯定和这里是不一样的,我们是根据RoutingKey与服务地址来绑定,通过windows服务处理消息的发布,根据RoutingKey查找对应的服务地址并进行调用,没有使用RabbitMQ的订阅方法,我们提供了一个web界面专门用来注册,这种方式还是挺好用的。

    OK,大功告成,下面我会在商城项目里加入RabbitMQ的使用。

  • 相关阅读:
    es6 常用方法
    vue HTTP 请求(vue-resource)
    vue 常用语法糖
    js中slice,SubString和SubStr的区别
    浅谈JavaScript中forEach与each
    vue 新版本 webpack 代理 跨域设置
    js 动态添加class封装(es6语法)
    jsonp promise 封装
    location.origin兼容IE
    给zTree的treeNode添加class
  • 原文地址:https://www.cnblogs.com/weiBlog/p/10090999.html
Copyright © 2011-2022 走看看