zoukankan      html  css  js  c++  java
  • 分布式事务最终一致性-CAP框架轻松搞定

    前言

    对于分布式事务,常用的解决方案根据一致性的程度可以进行如下划分:

    • 强一致性(2PC、3PC):数据库层面的实现,通过锁定资源,牺牲可用性,保证数据的强一致性,效率相对比较低。
    • 弱一致性(TCC):业务层面的实现,通过预留或锁定部分资源,最后通过确认或取消操作完成事务的处理。比如A向B转款500元,A账号会冻结500元,其他操作正常,B接收转款时,也不能直接入账,而是将500元放到预留空间,只有经过确认之后,A才正式扣钱,B才正式入账; 如果取消把A的500块解冻,B也不会入账。
    • 最终一致性(本地消息表):不管经过多少个服务节点,最终数据一致就行。比如下单成功之后,需要库存服务扣减库存,如果库存扣减失败,不管是重试,还是最后人工处理,最后确保订单和库存数据能对上就行;为保证用户体验,及时通过中间状态的形式反馈给用户,比如常见的出票中、数据处理中等。

    对于强一致性和弱一致性的解决方案一般针对数据一致性和时效性要求特别高的业务场景,通常会牺牲暂时的可用性来满足一致性的要求;由于为保证一致性,会锁定资源,在高并发的业务场景不是最佳选择,所以很多系统在业务需求允许的情况下,基本上都会采用最终一致性方案。

    正文

    1.1 最终一致性简述

    顾名思义就是保证数据最后的一致性就行了。如果中间节点发生失败,系统为了减少代价,一般不会自动回滚,而是通过重试机制和人工参与的方式对失败数据进行处理,从而保证系统高并发场景下高可用数据一致性需求。

    1.2 解决方案

    目前用得最多的方案是结合本地消息表进行实现,再加上后台任务、消息队列中间件就可以更好的实现分布式事务的处理。

    解决方案流程

    本地消息表:就是在对应业务数据库中增加的一张消息表;这张表存储业务产生的消息,通过本地事务保证业务数据和消息数据的一致性。在消息表中通过一个状态来标识业务是否执行成功,如果失败,后台任务就进行重试。

    1.2.2 CAP框架简介

    CAP 是一个EventBus(事件总线),同时也是一个在微服务或者SOA系统中解决分布式事务问题的一个框架,基于CAP理论思想进行封装的。采用模块化设计,具有高度的可扩展性,可靠并且易于更改。

    对于分布式事务的处理,CAP 框架采用的是“异步确保”这种方案,即本地消息表。官方支持的数据存储方式有SQL Server、MySQL、PostgreSql、MongoDB、In-Memory(内存),由于是开源项目,社区大佬也提供了其他数据存储支持,如:Oracle、SQLite、SmartSql等。

    在分布式系统,各节点需要进行消息传输,CAP框架提供以下几种方式RabbitMQ、Kafka、Redis Streams(Redis 5.0支持)、Azure Service Bus、Amazon SQS、In-Memory Queue,使用方式都差不多。

    CAP的架构图如下:

    架构图

    上图简要说明:

    • 有两个微服务,服务A和服务B;
    • 服务A中通过本地事务的方式,将事件消息和业务逻辑进行事务保存(事件消息保存在本地消息表中),保证业务逻辑和消息的一致性和可靠性;关于消息的处理和保存CAP已经封装在内部;
    • CAP内部定时调度任务将消息发布到消息队列中;
    • 服务B订阅到消息,将其保存到服务B的本地消息表中,CAP已经封装好,只需按照说明使用即可;
    • 如果业务处理失败,服务B中集成的CAP会根据配置的定时任务策略进行重试,直到处理成功为止;

    主要的理论就说那么多,更多详细内容,请进下方传送门:

    接下来就到撸码时刻,CAP由于封装比较好,所以使用起来比较简单。

    1.3 撸码实践

    以下的业务场景是为了案例演示,目的是体现CAP的实践,所以业务逻辑都只是模拟,切勿当真。

    1.3.1 环境准备

    演示中要用到RabbitMQ,为了安装方便,这里使用Docker的方式,直接通过镜像运行,简单,快速方便。关于Docker的实践,后续会专门出系列文章。这里就先总结一下Docker的安装和RabbitMQ在Docker中的运行步骤,采用的主机环境是我之前买的阿里云服务器(CentOS 7);演示用的数据库是SqlServer。

    • Docker安装

      1、移除移动旧版本

      sudo yum remove docker 
                        docker-client 
                        docker-client-latest 
                        docker-common 
                        docker-latest 
                        docker-latest-logrotate 
                        docker-logrotate 
                        docker-engine
      

      2、安装需要的依赖包

      sudo yum install -y yum-utils
      

      3、设置镜像仓库

      sudo yum-config-manager 
          --add-repo 
          https://download.docker.com/linux/centos/docker-ce.repo
      

      4、更新Yum软件包索引

      sudo yum makecache fast # 提高安装速度
      

      5、开始安装Docker

      sudo yum install docker-ce docker-ce-cli containerd.io
      

      6、启动Docker

      sudo systemctl start docker
      

      7、测试Docker

      sudo docker run hello-world # 运行Hello-world
      

      安装成功

    • RabbitMQ在Docker中安装和运行

      1、一行命令直接指定镜像运行,如果本地找不到镜像,会去远程仓储里去找。

      docker run -d --hostname my-rabbit --name cap-rabbit -p 8888:15672 -p 5672:5672 -p 5671:5671 -p 1883:1883 rabbitmq:3-management
      

      这里先不细说命令了,后续聊Docker的时候好好说说。命令需要注意的是主机端口和容器端口的映射。

      2、运行成功后就可以访问啦,默认用户名和密码:guest/guest;

      这里访问的地址端口是8888,那是在启动容器的时候将主机端口8888和容器端口15672进行了映射。

    这就是选择Dokcer安装的原因,超级快;如果用传统的方式,还得安装语言环境,还得配置,最后才能安装;Docker通过镜像的方式直接运行即可。

    如果小伙伴新增用户之后不能访问,或者程序连接报错,可以排查是否有权限访问,如下:

    注:如果小伙伴用的是云服务器,需要配置安全组,允许端口访问;另外如果程序和RabbitMq所在的主机不是同一台机器,主机防火墙也需要放开对应的端口。

    1.3.2 开始撸码
    • 项目准备

      这里模拟两个服务,一个是订单服务,一个是库存服务,两都用到EF(Code First),如果小伙伴对EF入门还不熟,<<跟我一起学.NetCore之EF Core 实战入门,一看就会>>这篇文章超详细,肯定能帮到你; 所以接下来就上几张关键的图就行啦。

      项目结构:

      OrderDbContext:

      Startup中注册服务:

      库存服务的代码和这个类似。

      通过迁移并更新到数据库时,会生成如下数据库和表:

    • 集成CAP

      这里因为用的是RabbitMQ、SqlServer,所以需要引入以下几个包;如果用其他消息队列或数据库,可以引入对应的包。

      因为订单服务是在Respository层使用CAP,所以对应的包就在这层引用;

      库存服务是直接在Controller那层引用,这里就不重复截图啦。

      订单服务和库存服务都是在各自项目的Startup文件中注册CAP相关服务,并配置相关信息,如下图:

      集成完毕之后,启动项目(不需要手动自己迁移),在各自业务数据库中就自动生成两个消息表,用于后续消息的存储,如下:

    • 编写业务代码

      订单服务,在订单生成成功之后,向库存服务发送消息,业务逻辑如下:

      图中用到的_capPublisher是通过构造函数注入的。订单服务其他层的代码就不用截图了,就是简单调用,源码地址在文末。

      库存服务直接订阅就行,演示案例中是直接在StockController中进行订阅,如下:

      // 标记为不实Action
      [NonAction]
      // 订阅消息,参数和发布时指定的参数一致
      [CapSubscribe("Order.Create.Success")]
      public void UpdateStock(OrderEntity order)
      {
          //throw new Exception("扣减库存异常了~~~");
          // 为了测试,库存里面没有数据的话,先模拟一条数据
          bool bHaveData = _stockDbContext.Stock.Any();
          if(!bHaveData)
          {
              StockEntity stock = new StockEntity
              {
                  Id = Guid.NewGuid(),
                  ProductNo = "Product001",
                  StockCount = 100,
                  UpdateDate = DateTime.Now
              };
              _stockDbContext.Stock.Add(stock);
              _stockDbContext.SaveChanges();
          }
          // 模拟扣减库存
          using var trans = _stockDbContext.Database.BeginTransaction(_capPublisher, autoCommit: false);
          try
          {
              // 根据产品编号找到产品
              var product = _stockDbContext.Stock.Where(s => s.ProductNo == order.ProductNo).FirstOrDefault();
              // 扣减库存之后保存
              product.StockCount = product.StockCount - order.Count;
              _stockDbContext.Update(product);
              _stockDbContext.SaveChanges();
              // 可以继续向下发布流程,比如库存扣减成功,下一步到物流服务进行相关处理,可以继续发布消息
              // _capPublisher.Publish();
              trans.Commit();
              Console.WriteLine(order.OrderNo);
          }
          catch (Exception ex)
          {
              trans.Rollback();
          }
      }
      

      可以看到,订阅很简单,直接标上[CapSubscribe("Order.Create.Success")]这个Attribute就行了,如果消息状态为失败,后续CAP的定时任务会根据定时策略调用此方法。

    1.3.3 运行看效果
    • 正常流程,下单成功,扣减库存成功

      将订单服务(端口5000)和库存服务(端口6000)都启动起来。

      订单服务中增加了OrderController,里面有一个GenerateOrder的接口,直接调用即可:

      这里使用Postman工具进行测试,如下:

      库存服务就会订阅到信息,如下:

      业务流程完成之后,订单和库存数据整体一致了,回过头来看看消息表,看看里面有什么消息,如下:

    • 异常流程模拟,下单成功,扣减库存失败

      在扣减服务逻辑方法中手动抛出异常,代码如下:

      然后启动项目重新测试,再下一个订单试试; 操作后,先来看看消息表,如下:

      注:CAP在默认情况下,发送和消费消息的过程中失败会立即重试 3 次,在 3 次以后将进入重试轮询;重试将在发送和消费消息失败的 4分钟后 开始,这是为了避免设置消息状态延迟导致可能出现的问题;后续就会每隔1分钟之后重试一次,默认的最高重试次数为50次,当达到50次时,就不会重试了。

      现在知道问题了,优化代码,重新启动,即把抛异常的代码注释掉,看看会不会自动处理,如下:

      如上图,稍等一会,消息就自动处理了,业务数据符合预期,保证一致性。 这个是CAP内部定时读取消息表,根据状态不断重试业务逻辑,直到成功为止。 CAP的全自动是不是感觉比较便捷,写最少的代码,解决了最难搞的分布式事务。

    • 修改默认的配置

      在实际业务场景中,默认配置可能不太实用,可以在注册服务时进行默认配置更改,如下:

      配置修改之后的测试这里就不截图了,留给小伙伴们动手试试吧。

    案例代码地址:https://gitee.com/CodeZoe/microservies-demo/tree/main/CapDemo

    总结

    关于分布式事务的实操,把最常用的最终一致性方案简单分享了一下,小伙伴可以根据自己的业务场景,赶紧动手试试吧;

    其他方案会在后续的文章中加上,主要还是以实用为主,已经不咋用的就没必要再说啦。

    文章中提及到Docker和RabbitMQ,我已经在着手准备这块的文章了,关注“Code综艺圈”,和我一起学习吧;

    图片

  • 相关阅读:
    JDBC事务
    PreparedStatement预编译对象实现
    读取properties配置文件
    eclipse 快捷键总结
    JDBC编程六部曲
    JDBC 配置环境
    基于注解的DI(DI:Dependency Injection 依赖注入)
    基于XML的DI
    汇编call jmp理解
    常用jar包下载地址
  • 原文地址:https://www.cnblogs.com/zoe-zyq/p/15117129.html
Copyright © 2011-2022 走看看