zoukankan      html  css  js  c++  java
  • .NET Core分布式事件总线、分布式事务解决方案:CAP

    简介

    CAP 是一个遵循 .NET Standard 标准库的C#库,用来处理分布式事务以及提供EventBus的功能,它具有轻量级,高性能,易使用等特点。
    分布式事务是在分布式系统中不可避免的一个硬性需求,CAP 没有采用两阶段提交(2PC)这种事务机制,而是采用的 本地消息表+MQ 这种经典的实现方式,这种方式又叫做 异步确保。
    CAP 实现了 EventBus 中的发布/订阅,它具有 EventBus 的所有功能。也就是说你可以像使用 EventBus 一样来使用 CAP,另外 CAP 的 EventBus 是具有高可用性的, CAP 借助于本地消息表来对 EventBus 中的消息进行了持久化,这样可以保证 EventBus 发出的消息是可靠的,当消息队列出现宕机或者连接失败的情况时,消息也不会丢失。

    源码:
    https://github.com/dotnetcore/CAP

    目前, CAP 同时支持使用 RabbitMQ,Kafka,Azure Service Bus 等进行底层之间的消息发送,你不需要具备这些消息队列的使用经验,仍然可以轻松的集成到项目中。
    CAP 目前支持使用 Sql Server,MySql,PostgreSql,MongoDB 数据库的项目。
    CAP 同时支持使用 EntityFrameworkCore 和 ADO.NET 的项目,你可以根据需要选择不同的配置方式。
    下面是CAP在系统中的一个不完全示意图:

    图中实线部分代表用户代码,虚线部分代表CAP内部实现。

    消息表

    CAP框架会在数据库中添加两个表,以保证消息在任何情况下都会被成功发送、消费
    cap.Published:消息在发送到队列之前会在此表中添加一条记录,防止特殊原因导致消息未成功发送到队列
    cap.Received:在从队列接收到消息后在此表添加一条记录,CAP定时轮询从此表读取未成功消费的消息,交给订阅方法处理,直到成功消费(订阅方法内部不抛出异常即成功消费);同时也能防止消息被重复消费

    CAP集成到项目

    添加Nuget包

    数据库使用Sqlserver,消息队列使用RabbitMQ

    <PackageReference Include="DotNetCore.CAP" Version="3.1.2" />
    <PackageReference Include="DotNetCore.CAP.RabbitMQ" Version="3.1.2" />
    <PackageReference Include="DotNetCore.CAP.SqlServer" Version="3.1.2" />
    

    Startup添加配置

            public void ConfigureServices(IServiceCollection services)
            {
                services.AddControllersWithViews();
    
                services.AddCap(option =>
                {
                    //如果你使用的 EF 进行数据操作,你需要添加如下配置:
                    //option.UseEntityFramework<AppDbContext>();  //可选项,你不需要再次配置 option.UseSqlServer 了
    
                    //如果你使用的ADO.NET,根据数据库选择进行配置:
                    option.UseSqlServer("Server=SC-202003151209\SL1;Database=CAPDB;User=sa;Password=123456;");
                    //option.UseMySql("连接字符串")
    
                    //CAP支持 RabbitMQ、Kafka、AzureServiceBus 等作为MQ,根据使用选择配置:
                    option.UseRabbitMQ(rabbitOption => {
                        rabbitOption.HostName = "xxxx";
                        rabbitOption.Port = 5672;
                        rabbitOption.Password = "xxxx";
                        rabbitOption.UserName = "xxxx";
                        rabbitOption.VirtualHost = "xxxx";
                    });
                });
            }
    

    仪表盘

    仪表盘可以显示订阅列表、发送、接收、失败消息,并且可以操作消息的重复消费
    添加Nuget:

    <PackageReference Include="DotNetCore.CAP.Dashboard" Version="3.1.2" />
    

    UseDashboard:

    services.AddCap(option =>
    {
    option.UseDashboard();
    }
    

    访问仪表盘:http://xxxx/cap

    发布事件

    
    
            /// <summary>
            /// 不使用事务
            /// </summary>
            /// <param name="capBus"></param>
            /// <returns></returns>
            public IActionResult Index1([FromServices] ICapPublisher capBus)
            {
                capBus.Publish("Meshop.PayService.Refund", new RefundMessage { OrderID = 1, RefundPrice = 100M });
                return View();
            }
            /// <summary>
            /// 使用事务,自动提交
            /// </summary>
            /// <param name="capBus"></param>
            /// <returns></returns>
            public IActionResult Index2([FromServices] ICapPublisher capBus)
            {
                using (var conn = new SqlConnection("Server=SC-202003151209\SL1;Database=CAPDB;User=sa;Password=123456;"))
                {
                    using (var tran = conn.BeginTransaction(capBus, true))
                    {
                        var orderMaster = new OrderMaster { OrderState = 1 };
                        long id = conn.Insert(orderMaster, tran);
                        capBus.Publish("Meshop.PayService.Refund", new RefundMessage { OrderID = 1, RefundPrice = 100M });
                    }
                }
                return View();
            }
    

    订阅事件

    如果是在Controller中,直接添加[CapSubscribe("")]来订阅相关消息。

    
            [CapSubscribe("Meshop.PayService.Refund")]
            public Task Refund(RefundMessage message)
            {
                return Task.CompletedTask;
            }
    

    如果你的方法没有位于Controller 中,那么你订阅的类需要继承 ICapSubscribe,然后添加[CapSubscribe("")]标记:

    namespace xxx.Service
    {
        public interface ISubscriberService
        {
        	public void CheckReceivedMessage(DateTime time);
        }
        
        
        public class SubscriberService: ISubscriberService, ICapSubscribe
        {
        	[CapSubscribe("xxxx.services.show.time")]
        	public void CheckReceivedMessage(DateTime time)
        	{
        		
        	}
        }
    }
    

    然后在 Startup.cs 中的 ConfigureServices() 中注入你的 ISubscriberService

    public void ConfigureServices(IServiceCollection services)
    {
        services.AddTransient<ISubscriberService,SubscriberService>();
    }
    

    如何接收返回参数

    在调用Publish方法发布消息时传一个callbackName参数,并添加订阅,异步接收返回数据,如下:

    public IActionResult Index1([FromServices] ICapPublisher capBus)
            {
                capBus.Publish("Meshop.PayService.Refund", new RefundMessage { OrderID = 1, RefundPrice = 100M }, "Meshop.OrderService.Return");
                return View();
            }
    [CapSubscribe("Meshop.OrderService.Return")]
            public IActionResult IndexReturn(RefundMessage message)
            {
                return null;
            }
    

    版本号

    
    

    参考:
    https://www.cnblogs.com/cmliu/p/11767343.html
    https://www.cnblogs.com/savorboard/p/cap.html

  • 相关阅读:
    查看.Net Framework的版本(PC和WinCE)
    .NET C# 创建WebService服务简单的例子
    ORA-01502错误成因和解决方法
    高水线 High water mark(HWM)
    Oracle高水位线(HWM)及性能优化
    解决(Oracle)ORA-12528: TNS: 监听程序: 所有适用例程都无法建立新连接 问题
    ORACLE切非归档模式:
    ArcGIS学习记录—dbf shp shx sbn sbx mdb adf等类型的文件的解释
    ArcGIS学习记录—ArcGIS ArcMap编辑状态中线打断的问题
    ArcGIS学习记录—Arcgis中点、线、面的相互转换方法
  • 原文地址:https://www.cnblogs.com/fanfan-90/p/14233198.html
Copyright © 2011-2022 走看看