zoukankan      html  css  js  c++  java
  • Dotnet微服务:使用cap实现分布式服务的数据一致性

    DotNetCore.CAP是一个在分布式系统中(SOA,MicroService)实现事件总线及最终一致性(分布式事务)的一个开源的 C# 库,具有轻量级,高性能,易使用等特点。开源地址

    Cap(Consistency(一致性)、Availability(可用性)、Partition tolerance(分区容错性))是分布式系统中的一个重要理念,根据CAP定理,存在网络分区(微服务即时网络分区架构)时,Web应用不可能同时满足可用性和一致性,DotNetCore.CAP使用“异步确保”方案,利用消息队列和本地消息列表实现最终数据一致性。异步确保模式是补偿模式的一个典型案例,通过异步的方式进行处理,处理后把结果通过通知系统通知给使用方。

    一,准备内容:

    运输器:过运输将数据从一个地方移动到另一个地方-在采集程序和管道之间,管道与实体数据库之间,甚至在管道与外部系统之间。DotNetCore.CAP 支持以下几种运输方式:RabbitMQ,Kafka,Azure Service bus,Amazon SQS,In-memory queue。我使用的是RabbitMq。

    二,DotnetCore webapi项目集成DotNetCore.CAP

     public void ConfigureServices(IServiceCollection services)
            {
                //获取数据库连接字符串
                var connection = Configuration.GetConnectionString("MySql");
                //Cap
                services.AddCap(conf => {
                    //配置数据库上下文
                    conf.UseEntityFramework<Entity.MicoDatacontext>();
                    //配置数据库连接
                    conf.UseMySql(connection);
                    //使用RabbitMQ运输器
                    conf.UseRabbitMQ(rab => {
                        rab.HostName = "192.168.137.2";
                        rab.Password = "xxxxxx";
                        rab.Port = 5672;
                        rab.UserName = "xxxx";
                    });
                });
                services.AddDbContext<Entity.MicoDatacontext>(options =>
                {
                    options.UseMySql(connection);
                });
                services.AddControllers();
            }

    三,在Contoller中使用CAP发送消息

    比如下面是一个模拟创建订单的接口,创建完订单后发送一个主题为“order.create”的消息到消息总线。这个接口开启了一个需要手动提交的本地事务,插入本地消息和处理业务逻辑都在这个事务内,确保业务完成的同时消息能发送出去。

     readonly Entity.MicoDatacontext dbcontext;
            readonly ICapPublisher capPublisher;
            public TestController(Entity.MicoDatacontext dbcontext, ICapPublisher capPublisher) {
                this.dbcontext = dbcontext;
                this.capPublisher = capPublisher;
            }
            [HttpGet("CreateOrder")]
            public async Task<bool> CreateOrderAsync(string name,string goodsSid)
            {
                Order order = new Order()
                {
                    Sid = Guid.NewGuid().ToString(),
                    CreateTime = DateTime.Now,
                    GoodsSid = goodsSid,
                    Name = name,
                    Status = 0,
                    UpdateTime = DateTime.Now
                };
                //开启一个需要手动提交的本地事务,插入本地消息和处理业务逻辑都在这个事务内。
                using (var tran = dbcontext.Database.BeginTransaction(capPublisher, false))
                {
                    dbcontext.Order.Add(order);
                    await capPublisher.PublishAsync("order.create", new OrderCreate() { GoodsSid = goodsSid, OrderName = name });
                    dbcontext.SaveChanges();
                    tran.Commit();
                    return true;
                };
            } 

    四,在Contoller中使用CAP接收消息 

     下面是模拟仓库服务在接收到订单创建消息后减库存的操作。如果仓库服务没有成功消费这条消息,DotnetCore.CAP将会启用重发机制。

    [Route("api/[controller]")]
        [ApiController]
        public class TestController : ControllerBase
        {
            readonly Entity.MicoDatacontext dbcontext;
            readonly ICapPublisher capPublisher;
            public TestController(ICapPublisher capPublisher, Entity.MicoDatacontext dbcontext)
            {
                this.dbcontext = dbcontext;
                this.capPublisher = capPublisher;
            }
            [NonAction]
            [CapSubscribe("order.create")]
            public Task OrderCreate(Common.Publish.OrderCreate order)
            {
                var storeage = dbcontext.Storeage.Where(r => r.Sid.Equals(order.GoodsSid)).SingleOrDefault();
                if (storeage != null) throw new Exception("库存不足");
                storeage.Count -= 1;
                storeage.UpdateTime = DateTime.Now;
                dbcontext.Storeage.Update(storeage);
                dbcontext.SaveChanges();
                return Task.CompletedTask;
            }
        }
    

    五,消息重试配置

    1、 发送重试

    在消息发送过程中,当出现 Broker 宕机或者连接失败的情况亦或者出现异常的情况下,这个时候 CAP 会对发送的重试,第一次重试次数为 3,4分钟后以后每分钟重试一次,进行次数 +1,当总次数达到50次后,CAP将不对其进行重试。

    你可以在 CapOptions 中设置FailedRetryCount来调整默认重试的总次数。

    当失败总次数达到默认失败总次数后,就不会进行重试了,你可以在 Dashboard 中查看消息失败的原因,然后进行人工重试处理。

    2、 消费重试

    当 Consumer 接收到消息时,会执行消费者方法,在执行消费者方法出现异常时,会进行重试。这个重试策略和上面的 发送重试 是相同的

        //获取数据库连接字符串
                var connection = Configuration.GetConnectionString("MySql");
                //Cap
                services.AddCap(conf => {
                    //配置数据库上下文
                    conf.UseEntityFramework<Entity.MicoDatacontext>();
                    //配置数据库连接
                    conf.UseMySql(connection);
                    //使用RabbitMQ运输器
                    conf.UseRabbitMQ(rab => {
                        rab.HostName = "192.168.137.2";
                        rab.Password = "xxxx114";
                        rab.Port = 5672;
                        rab.UserName = "xxxx";
                    });
                    //消息重试的最大次数
                    conf.FailedRetryCount = 50;
                    //消息重试间隔时间,4min后该值设置生效(默认快速重试3次)
                    conf.FailedRetryInterval = 60;
                    //发送成功的消息的过期时间(过期则删除)
                    conf.SucceedMessageExpiredAfter = 24 * 3600;
                    //发送消息失败后的回调
                    conf.FailedThresholdCallback = (context) =>
                    {
                        //通知管理人员或其它逻辑
                    };
    

     六,事务补偿

    某些情况下,消费者需要返回值以告诉发布者执行结果,以便于发布者实施一些动作,通常情况下这属于补偿范围。可以在消费者执行的代码中通过重新发布一个新消息来通知上游,CAP 提供了一种简单的方式来做到这一点。 你可以在发送的时候指定 callbackName 来得到消费者的执行结果。

    比如上面的示例,仓库消费后需要告诉订单服务处理结果。

    订单服务:处理创建订单业务后发送一条带有补偿回调的消息并通过CapSubscribe接收该回调消息,处理订单状态

    [Route("api/[controller]")]
        [ApiController]
        public class TestController : ControllerBase
        {
            readonly Entity.MicoDatacontext dbcontext;
            readonly ICapPublisher capPublisher;
            public TestController(Entity.MicoDatacontext dbcontext, ICapPublisher capPublisher) {
                this.dbcontext = dbcontext;
                this.capPublisher = capPublisher;
            }
            [HttpGet("CreateOrder")]
            public async Task<bool> CreateOrderAsync(string name,string goodsSid)
            {
                Order order = new Order()
                {
                    Sid = Guid.NewGuid().ToString(),
                    CreateTime = DateTime.Now,
                    GoodsSid = goodsSid,
                    Name = name,
                    Status = 0,
                    UpdateTime = DateTime.Now
                };
                //开启一个需要手动提交的本地事务,插入本地消息和处理业务逻辑都在这个事务内。
                using (var tran = dbcontext.Database.BeginTransaction(capPublisher, false))
                {
                    dbcontext.Order.Add(order);
                    //发送一个带有补偿回调的消息
                    await capPublisher.PublishAsync("order.create", new OrderCreate() { GoodsSid = goodsSid, OrderName = name },"Soreage.reduced");
                    dbcontext.SaveChanges();
                    tran.Commit();
                    return true;
                };
            }
            /// <summary>
            /// 补偿消息处理
            /// </summary>
            /// <param name="order"></param>
            /// <returns></returns>
            [NonAction]
            [CapSubscribe("Soreage.reduced")]
            public Task OrderCreate(Common.Publish.StoreageReduced msg)
            {
                var order = this.dbcontext.Order.Find(msg.OrderSid);
                if (order != null)
                {
                    order.Status = 1;
                    dbcontext.SaveChanges();
                }
                return Task.CompletedTask;
            }
        }
    

      仓库服务:接收order.create主题消息并返回正确的返回值

     [NonAction]
            [CapSubscribe("order.create")]
            public Common.Publish.StoreageReduced OrderCreate(Common.Publish.OrderCreate order)
            {
                var storeage = dbcontext.Storeage.Where(r => r.Sid.Equals(order.GoodsSid)).SingleOrDefault();
                if (storeage != null) throw new Exception("库存不足");
                storeage.Count -= 1;
                storeage.UpdateTime = DateTime.Now;
                dbcontext.Storeage.Update(storeage);
                dbcontext.SaveChanges();
                return new Common.Publish.StoreageReduced() {OrderSid=order.OrderSid,IsSuccess=true };
            }
    

    ,并发冲突处理

    使用EF的RowVersion做乐观锁解决并发冲突的问题。

    Do实体添加Timestamp列

            [Timestamp]
            public byte[] Timespan { get; set; }
    

    DbContext类可以做如下配置

    protected override void OnModelCreating(ModelBuilder modelBuilder)
            {
                modelBuilder.Entity<Model.Order>().Property(r => r.Timespan).IsRowVersion();
                base.OnModelCreating(modelBuilder);
            }
    

      

  • 相关阅读:
    mina之小小总结(标准的菜鸟级别,行家勿入)
    tomcat(就一句话,自己的日记性质)
    MINA转自itoyo
    java web(没含量的,百科上的)
    解决VS.NET 2008中aspx文件没有设计界面
    正则表达式教程
    [VB] Option Explicit
    sql自定義函數 包含遊標
    sql 中 null+others=?
    实用手机号、IP、身份证号、歌曲查询接口
  • 原文地址:https://www.cnblogs.com/liujiabing/p/13846609.html
Copyright © 2011-2022 走看看