zoukankan      html  css  js  c++  java
  • 事件总线--参考文档

    事件总线

    什么是事物

    例如:事物 所有看到的一切都是事物,不能看到的也是事物

    例如:团队微服务,成员微服务,聚合微服务,网关api,认证中心等等包括类,对象

    所有的事件都是事物变化的结果

    大家接触事件最早就是在js 或者是c#高级特性。大家对于事件不默认,但是对于事件不是很好理解

    什么是事件

    事件就是指事物状态的变化,每一次事物变化的结果都称作为事件

    什么是事件总线

    就是用来管理所有的事件的一种机制就称作为事件总线

    包括事件发布,事件存储,事件订阅,事件处理的统称

    作用:

    事件总线是一种机制,它允许不同的组件彼此通信而不彼此了解。 组件可以将事件发送到Eventbus,而无需知道是谁来接听或有多少其他人来接听。 组件也可以侦听Eventbus上的事件,而无需知道谁发送了事件。 这样,组件可以相互通信而无需相互依赖。 同样,很容易替换一个组件。 只要新组件了解正在发送和接收的事件,其他组件就永远不会知道.

    为什么要使用事件总线

    将微服务系统各组件之间进行解耦

    使用业务的发展来说

    事件总线框架

    CAP

    masstransit

    CAP内部概念

    事件 : 就是一些状态信息

    发布者:发布事件的角色 cap

    订阅者:订阅消费事件的角色 cap

    消息传输器:传输事件

    消息存储器:存储事件

    CAP存储事件消息队列类型Transport

    Azure

    rabbitmq

    kafaka

    In Memory Queue

    CAP存储事件持久化类型

    SQL Server

    MySQL

    PostgreSQL

    MongoDB

    InMemoryStorage

    CAP事件监控

    Dashboard

    微服务系统中如何使用CAP

    条件

    1、微服务系统

    2、RabbitMQ

    3、SqlServer

    4、CAP

    步骤

    1、微服务系统准备

    微服务系统全部准备完毕

    2、RabbitMQ准备

    2.1 环境准备

    Erlang下载地址:https://www.erlang.org/downloads

    RabbitMQ下载地址:https://www.rabbitmq.com/download.html

    2.2 RabbitMQ 启动

    1、在安装目录下添加可视化插件

    rabbitmq-plugins enable rabbitmq_management

    2、在安装目录下启动

        rabbitmq-server 

    3、查看rabbitmq状态

        rabbitmqctl status

    4、在浏览器输入http://127.0.0.1:15672

    访问rabbitmq后台系统

    3、SqlServer准备

    SqlServer启动,安装

    4、CAP准备

    4.1 CAP环境

    CAP官网地址:https://cap.dotnetcore.xyz/user-guide/zh/monitoring/dashboard/

    4.2 CPA配置

    1、在MicroService.Core项目中添加依赖

    CAP Nuget DotNetCore.CAP
    CAP传输器Nuget DotNetCore.CAP.RabbitMQ
    CAP持久化DotNetCore.CAP.SqlServer

    2、在MicroService.AggregateService服务中startup.cs中添加

     // 8、添加事件总线cap
              services.AddCap(x => {
                  // 8.1 使用内存存储消息(消息发送失败处理)
                  x.UseInMemoryStorage();

                  // 8.2 使用RabbitMQ进行事件中心处理
                  x.UseRabbitMQ(rb => {
                      rb.HostName = "localhost";
                      rb.UserName = "guest";
                      rb.Password = "guest";
                      rb.Port = 5672;
                      rb.VirtualHost = "/";
                  });
              });

    2.1 在AggregateController.cs中注入ICapPublisher

        private readonly ICapPublisher capPublisher;
    public TeamsController(ICapPublisher capPublisher)
      {
          this.capPublisher = capPublisher;
      }

    3、在MicroService.VideoService服务startup.cs中添加

     // 8、添加事件总线cap
              services.AddCap(x => {
                  // 8.1 使用RabbitMQ进行事件中心处理
                  x.UseRabbitMQ(rb => {
                      rb.HostName = "localhost";
                      rb.UserName = "guest";
                      rb.Password = "guest";
                      rb.Port = 5672;
                      rb.VirtualHost = "/";
                  });
              });

    3.1 在VideoController.cs 中方法上添加特性[CapSubscribe]

           /// <summary>
          /// 视频添加(异步添加)
          /// </summary>
          /// <param name="Video"></param>
          /// <returns></returns>
          [NonAction]
          [CapSubscribe("tontcap")]
          public ActionResult<Video> PostVideo(Video Video)
          {
              videoService.Create(Video);
          return CreatedAtAction("GetVideo", new { id = Video.Id }, Video);
      }

    4、效果展示​

    RabbitMQ宕机情况

    步骤

    1、将RabbitMQ直接关闭

    事件消息无法发送,存储到内存缓存中

    2、当将RabbitMQ启动后,消息正常发送

    内部使用定时器轮询机制实现

    AggregateService宕机情况

    AggregateService 执行业务成功,发送消息前宕机

    使用本地消息表解决(思想:持久化操作)

    条件

    1、本地消息表

    步骤

    1、在MicroService.AggregateService服务中

    1.1 创建Context文件,然后在Context文件夹内创建AggregateContext

    /// <summary>
        /// Aggregate服务上下文
        /// </summary>
        public class AggregateContext : DbContext
        {
            public AggregateContext(DbContextOptions<AggregateContext> options) : base(options)
            {
            }
    }

    1.2 在appsettings.json中添加

    {
      "ConnectionStrings": {
        "DefaultConnection": "Data Source=.;Initial Catalog=aggregateservice;Persist Security Info=True;User ID=sa;Password=tony"
      }
    }

    1.3 在startup.cs中添加消息持久化

             // 9、注册上下文到IOC容器
                services.AddDbContext<AggregateContext>(options =>
                {
                    options.UseSqlServer(Configuration.GetConnectionString("DefaultConnection"));
                });
            // 8、添加事件总线cap
            services.AddCap(x => {
                // 8.1 使用EntityFramework进行存储操作
                x.UseEntityFramework<AggregateContext>();
                // 8.2 使用sqlserver进行事务处理
                x.UseSqlServer(Configuration.GetConnectionString("DefaultConnection"));
            // 8.2 使用RabbitMQ进行事件中心处理
            x.UseRabbitMQ(rb => {
                rb.HostName = "localhost";
                rb.UserName = "guest";
                rb.Password = "guest";
                rb.Port = 5672;
                rb.VirtualHost = "/";
            });
        });

    1.4测试演示效果

    数据库中多了两张表

    1589706447269

    当业务执行成功,发送消息时,聚合微服务宕机,消息被持久化到数据库

    当重启聚合微服务时,消息发送成功,被成功消费

    1.5 原理

    1、定时器 消息重试

    2、幂等性 一个函数每次都是相同的结果,状态只有一个

    VideoService宕机情况

    VideoService接受消息失败

    当VideoService直接宕机的时候接受消息失败,

    然后重启VideoService消息消费成功

    VideoService接受消息成功执行失败

    条件

    1、本地消息表

    步骤

    1、在MicroService.Core项目中

    1.1 安装SqlServer

    Nuget DotNetCore.CAP.SqlServer

    2、在MicroService.VideoService项目中

    2.1 在startup.cs中添加消息持久化

        // 8、添加事件总线cap
        services.AddCap(x => {
            // 8.1 使用EntityFramework进行存储操作
            x.UseEntityFramework<AggregateContext>();
            // 8.2 使用sqlserver进行事务处理
            x.UseSqlServer(Configuration.GetConnectionString("DefaultConnection"));
        // 8.2 使用RabbitMQ进行事件中心处理
        x.UseRabbitMQ(rb => {
            rb.HostName = "localhost";
            rb.UserName = "guest";
            rb.Password = "guest";
            rb.Port = 5672;
            rb.VirtualHost = "/";
        });
    });

    2.2 效果展示

    数据库多了两张表

    1589706483154

    2.3 原理

    1、定时器 消息重试

    2、幂等性 一个函数每次都是相同的结果,状态只有一个

    消息重试完还是消费失败情况

    使用人工干预实现

    条件

    1、Dashboard -- 后台管理页面

    步骤

    1、在MicroService.Core项目中

    1.1 安装Dashboard

    Nuget DotNetCore.CAP.Dashboard

    2、在MicroService.VideoService项目中

    2.1 在startup.cs中添加Dashboard

    // 8、添加事件总线cap
    services.AddCap(x => {
        // 8.1 使用EntityFramework进行存储操作
        x.UseEntityFramework<AggregateContext>();
        // 8.2 使用sqlserver进行事务处理
        x.UseSqlServer(Configuration.GetConnectionString("DefaultConnection"));
    // 8.3 使用RabbitMQ进行事件中心处理
    x.UseRabbitMQ(rb => {
        rb.HostName = "localhost";
        rb.UserName = "guest";
        rb.Password = "guest";
        rb.Port = 5672;
        rb.VirtualHost = "/";
    });
    
    // 8.4添加cap后台监控页面
        x.UseDashboard();
    });

    2.2 运行打开cap后台监控页面

    http://localhost:5007/cap

    对于发送失败的消息进行重复发送

    对于消费失败的消息进行重复消费

     

    如有错误,欢迎您指出。
    本文版权归作者和博客园共有,欢迎转载,但必须在文章页面给出原文链接,否则保留追究法律责任的权利。
  • 相关阅读:
    学习windows编程 day4 之视口和窗口
    学习windows编程 day4 之 映射模式
    学习windows编程 day4 之 盯裆猫
    Android自动化测试(UiAutomator)简要介绍
    UltraEdit正则表达式介绍及实例
    addr2line -f -e *.so 0x9d69
    Android执行shell命令
    Drawable、Bitmap、byte[]之间的转换
    Ubuntu 12.04 64bit 配置完android 5.0编译环境后出现“could not write bytes: Broken pipe.”而无法进入输入帐号密码的登陆界面
    CameraTest
  • 原文地址:https://www.cnblogs.com/qingyunye/p/13378022.html
Copyright © 2011-2022 走看看