zoukankan      html  css  js  c++  java
  • 第六节:基于Cap框架引入事件总线机制(RabbitMQ+SQLServer) 和 下单成功后的方案剖析

    一. 事件总线机制

    1. 业务改造

     引入时间总线的概念,采用CAP框架进行业务处理,同时利用RabbitMQ代替Redis队列,采用SQLServer进行本地消息表的存储, 采用 推模式 发送消息,我们习惯称之为 发布-订阅 模式。

    关于基于CAP框架实现事件总线,详见:

     https://www.cnblogs.com/yaopengfei/p/13763500.html

     https://www.cnblogs.com/yaopengfei/p/13776361.html

    2. 分析

     (1) 多件商品参与秒杀,订阅者如何在一个接口中接收,如何处理这多个队列?

     答:这里思考的角度不对,因为队列中存的是下单信息,而不是商品信息,所有用户对秒杀下单信息都往这一个队列中存的,下单信息里肯定有商品信息,订阅者拿到商品信息,再进行不同的业务处理。

     PS: 当然,也可以定义多个订阅者接口。

     (2).消息总线 发消息 给订阅者,订阅者的并发怎么处理?

     答:CAP框架默认是单线程处理的,下一个消息的发送,需要等待上一个消息执行完才行,当然也可以多线程,但无法保证消费顺序且订阅者中可能会出现并发问题。

     (3).由于是单线程发送,所以执行速度会相对慢,不如项目消费者主动获取块级处理那种模式块。

     答:单条数据的处理肯定不如批量提交效率高,这是一个无法回避的缺陷,这里用CAP框架的优势在于基于本地消息表的异常处理很友好。

    核心代码:

    cap框架注册代码:

               services.AddCap(x =>
                {
                    //-----------------------------一.声明存储类型---------------------------------
                    //1. 使用SQLServer存储
                    //还需要配合上面EF上下文的注入 services.AddDbContext
                    x.UseEntityFramework<ESHOPContext>();   //EFCore配置
    
                    //-----------------------------二.声明消息队列类型---------------------------------//1.使用RabbitMq队列存储
                    x.UseRabbitMQ(rb =>
                    {
                        rb.HostName = "localhost";
                        rb.UserName = "guest";
                        rb.Password = "guest";
                        rb.Port = 5672;
                        rb.VirtualHost = "/";
                        //rb.QueueMessageExpires = 24 * 3600 * 10;  //队列中消息自动删除时间(默认10天)
                    });
    
                    //-----------------------------三.添加后台监控,用于人工干预---------------------------------
    
    
                    //-----------------------------四.通用配置---------------------------------
                    x.ConsumerThreadCount = 1; //消费者线程并行处理消息的线程数,提高消费速度,但这个值大于1时,将不能保证消息执行的顺序,且可能存在并发问题。
                });
    View Code

    发布订阅核心代码:

            /// <summary>
            ///09-引入事件总线RabbitMQ
            /// </summary>
            /// <param name="userId">用户编号</param>
            /// <param name="arcId">商品编号</param>
            /// <param name="totalPrice">订单总额</param>
            /// <param name="requestId">请求ID</param>
            /// <param name="goodNum">用户购买的商品数量</param>
            /// <returns></returns>
            public string POrder9([FromServices] ICapPublisher _capBus, string userId, string arcId, string totalPrice, string requestId = "125643", int goodNum = 1)
            {
                int tLimits = 100;    //限制请求数量
                int tSeconds = 1;     //限制秒数
                string limitKey = $"LimitRequest{arcId}";//受限商品ID
                int tGoodBuyLimits = 3;  //用户单个商品可以购买的数量
                string userBuyGoodLimitKey = $"userBuyGoodLimitKey-{userId}-{arcId}";  //用户单个商品的限制key
                string userRequestId = requestId;    //用户下单页面的请求ID
                string arcKey = $"{arcId}-sCount";   //该商品库存key
                try
                {
                    //调用lua脚本
                    //参数说明:
                    var result = RedisHelper.EvalSHA(_cache.Get<string>("SeckillLua1"), "ypf12345", tLimits, tSeconds, limitKey, goodNum, tGoodBuyLimits, userBuyGoodLimitKey, userRequestId, arcKey);
                    if (result.ToString() == "1")
                    {
                        //2. 将下单信息存到消息队列中
                        var orderNum = Guid.NewGuid().ToString("N");
                        _capBus.Publish("seckillGoods", $"{userId}-{arcId}-{totalPrice}-{orderNum}");
    
                        //3. 把部分订单信息返回给前端
                        return $"下单成功,订单信息为:userId={userId},arcId={arcId},orderNum={orderNum}";
                    }
                    else
                    {
                        //请求被禁止,或者是商品卖完了
                        throw new Exception($"没抢到");
                    }
                }
                catch (Exception ex)
                {
                    //lua回滚
                    RedisHelper.EvalSHA(_cache.Get<string>("SeckillLuaCallback1"), "ypf12345", limitKey, userBuyGoodLimitKey, userRequestId, arcKey, goodNum);
                    throw new Exception(ex.Message);
                }
            }
    
    
            /// <summary>
            /// 订阅者的方法
            /// //可以改为调用SQL语句,速率会有提升
            /// </summary>
            /// <param name="time"></param>
            [NonAction]
            [CapSubscribe("seckillGoods")]
            public void CreateOrder(string orderInfor)
            {
                try
                {
                    Console.WriteLine("下面开始执行订阅业务");
                    //1.扣减库存
                    var sArctile = _baseService.Entities<T_SeckillArticle>().Where(u => u.id == "300001").FirstOrDefault();
                    sArctile.articleStockNum = sArctile.articleStockNum - 1;
                    //2. 插入订单信息
                    List<string> tempData = orderInfor.Split('-').ToList();
                    T_Order tOrder = new T_Order();
                    tOrder.id = Guid.NewGuid().ToString("N");
                    tOrder.userId = tempData[0];
                    tOrder.orderNum = tempData[3];
                    tOrder.articleId = tempData[1];
                    tOrder.orderTotalPrice = Convert.ToDecimal(tempData[2]);
                    tOrder.addTime = DateTime.Now;
                    tOrder.orderStatus = 0;
                    _baseService.AddNo<T_Order>(tOrder);
    
                    int count = _baseService.SaveChange();
                    Console.WriteLine($"执行成功,条数为:{count}");
    
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"订阅业务执行失败:{ex.Message}");
                }
            }
    View Code

    lua核心脚本 1

    --[[本脚本主要整合:单品限流、购买的商品数量限制、方法幂等、扣减库存的业务]]
    
    --[[
        一. 方法声明
    ]]--
    
    --[[
    --1. 单品限流--存在缓存覆盖问题
    local function seckillLimit()
    --(1).获取相关参数
    -- 限制请求数量
    local tLimits=tonumber(ARGV[1]);
    -- 限制秒数
    local tSeconds =tonumber(ARGV[2]);
    -- 受限商品key
    local limitKey = ARGV[3];
    --(2).执行判断业务
    local myLimitCount = redis.call('INCR',limitKey);
    if myLimitCount > tLimits 
    then
    return 0;  --失败
    else
    redis.call('expire',limitKey,tSeconds)
    return 1;  --成功
    end;    --对应的是if的结束
    end;    --对应的是整个代码块的结束
    ]]--
    
    --1. 单品限流--解决缓存覆盖问题
    local function seckillLimit()
    --(1).获取相关参数
    -- 限制请求数量
    local tLimits=tonumber(ARGV[1]);
    -- 限制秒数
    local tSeconds =tonumber(ARGV[2]);
    -- 受限商品key
    local limitKey = ARGV[3];
    --(2).执行判断业务
    local myLimitCount = redis.call('INCR',limitKey);
    
    -- 仅当第一个请求进来设置过期时间
    if (myLimitCount ==1) 
    then
    redis.call('expire',limitKey,tSeconds) --设置缓存过期
    end;   --对应的是if的结束
    
    -- 超过限制数量,返回失败
    if (myLimitCount > tLimits) 
    then
    return 0;  --失败
    end;   --对应的是if的结束
    
    end;   --对应的是整个代码块的结束
    
    
    
    --2. 限制一个用户商品购买数量(这里假设一次购买一件,后续改造)
    local function userBuyLimit()
    --(1).获取相关参数
    local tGoodBuyLimits = tonumber(ARGV[5]); 
    local userBuyGoodLimitKey = ARGV[6]; 
    
    --(2).执行判断业务
    local myLimitCount = redis.call('INCR',userBuyGoodLimitKey);
    if (myLimitCount > tGoodBuyLimits)
    then
    return 0;  --失败
    else
    redis.call('expire',userBuyGoodLimitKey,600)  --10min过期
    return 1;  --成功
    end;
    end;    --对应的是整个代码块的结束
    
    
    
    
    --3. 方法幂等(防止网络延迟多次下单)
    local function recordOrderSn()
    --(1).获取相关参数
    local requestId = ARGV[7];    --请求ID
    --(2).执行判断业务
    local requestIdNum = redis.call('INCR',requestId);
    --表示第一次请求
    if (requestIdNum==1)                            
    then
    redis.call('expire',requestId,600)  --10min过期
    return 1; --成功
    end;
    --第二次及第二次以后的请求
    if (requestIdNum>1)
    then
    return 0;  --失败
    end;
    end;  --对应的是整个代码块的结束
    
    
    
    
    --4、扣减库存
    local function subtractSeckillStock()
    --(1) 获取相关参数
    --local key =KEYS[1];   --传过来的是ypf12345没有什么用处
    --local arg1 = tonumber(ARGV[1]);--购买的商品数量
    -- (2).扣减库存
    -- local lastNum = redis.call('DECR',"sCount");
    local lastNum = redis.call('DECRBY',ARGV[8],tonumber(ARGV[4]));  --string类型的自减
    -- (3).判断库存是否完成
    if lastNum < 0 
    then
    return 0; --失败
    else
    return 1; --成功
    end
    end
    
    
    
    --[[
        二. 方法调用   返回值1代表成功,返回:0,2,3,4 代表不同类型的失败
    ]]--
    
    --[[
    --1. 单品限流调用
    local status1 = seckillLimit();
    if status1 == 0 then
    return 2;   --失败
    end
    ]]--
    
    --[[
    --2. 限制购买数量
    local status2 = userBuyLimit();
    if status2 == 0 then
    return 3;   --失败
    end
    ]]--
    
    --3.  方法幂等
    --[[
    local status3 = recordOrderSn();
    if status3 == 0 then
    return 4;   --失败
    end
    ]]--
    
    
    --4.扣减秒杀库存
    local status4 = subtractSeckillStock();
    if status4 == 0 then
    return 0;   --失败
    end
    return 1;    --成功
    View Code

    lua回滚脚本

    --[[本脚本主要整合:单品限流、购买的商品数量限制、方法幂等、扣减库存的业务的回滚操作]]
    
    --[[
        一. 方法声明
    ]]--
    
    --1.单品限流恢复
    local function RecoverSeckillLimit()
    local limitKey = ARGV[1];-- 受限商品key
    redis.call('INCR',limitKey);
    end;
    
    --2.恢复用户购买数量
    local function RecoverUserBuyNum()
    local userBuyGoodLimitKey =  ARGV[2]; 
    local goodNum = tonumber(ARGV[5]); --商品数量
    redis.call("DECRBY",userBuyGoodLimitKey,goodNum);
    end
    
    --3.删除方法幂等存储的记录
    local function DelRequestId()
    local userRequestId = ARGV[3];  --请求ID
    redis.call('DEL',userRequestId);
    end;
    
    --4. 恢复订单原库存
    local function RecoverOrderStock()
    local stockKey = ARGV[4];  --库存中的key
    local goodNum = tonumber(ARGV[5]); --商品数量
    redis.call("INCRBY",stockKey,goodNum);
    end;
    
    --[[
        二. 方法调用
    ]]--
    RecoverSeckillLimit();
    RecoverUserBuyNum();
    DelRequestId();
    RecoverOrderStock();
    View Code

    3.各种异常处理

    (1). 发布者发送消息失败

     A. CAP框架有本地消息存储发送的消息,会自动重试.

     B. 如果还没有存储到本地消息表中失败了怎么办?→要回滚之前的业务,写一个Lua回滚的脚本)

    (2).RabbitMq宕机

     本地消息表机制

    (3).订阅者异常

     本地消息表机制+人工干预

    二. 下单成功后的处理方案

    1.技术背景

     在最终的下单接口模式下,当业务出错或被限制的时候,会直接返回给前端,比如‘抢单失败等’,但是当下单请求存放到队列中后也是直接返回给前端, 此时我们并不知道后续创建订单等操作是否成功。

     面对这种情况,,应该怎么处理? 前端又作何提示呢?

    2. 常用方案剖析

    方案1:等待推送

     (1).原理:前端页面提示“请求已提交,请耐心等待”,等服务端消费成功后,主动推送给客户端,客户端接受到推送成功请求后,自动跳转付款页面进行付款.

     (2).分析:

      A.该方案受客户端网络等影响或者用户退出应用了,很容易推送失败, 而且还需要引入额外的机制进行离线处理.

      B.高并发下推送也会出现丢失现象.

      C.需要引入新的实时通讯技术,增加了技术成本

    方案2:主动刷新

     (1).原理:前端页面提示“请求已提交,请稍后刷新查看结果”,需要用户主动刷新,或者可以主动轮训获取结果。

     (2).分析:

      A.秒杀期间并发已经很高了,又要增加刷新请求,增加硬件资源成本。

      B.体验不是很友好,还需要用户主动刷新一下,看似加了一步操作,有的人可能不知道怎么操作导致直接不付钱了

      C.实际上该方案也经常被运用,在一定场景下,该方案不失为一种办法

    方案3:直接付款下单(推荐!!)

     (1).思考

      用户既然已经提交成功了,说明该用户已经抢到购买资格了,只要正常付款就可以, 但是由于商家技术层面的问题,导致该用户的后续业务失败,凭啥让用户来买单呢??

      所以很多商城都认为:提交成功了,就是成功了!!  可以直接付款即可.

     (2).原理

      A.提交到队列前,相关订单信息我们实际上早已组装好(比如订单号、总金额、用户id、商品id等等),所以提交队列成功后,我们完全可以拿着这些信息直接去付款,也就是在付款表中插入记录。

      B.另外我们还需要一个定时器:用于轮训支付记录表和订单表,当支付记录表中有记录显示付款成功,但是由于一些原因导致订单创建失败,这个时候需要根据支付记录表中的数据,向订单表插入数据来创建订单信息.(如果定时器也失败了,就人工干预)

     (3).分析

      A.首先业务我们已经很严谨了,采用CAP框架后有很多异常处理方案,也就是说插入队列成功,但订阅业务失败的这种情况极为少见,就会有重试机制,也会有人工手动干预处理的。

      B.上面我们还增加了一个定时器,就算真的失败了没有创建订单成功,定时器也会进行同步数据的。

      C.由于是高并发,数据可能非常多,轮训同步订单数据 和 订阅者创建订单数据 不一定谁先进行,所以都要先判断一下订单表中是否有数据,再进行操作。

    部分代码分享(简版):

    支付接口和支付回调接口:

     #region 01-创建支付接口
            /// <summary>
            /// 创建支付接口
            /// </summary>
            /// <param name="userId">用户编号</param>
            /// <param name="orderNum">订单号</param>
            /// <param name="arcId">商品编号</param>
            /// <param name="totalPrice">订单总额</param>
            /// <returns></returns>
            public string CreatePayInfor(string userId, string orderNum, string arcId, string totalPrice)
            {
                //1.组装数据请求向第三方支付平台下单
    
    
                //2.三方支付平台下单成功
                //说明:会返回一个链接,用户跳转唤起支付应用(微信/支付宝)
    
                //2.1 创建预订单信息 (这里错了!!!,应该创建支付记录,预定单是在队列中创建啊!!!)
                T_Order tOrder = new T_Order();
                tOrder.id = Guid.NewGuid().ToString("N");
                tOrder.userId = userId;
                tOrder.orderNum = orderNum;
                tOrder.articleId = arcId;
                tOrder.orderTotalPrice = Convert.ToDecimal(totalPrice);
                tOrder.addTime = DateTime.Now;
                tOrder.orderStatus = 0;
                _baseService.Add(tOrder);
                _baseService.SaveChange();
    
                //2.2 跳转支付成功的链接进行支付
                return "";
            }
            #endregion
    
            #region 02-支付成功回调接口
            /// <summary>
            /// 支付成功回调接口
            /// 仅为了演示,实际解析出来的参数很多
            /// </summary>
            /// <param name="orderNum">订单号</param>
            /// <returns></returns>
            public string PaySuccessCallBack(string orderNum)
            {
                //1. 参数加密规则校验--用户判断是否是支付平台发过来的
    
    
                //2. 解析参数,再次请求支付平台,看是否有这条信息(很多情况下这一步是省略的)
    
    
                //3. 将自己系统的订单信息改为已支付
                //3.1 这里要有一层判断逻辑,假设一个订单支付平台回调了两次,我们系统是只执行一次的
                var data = _baseService.Entities<T_Order>().Where(u => u.orderNum == orderNum).FirstOrDefault();
                if (data!=null)
                {
                    data.payTime = DateTime.Now;
                    data.orderStatus = 1;
                }
                _baseService.SaveChange();
    
    
                //4. 返回一个标记,否则支付平台会多次回调
                return "success";
    
            } 
            #endregion
    View Code

    轮训同步数据:

     业务相对简单,控制好间隔和查询数量即可。  

     

    !

    • 作       者 : Yaopengfei(姚鹏飞)
    • 博客地址 : http://www.cnblogs.com/yaopengfei/
    • 声     明1 : 如有错误,欢迎讨论,请勿谩骂^_^。
    • 声     明2 : 原创博客请在转载时保留原文链接或在文章开头加上本人博客地址,否则保留追究法律责任的权利。
     
  • 相关阅读:
    str_split — 将字符串转换为数组
    str_replace — 子字符串替换
    str_pad — 使用另一个字符串填充字符串为指定长度
    parse_str — 将字符串解析成多个变量
    number_format — 以千位分隔符方式格式化一个数字
    企业邮箱的优势有哪些?使用企业邮箱的好处
    TOM VIP邮箱,化繁为简,在微信里收发邮件
    邮箱办公神操作,让办公更自在,沟通无边界!
    常用的企业邮箱有哪些?企业邮箱有哪几种?
    外贸企业邮箱批发,收费企业邮箱与免费企业邮箱区别
  • 原文地址:https://www.cnblogs.com/yaopengfei/p/13836909.html
Copyright © 2011-2022 走看看