zoukankan      html  css  js  c++  java
  • 微软微服务eShopOnContainers示例之EventBusRabbitMq解析与实践

    eShopOnContainers

    eShopOnContainers是微软官方的微服务架构示例,GitHub地址https://github.com/dotnet-architecture/eShopOnContainers

    eShopOnContainers架构中有一个使用RabbitMQ实现的EventBus(事件总线),EventBus使用的是发布订阅模式, 使用事件驱动,使得有了一个新需求后直接添加对应Handler并注册订阅即可,符合单一职责原则。下在就详细说说我是如果把eShopOnContainers中的EventBus用到自己的项目中的

    项目结构图

       

    持久连接类

    面向接口是主流的开发方式,使用rabbitmq我们需要有一个管理与rabbtimq服务器连接的类,也就是

    IRabbitMqPersistentConnection 接口与 DefaultRabbitMqPersistentConnection

       

    1 /// <summary>

    2 /// Rabbitmq持久连接

    3 /// </summary>

    4 public interface IRabbitMqPersistentConnection : IDisposable

    5 {

    6   /// <summary>

    7    /// 是否连接

    8   /// </summary>

    9   bool IsConnected { get; }

    10

    11   /// <summary>

    12   /// 尝试连接

    13    /// </summary>

    14    /// <returns></returns>

    15    bool TryConnect();

    16

    17    IModel CreateModel();

    18 }

       

    在接口里定义了三个方法(属性也是方法),第一个判断是否已经连接,第二个尝试连接,第二个方法创建一个channel

    DefaultRabbitMqPersistentConnection类里实现了接口,它需要一个类型为IConnectionFactory的参数

       

       

    首先来看看TryConnect方法

    在这里使用了传入的_connectionFactory创建了连接,并且注册了连接成功、失败等事件进行了log输出

    1 public bool TryConnect()

    2 {

    3    Logger.Info("RabbitMQ客户端正在尝试连接");

    4

    5    lock (_syncRoot)

    6   {

    7      var policy = Policy.Handle<SocketException>().Or<BrokerUnreachableException>()

    9 .    WaitAndRetry(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>

    10      {

    11       Logger.Warn(ex.ToString());

    12      }

    13 );

    14

    15    policy.Execute(() =>

    16   {

    17     _connection = _connectionFactory.CreateConnection();

    19    });

    20

    21   if (IsConnected)

    22    {

    23      _connection.ConnectionShutdown += OnConnectionShutdown;

    24      _connection.CallbackException += OnCallbackException;

    25      _connection.ConnectionBlocked += OnConnectionBlocked;

    26

    27      Logger.Info($"连接到 {_connection.Endpoint.HostName} 并注册了异常失败事件");

    28

    29      return true;

    30   }

    31    Logger.Fatal("致命错误:无法创建和打开RabbitMQ连接");

    32

    33    return false;

    34    }

    35 }

       

    订阅管理器

    连接类其它的已经没有什么值得一说的了,下面我们把眼光转到订阅管理器

    这是管理器的接口,作用是来用维护EventDataEventHandler的关系,图中箭头指向的代码是我自己添加的,是为了定义一个命名约定把EventDataEventHandler的关系统一添加到管理器中

    订阅管理器接口的默认实现是InMemoryEventBusSubscriptionsManager类放到内存中,也可以轻松添加redissql server等实现,InMemoryEventBusSubscriptionsManager内部维护了一个字典,对应关系都放在这个字典中,可以进行添加订阅、取消订阅

       

    在说EventBus类之前先说说两种Handler,一种是普通的事件处理,泛型接口接收IntegrationEvent(EventData)的派生类做为参数并且定义了Handler方法来进行处理

    第二种是动态EventDataHandler,它并不会要求必须是IntegrationEvent的派生类,可以灵活的处理其它的类型

       

    事件总线

    下面就到了EventBus的接口,定义很简单,进行订阅事件,和dynamic的事件以及取消订阅和最重要的发布事件

    1   public interface IEventBus

    2    {

    3      /// <summary>

    4      /// 订阅事件

    5      /// </summary>

    6     /// <typeparam name="T"></typeparam>

    7      /// <typeparam name="TH"></typeparam>

    8      void Subscribe<T, TH>()

    9      where T : IntegrationEvent

    10     where TH : IIntegrationEventHandler<T>;

    11

    12      /// <summary>

    13      /// 订阅动态事件

    14      /// </summary>

    15      /// <typeparam name="TH"></typeparam>

    16      /// <param name="eventName"></param>

    17      void SubscribeDynamic<TH>(string eventName)

    18      where TH : IDynamicIntegrationEventHandler;

    19

    20     /// <summary>

    21     /// 取消订阅动态事件

    22      /// </summary>

    23     /// <typeparam name="TH"></typeparam>

    24     /// <param name="eventName"></param>

    25      void UnsubscribeDynamic<TH>(string eventName)

    26      where TH : IDynamicIntegrationEventHandler;

    27

    28      /// <summary>

    29     /// 订阅事件

    30      /// </summary>

    31      /// <param name="event"></param>

    32      /// <param name="handler"></param>

    33      void Subscribe(Type @event, Type handler);

    34

    35      /// <summary>

    36      /// 取消订阅事件

    37      /// </summary>

    38     /// <typeparam name="T"></typeparam>

    39      /// <typeparam name="TH"></typeparam>

    40      void Unsubscribe<T, TH>()

    41      where TH : IIntegrationEventHandler<T>

    42      where T : IntegrationEvent;

    43

    44     /// <summary>

    45      /// 发布事件

    46     /// </summary>

    47      /// <param name="event"></param>

    48      void Publish(IntegrationEvent @event);

    49 }

    我使用的是RabbitMQEventBus实现,RabbitMQAMQP协议的企业级消息队列,可靠性与性能都非常高。具体大家可以去了解一下RbbaitMQ

    EventBusRabbitMq类需要三个参数

    RabbitMqPersistentConnection 持久连接类

    IEventBusSubscriptionsManager 订阅管理器

    ILifetimeScope 以及autofac (依赖注入)

    实际上还需要一个Logger参数,在我的项目中使用了全局的Logger类,所以就不需要这个参数了,在构造函数中可以看到如果没有传入订阅管理器,那么将会使用默认的InMemry,而且在构造函数中就创建了消费者(和Queue), 在示例中是没有常量QueueName的,使用的是断开连接后自动删除的队列,但是这样会丢失消息,所以我改成了持久的队列

       

    下面就是创建消费者的方法了,首先声明一个direct类型的交换机,我对方法进行了修改,添加了对消费者的公平分发,以及手动交付和消息持久化来确保消息会被成功的消费,Received方法中调用了ProcessEvent方法,在这个方法里就是对消息的消费了

    1   private IModel CreateConsumerChannel()

    2   {

    3      if (!_persistentConnection.IsConnected)

    4     {

    5     _persistentConnection.TryConnect();

    6     }

    7

    8     var channel = _persistentConnection.CreateModel();

    9

    10      channel.ExchangeDeclare(exchange: BrokerName,

    11      type: "direct");

    12      //均发,同一时间只处理一个消息

    13      channel.BasicQos(0, 1, false);

    14      _queueName = channel.QueueDeclare(QueueName, true, false, false);

    15

    16      var consumer = new EventingBasicConsumer(channel);

    17     consumer.Received += async (model, ea) =>

    18      {

    19      var eventName = ea.RoutingKey;

    20      var message = Encoding.UTF8.GetString(ea.Body);

    21

    22      await ProcessEvent(eventName, message);

    23      //交付

    24      channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

    25    };

    26

    27      channel.BasicConsume(queue: _queueName,

    28      autoAck: false,

    29      consumer: consumer);

    30

    31      channel.CallbackException += (sender, ea) =>

    32     {

    33        _consumerChannel.Dispose();

    34       _consumerChannel = CreateConsumerChannel();

    35     };

    36

    37      return channel;

    38    }

       

    方法很简单,首先判断事件名称在订阅管理器中是否存在,然后创建一个autofac的生命周期

    使用EventData的名称拿到所有的Hnalder

    var subscriptions = _subsManager.GetHandlersForEvent(eventName);

    然后进行判断是否是Dynamic,后面代码基本一致就是从autofac中拿到EventHandler然后调用Handler方法

       

       

    下面就是依赖注入的部分了

    首先是注册订阅管理器

    在这里创建ConnectionFactory类,并且读取config中的配置项,拿到hostuser以及pwd,我又添加了断线重连和工作进程恢复

    builder.RegisterInstance(new DefaultRabbitMqPersistentConnection(new ConnectionFactory()

    {

      HostName = ConfigurationManager.AppSettings["rabbitmqHost"],

      UserName = ConfigurationManager.AppSettings["rabbitmqUser"],

      Password = ConfigurationManager.AppSettings["rabbitmqPwd"],

      //断线重连并恢复工作进程

      AutomaticRecoveryEnabled = true,

      TopologyRecoveryEnabled = true

    })).As<IRabbitMqPersistentConnection>()

    .SingleInstance();

       

    然后就是注册事件总统与订阅管理器

    builder.RegisterType<EventBusRabbitMq.EventBusRabbitMq>().As<IEventBus>().SingleInstance();

    builder.RegisterType<InMemoryEventBusSubscriptionsManager>().As<IEventBusSubscriptionsManager>().SingleInstance();

       

    最后我拿到了EventDataEventHandler所在程序中,扫描所有的Type并为所有以Handler结尾的类型添加到容器了,然后拿到了派生于IntegrationEvent的所有子类然后与对应的Handler进行订阅,这样基本上大多数的开发场景都不需要单独进行订阅了,只要符合命名约定就会自动订阅

    //配置EventBus

    var ass = Assembly.GetAssembly(typeof(Response));

    builder.RegisterAssemblyTypes(ass).Where(t => t.Name.EndsWith("Handler")).InstancePerDependency();

    var build = builder.Build();

    var bus = build.Resolve<IEventBus>();

    var allEvent = ass.GetTypes().Where(o => o.IsSubclassOf(typeof(IntegrationEvent)));

    foreach (var t in allEvent)

    {

      var handler = ass.GetTypes().FirstOrDefault(o => o.Name.StartsWith(t.Name) && o.Name.EndsWith("Handler"));

      if (handler != null)

      bus.Subscribe(t, handler);

    }

       

       

       

    最后就是使用了,我定义了TestEventTestEventHandler类,在Handler中输出TestEvent的属性 Test

    1     public class TestEvent : IntegrationEvent

    2     {

    3        public string Test { get; set; }

    4      }

    5

    6     public class TestEventHandler : IIntegrationEventHandler<TestEvent>

    7      {

    8        private readonly Repository<Lawfirm> _lawfirmRepository;

    9

    10       public TestEventHandler(Repository<Lawfirm> lawfirmRepository)

    11       {

    12          _lawfirmRepository = lawfirmRepository;

    13        }

    14

    15      public Task Handle(TestEvent @event)

    16       {

    17         Console.WriteLine(@event.Test);

    18        return Task.FromResult(0);

    19         }

    20   }

       

    使用非常简单,下面我使用事件总线发布了一个消息,然后把程序运行起来

    1 _eventBus = container.Resolve<IEventBus>();

    2

    3 _eventBus.Publish(new TestEvent

    4 {

    5 Test = "Hello World"

    6 });

       

    我们看到了刚才发布的消息HelloWorld ,哈哈,输出是不是很奇怪,不家什么成功导入x条,这是因为我的爬虫项目,在博文的同时正在采集着数据,消息被发到了这个消费者下并且被成功消费。安利一下我搭的爬虫框架,使用了.net core 2.0+Ef core 2.0,集成了无头浏览器与anglesharp还有EventBus那就不用说了,相关技术栈有 依赖注入、仓储模式、AutoMapper、还有一个Web的管理界面以及下一步要集成的Hangfire任务调度。有感兴趣的可以留言,我会根据反馈信息决定是否开源分享给大家

       

  • 相关阅读:
    spring requestbody json
    idea 配置自动编译 livereload
    idea hibernate console 执行hql报错
    查找最小的破坏连续性的数字
    xcopy忽略文件 7zip打包
    window 批处理脚本获取上级目录
    substring c# js java
    R语言中使用多重聚合预测算法(MAPA)进行时间序列分析
    R语言使用最优聚类簇数k-medoids聚类进行客户细分
    R语言中的岭回归、套索回归、主成分回归:线性模型选择和正则化
  • 原文地址:https://www.cnblogs.com/LiangSW/p/7512342.html
Copyright © 2011-2022 走看看