zoukankan      html  css  js  c++  java
  • .NetCoreCAP分布式事务

    分布式事务,EventBus 解决方案:CAP

    .NET Core 事件总线,分布式事务解决方案:CAP

    分布式事务,EventBus 解决方案:CAP【中文文档】

    来自CAP原作者yang-xiaodong的原理图:
    本文撰写者:cmliu,部分内容引用自官方文档,部分内容待更新# .NetCore CAP
    来源地址

    1、简介

    CAP 是一个遵循 .NET Standard 标准库的C#库,用来处理分布式事务以及提供EventBus的功能,她具有轻量级,高性能,易使用等特点。
    目前 CAP 使用的是 .NET Standard 1.6 的标准进行开发,目前最新预览版本已经支持 .NET Standard 2.0

    CAP 的应用场景主要有以下两个

    分布式事务中的最终一致性(异步确保)的方案

    分布式事务是在分布式系统中不可避免的一个硬性需求,CAP 没有采用两阶段提交(2PC)这种事务机制
    而是采用的 本地消息表+MQ 这种经典的实现方式,这种方式又叫做 异步确保。

    具有高可用性的 EventBus(事件总线)

    CAP 实现了 EventBus 中的发布/订阅,它具有 EventBus 的所有功能。

    也就是说你可以像使用 EventBus 一样来使用 CAP,另外 CAP 的 EventBus 是具有高可用性的, CAP 借助于本地消息表来对 EventBus 中的消息进行了持久化

    这样可以保证 EventBus 发出的消息是可靠的,当消息队列出现宕机或者连接失败的情况时,消息也不会丢失

    注意本文最底部内容

    2,入门

    1),引用基本包

        DotNetCore.CAP
    

    2),引用消息层包(用于服务端【生产者】与客户端【订阅者】之间的通信)

        RabbitMQ 引用DotNetCore.CAP.RabbitMQ
        Kafka    引用DotNetCore.CAP.Kafka
    

    3)引用数据库包(用于保存本地的收发消息记录表)

        SqlServer  引用DotNetCore.CAP.SqlServer
        MySql      引用DotNetCore.CAP.MySql
        MongODB    引用DotNetCore.CAP.MongoDB
    

    4)启动配置:Startup.cs的ConfigureServices()方法中配置Cap服务

            //此处用于注册继承自:ICapSubscribe接口的订阅服务,以下CapUserService类继承了ICapSubscribe接口
            //继承自ICapSubscribe接口的订阅,需要在:AddCap方法之前注册服务,否则将不会被扫描到
    
            //services.AddTransient<ICapUserService, CapUserService>();
    
            //下面语句用于开启支持使用EntityFramework,使用此方案时,无需配置UseSqlServer或者Mysql
    
            //services.AddDbContext<AppDbContext>();
    
            //配置Cap
    
            services.AddCap(x =>
                {
                    //配置Cap的本地消息记录库,用于服务端保存Published消息记录表;客户端保存Received消息记录表
    
                    // 此方法默认使用的数据库Schema为Cap;2,要求最低sql server2012(因为使用了Dashboard的sql查询语句使用了Format新函数)
                    //x.UseSqlServer("Integrated Security=False;server=服务器;database=cap;User ID=sa;Password=密码;Connect Timeout=30");
    
                    // 配置Cap的本地消息记录库,用于服务端保存Published消息记录表;客户端保存Received消息记录表
                    // 此方法可以指定是否使用sql server2008,数据库Schema,链接字符串
                    x.UseSqlServer((options) =>
                        {
                            //数据库连接字符串
                            options.ConnectionString="Integrated Security=False;server=192.168.1.109;database=cap;User ID=sa;Password=密码;Connect Timeout=30";
                            //标记使用的是SqlServer2008引擎(此处设置的是2008,因为192.168.1.109数据库是2008)
                            options.UseSqlServer2008();
                            //Cap默认使用的数据库Schema为Cap;此处可以指定使用自己的数据库Schema
                            //options.Schema = "dbo";
                        });
    
                    //使用Kafka作为底层之间的消息发送
                    x.UseKafka("192.168.1.230:9092,192.168.1.231:9092,192.168.1.232:9092");
                    //x.UseKafka(options =>
                    //{
                    //    options.Servers = "192.168.1.230:9092,192.168.1.231:9092,192.168.1.232:9092";
                    //});
    
                    //使用Dashboard,这是一个Cap的可视化管理界面;默认地址:http://localhost:端口/cap
                    x.UseDashboard();
    
                    //默认分组名,此值不配置时,默认值为当前程序集的名称
                    //x.DefaultGroup = "m";
                    //失败后的重试次数,默认50次;在FailedRetryInterval默认60秒的情况下,即默认重试50*60秒(50分钟)之后放弃失败重试
                    //x.FailedRetryCount = 10;
    
                    //失败后的重拾间隔,默认60秒
                    //x.FailedRetryInterval = 30;
    
                    //设置成功信息的删除时间默认24*3600秒
                    //x.SucceedMessageExpiredAfter = 60 * 60;
                });
    

    5)消息推送者(生产者)

    5.1)以下代码是在一个Controler中进行推送

            //省略其他代码
            public class CapDemoController : Controller
            {
                //注入一个ICapPublisher
                private readonly ICapPublisher _capBus;
                public CapDemoController(ICapPublisher capPublisher)
                {
                    _capBus = capPublisher;
                }
    
                //简单的推送使用
                public async Task<IActionResult> GetDemo()
                {
                    //发送消息给客户端,第一个参值数"kjframe.test"为消息队列的topic
                    await _capBus.PublishAsync("kjframe.test", DateTime.Now);
                    return Ok();
                }
    
            }
    

    5.2)以下是添加了一个手动提交事务推送的Api接口

            public async Task<IActionResult> GetTransaction()
            {
                using (var connectionn = new SqlConnection("链接字符串"))
                {
                    //创建手动提交的事务,false,表示手动提交
                    using (var transaction = connectionn.BeginTransaction(_capBus, false))
                    {
                        //sqlserver,执行自定义业务
                        connectionn.Execute("update TableA set Name='嘻嘻' where id=1", null, transaction);
    
                        //mysql
                        //connection.Execute("sql语句",null,(IDbTransaction)transaction.DbTransaction);
    
                        //执行异步的分布式事务,推送必须在transaction.Commit()事务提交语句之前执行
                        _capBus.PublishAsync("m.test", DateTime.Now);
    
                        //事务提交:如果connectionn.BeginTransaction(_capBus, false)的autoCommit参数为false,则需要手动提交事务
                        transaction.Commit();
                    }
                }
                return Ok();
            }
    

    5.3)以下是添加了一个自动提交事务推送的Api接口

            public async Task<IActionResult> GetAutoTransaction()
            {
                using (var connectionn = new SqlConnection("链接字符串"))
                {
                    //创建自动提交的事务
                    using (var transaction = connectionn.BeginTransaction(_capBus, true))
                    {
                        //sqlserver,执行自定义业务
                        connectionn.Execute("update TableA set Name='嘻嘻' where id=1", null, transaction);
    
                        //mysql
                        //connection.Execute("sql语句",null,(IDbTransaction)transaction.DbTransaction);
    
                        //此处connectionn.BeginTransaction的autoCommit参数true,所以cap在PublishAsync方法中会自动提交事务
                        //当使用的EntityFramework(EF)操作数据库时,此处也会保存EF的上下文(SaveChanges)
                        //自动提交事务时,PublishAsync需要在最后面
                        _capBus.PublishAsync("m.test", DateTime.Now);
                    }
                }
                return Ok();
            }
    

    6)订阅(在Controller中的订阅)

    订阅的Controller无需继承ICapSubscribe接口,也无需像继承自ICapSubscribe接口的订阅那样要在Startup中的AddCap方法之前注册服务

        [Route("api/[controller]")]
        [ApiController]
        public class CapSubscribeController : Controller
        {
    
            //使用指定订阅组
            //无需返回值,void或Task即可
            [CapSubscribe("m.test", Group = "group4")]
            public void TestSubscribe(string date)
            {
                Console.WriteLine($"接收到订阅:{date}");
            }
    
            //使用默认订阅组(当前程序集名,或者是Startup中配置的DefaultGroup参数)
            //无需返回值,void或Task即可
            //此处订阅了两个topic,
            [CapSubscribe("m.test")]
            [CapSubscribe("xxx.services.bar")]
            public void TestSubscribe(string date)
            {
                Console.WriteLine($"接收到订阅:{date}");
            }
        }
    

    7)订阅(在服务层或者非Controller中订阅的实现方式:继承ICapSubscribe接口,并在Startup.cs配置文件中的AddCap方法之前注册该服务)

    订阅处:ICapUserService.cs;CapUserService.cs

        public interface ICapUserService
        {
            void SubscribeWithnoController(string date);
        }
    
        public class CapUserService : ICapSubscribe, ICapUserService
        {
            [CapSubscribe("m.test")]
            public void SubscribeWithnoController(string date)
            {
                Console.WriteLine($"SubscribeWithnoController接收到订阅:{date}");
            }
        }
    

    配置处:Startup.cs的ConfigureServices方法

            public void ConfigureServices(IServiceCollection services)
            {
                //注册继承了继承ICapSubscribe接口的订阅
                services.AddTransient<ICapUserService, CapUserService>();
    
                //省略代码
    
                //注册CAP服务
                services.AddCap(x =>
                {
                    //省略代码
                });
    
                //省略代码
            }
    

    8)同一topic,被多个不同group的订阅者订阅

    此时,每一个订阅者都会收到消息,且Received表中会给每一个订阅者插入一条【Content字段】相同的订阅记录,有3个订阅者,就有3条Received数据记录

    示例

            //生产者
            [HttpGet]
            public async Task<IActionResult> GetDemo()
            {
                //省略其他代码
                await _capBus.PublishAsync("m.test", DateTime.Now);
                //省略其他代码
            }
    
            //订阅者1
            [CapSubscribe("m.test")]
            public void TestSubscribe(DateTime date)
            {
                Console.WriteLine($"接收到订阅:{date.ToString("yyyy-MM-dd hh:mm:ss")}");
            }
    
            //订阅者2(group1)
            [CapSubscribe("m.test", Group = "group1")]
            public void SubscribeGroup2(string date)
            {
                Console.WriteLine($"group1接收到消息:{date}");
            }
    
            //订阅者3(WDB)
            [CapSubscribe("m.test", Group = "group4")]
            public void TestSubscribe(string date)
            {
                Console.WriteLine($"group4接收到消息:{date}");
            }
    

    Received表消费记录,如下图,产生了3条记录,其中“cap.queue.kjframe.core.capdemo.v1”是默认topic组名,注意截图中的Group的组名与本处的代码有出入

    9)失败回调FailedThresholdCallback(失败达到重试上线时,触发此回调)

    配置

            services.AddCap(x=>{
            //其他代码
            x.FailedThresholdCallback = FailCallBack
            //其他代码
            });
    
           //失败时的回调通知函数
           public void FailCallBack(DotNetCore.CAP.Models.MessageType messageType, string messageName, string messageContent)
            {
                Console.WriteLine($"失败回调:messageType:{messageType};messageName:{messageName};
                    messageContent:{messageContent}");
            }
    

    失败回调返回的样本(此处做了格式化显示):

    // 失败回调:
    messageType:Subscribe;
    messageName:m.test;
    messageContent:
    {
        "Id": "5cfdf02ded40720ed4e98de9",
        "Timestamp": "2019-06-10T13:52:45.4107162+08:00",
        "Content": "2019-06-10 13:52:45",
        "CallbackName": null,
        "ExceptionMessage": {
            "Source": "DotNetCore.CAP",
            "Message": "我要扔出异常",
            "InnerMessage": "我要扔出异常"
        }
    }
    

    10)回调callbackName(此处的回调与失败回调不一样)

    这里的callbackName指的是PublishAsync/Publish推送方法中的callbackName参数,这个参数是个string ,实际上是一个topic

    注意:具有callbackName回调值的订阅方法必须有返回值,否则回调将会失败

    当服务端PublishAsync/Publish消息时,会将callbackName放入message的content字段中
    客户端的订阅方法(此订阅方法必须有返回值,否者content会为null,为null则会回调失败)消费成功后,
    客户端订阅方法将会把客户端订阅方法的返回值(设为A)PublishAsync/Publish一条topic为callbackName,content包含返回值A的消息到队列中,
    服务器端只需要在回调方法中订阅callbackName这个topic即可触发回调

    服务端示例:

        //服务端的生产者
        [HttpGet]
        public async Task<IActionResult> TestCallback()
        {
         await _capBus.PublishAsync("m.test", DateTime.Now, "FailCallBack");
         return Content("发起一个带 callbackName参数的消费");
        }
    
        //服务端处理来自客户端的订阅,即订阅回调topic: FailCallBack
        [CapSubscribe("FailCallBack", Group = "CallbackServer")]
        public void FailedCallback(string message)
        {
            Console.WriteLine($"接收到回调:{message}");
        }
    

    客户端示例:

        //客户端的订阅方法,此方法必须要有返回值,否则回调的content将会为null,如果content为null,
        //那么服务端的订阅将无法消费回调消息
        //服务端的【回调订阅方法】所接收到的参数值就是这个【客户端的订阅方法】的返回值
        [CapSubscribe("m.test",Group = "CallbackClient")]
        public DateTime SubscribeCallback(DateTime date)
        {
             Console.WriteLine($"已处理,请回调:{date.ToString("yyyy-MM-dd hh:mm:ss")}");
             return DateTime.Now.AddDays(10);
        }
    
    TestCallback()
    ----> PublishAsync("m.test", DateTime.Now, "FailCallBack") 【服务端推送】
    ----> var response=DateTime SubscribeCallback(DateTime date)     【客户端订阅】
    ----> 底层方法(PublishAsync("FailCallBack", response)             【客户端推送回调消息】
    ----> FailedCallback(string message)                             【服务端订阅回调消息】
    

    注意事项

    1),自动提交事务时,PublishAsync应放在最后面

    2),PublishAsync<T>(string name,T object, string callBackName)中的callBackName是一个回调,当失败重试超过重试限制次数(默认50次:FailedRetryCount)时,
    会触发此回调函数
    回调函数委托签名:FailCallBack(DotNetCore.CAP.Models.MessageType messageType,string messageName,string messageContent)

    3),框架无法做到100%确保消息只执行一次,所以在一些关键场景消息端在方法实现的过程中自己注意业务去重

    4),一个订阅方法可以订阅多个Topic,但多个方法订阅了相同的topic+group时,只会有一个订阅方法消费到同一条消息

    5),支持内存消息队列(2.5版本),需要引入DotNetCore.CAP.InMemoryStorage,并UseInMemoryStorage,此模式用于开发环境下没有Kafka或者RabbitMQ时,可以使用内存队列来模拟

    6),开启.AddDbContext<AppDbContext>()用于支持EF时,无需再配置UseSqlServer或者UseMySql

    7),支持Cap版本隔离(2.4版本),通过本地数据表的Version字段进行版本隔离

    8),Cap会自动创建"Published", "Received"两个本地数据库表

    9),如果多个微服务使用同一个数据库实例,可以通过指定Schema(SqlServer)或者TableNamePrefix(MySql)来隔离不同的微服务之间的本地消息记录

    10),StatusName为Failed会不断进行重试,直到达到重试上线

    11),Successed的消息会在根据该消息的ExpiresAt时间进行清理(默认24小时),每1小时执行一次清理任务;Failed失败的信息会在15天后过期并进行清理

    12),SqlServer2008版本的数据库需要在UseSqlServer()的配置方法中调用UseSqlServer2008(),因为Cap的UseDashboard在SqlServer2012+版本上使用了新的语法Format内置函数

    13),Cap中Kafka订阅者是IConsumer<Null, string>,如果你要使用.NET Framework向Cap的订阅者推消息,需要注意是<Null,string>

    14),回调函数,PublishAsync/Publish中的callbackName参数是一个回调,这是一个topic的值,你可以在服务端订阅这个topic用于处理客户端消费完信息后的回调,
    注意,如果callbackName不为空,那么这个客户端的订阅方法必须有返回值,返回值将传参回调回去,详细参考【回调函数】

    15),Cap文档

    16),Cap GitHub

    17),Cap作者博文

  • 相关阅读:
    【JS教程08】数组及操作方法
    【JS教程07】事件属性及匿名函数
    【JS教程06】操作元素
    【JS教程05】获取元素的方法
    【JS教程04】条件语句
    多线程环境下非安全Dictionary引起的“已添加了具有相同键的项”问题
    GPT分区基础知识及如何在GPT分区上安装WIN7
    Jenkins TFS配置
    windows查看端口占用命令
    VS2015企业版序列号
  • 原文地址:https://www.cnblogs.com/Nine4Cool/p/13727396.html
Copyright © 2011-2022 走看看