zoukankan      html  css  js  c++  java
  • 消息队列处理高并发案例

    asp.net c# 通过消息队列处理高并发请求(以抢小米手机为例)

     

      网站面对高并发的情况下,除了增加硬件, 优化程序提高以响应速度外,还可以通过并行改串行的思路来解决。这种思想常见的实践方式就是数据库锁和消息队列的方式。这种方式的缺点是需要排队,响应速度慢,优点是节省成本。

    演示一下现象

    创建一个在售产品表

    复制代码
    CREATE TABLE [dbo].[product](
        [id] [int] NOT NULL,--唯一主键
        [name] [nvarchar](50) NULL,--产品名称
        [status] [int] NULL ,--0未售出  1 售出  默认为0
        [username] [nvarchar](50) NULL--下单用户
     )
    复制代码

    添加一条记录

    insert into product(id,name,status,username) values(1,'小米手机',0,null)

    创建一个抢票程序

    复制代码
    public ContentResult PlaceOrder(string userName)
            {
                using (RuanMou2020Entities db = new RuanMou2020Entities())
                {
                        var product = db.product.Where<product>(p => p.status== 0).FirstOrDefault();
                        if (product.status == 1)
                        {
                            return Content("失败,产品已经被卖光");
                        }
                        else
                        {
                            //模拟数据库慢造成并发问题
                            Thread.Sleep(5000);
                            product.status = 1;
    product.username= userName;
                  db.SaveChanges();
                  return Content("成功购买");
                 }
          }
        }
    复制代码

    如果我们在5秒内一次访问以下两个地址,那么返回的结果都是成功购买且数据表中的username是lisi。

    /controller/PlaceOrder?username=zhangsan

    /controller/PlaceOrder?username=lisi

    这就是并发带来的问题。

    第一阶段,利用线程锁简单粗暴

    Web程序是多线程的,那我们把他在容易出现并发的地方加一把锁就可以了,如下图处理方式。

    复制代码
            private static object _lock = new object();
    
            public ContentResult PlaceOrder(string userName)
            {
                using (RuanMou2020Entities db = new RuanMou2020Entities())
                {
                    lock (_lock)
                    {
                        var product = db.product.Where<product>(p => p.status == 0).FirstOrDefault();
                        if (product.status == 1)
                        {
                            return Content("失败,产品已经被卖光");
                        }
                        else
                        {
                            //模拟数据库慢造成并发问题
                            Thread.Sleep(5000);
                            product.status = 1;
                            product.username = userName;
                            db.SaveChanges();
                            return Content("成功购买");
                        }
                    }
                }
            }
    复制代码

    这样每一个请求都是依次执行,不会出现并发问题了。

    优点:解决了并发的问题。

    缺点:效率太慢,用户体验性太差,不适合大数据量场景。

    第二阶段,拉消息队列,通过生产者,消费者的模式

    1,创建订单提交入口(生产者)

    复制代码
    public class HomeController : Controller
        {
    
            /// <summary>
            /// 接受订单提交(生产者)
            /// </summary>
            /// <returns></returns>
            public ContentResult PlaceOrderQueen(string userName)
            {
                //直接将请求写入到订单队列
                OrderConsumer.TicketOrders.Enqueue(userName);
                return Content("wait");
            }
    
            /// <summary>
            /// 查询订单结果
            /// </summary>
            /// <returns></returns>
            public ContentResult PlaceOrderQueenResult(string userName)
            {
                var rel = OrderConsumer.OrderResults.Where(p => p.userName == userName).FirstOrDefault();
                if (rel == null)
                {
                    return Content("还在排队中");
                }
                else
                {
                    return Content(rel.Result.ToString());
                }
            }
    }
    复制代码

    2,创建订单处理者(消费者)

    复制代码
    /// <summary>
        /// 订单的处理者(消费者)
        /// </summary>
        public class OrderConsumer
        {
            /// <summary>
            /// 订票的消息队列
            /// </summary>
            public static ConcurrentQueue<string> TicketOrders = new ConcurrentQueue<string>();
            /// <summary>
            /// 订单结果消息队列
            /// </summary>
            public static List<OrderResult> OrderResults = new List<OrderResult>();
            /// <summary>
            /// 订单处理
            /// </summary>
            public static void StartTicketTask()
            {
                string userName = null;
                while (true)
                {
                    //如果没有订单任务就休息1秒钟
                    if (!TicketOrders.TryDequeue(out userName))
                    {
                        Thread.Sleep(1000);
                        continue;
                    }
                    //执行真实的业务逻辑(如插入数据库)
                    bool rel = new TicketHelper().PlaceOrderDataBase(userName);
                    //将执行结果写入结果集合
                    OrderResults.Add(new OrderResult() { Result = rel, userName = userName });
                }
            }
        }
    复制代码

    3,创建订单业务的实际执行者

    复制代码
    /// <summary>
        /// 订单业务的实际处理者
        /// </summary>
        public class TicketHelper
        {
            /// <summary>
            /// 实际库存标识
            /// </summary>
            private bool hasStock = true;
            /// <summary>
            /// 执行一个订单到数据库
            /// </summary>
            /// <returns></returns>
            public bool PlaceOrderDataBase(string userName)
            {
                //如果没有了库存,则直接返回false,防止频繁读库
                if (!hasStock)
                {
                    return hasStock;
                }
                using (RuanMou2020Entities db = new RuanMou2020Entities())
                {
                    var product = db.product.Where(p => p.status == 0).FirstOrDefault();
                    if (product == null)
                    {
                        hasStock = false;
                        return false;
                    }
                    else
                    {
                        Thread.Sleep(10000);//模拟数据库的效率比较慢,执行插入时间比较久
                        product.status = 1;
                        product.username = userName;
                        db.SaveChanges();
                        return true;
                    }
                }
            }
        }
        /// <summary>
        /// 订单处理结果实体
        /// </summary>
        public class OrderResult
        {
            public string userName { get; set; }
            public bool Result { get; set; }
        }
    复制代码

    4,在程序启动前,启动消费者线程

    复制代码
    protected void Application_Start()
            {
                AreaRegistration.RegisterAllAreas();
                GlobalConfiguration.Configure(WebApiConfig.Register);
                FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters);
                RouteConfig.RegisterRoutes(RouteTable.Routes);
                BundleConfig.RegisterBundles(BundleTable.Bundles);
    
                //在Global的Application_Start事件里单独开启一个消费者线程
                Task.Run(OrderConsumer.StartTicketTask);
            }
    复制代码

    这样程序的运行模式是:用户提交的需求里都会添加到消息队列里去排队处理,程序会依次处理该队列里的内容(当然可以一次取出多条来进行处理,提高效率)。

    优点:比上一步快了。

    缺点:不够快,而且下单后需要轮询另外一个接口判断是否成功。

    第三阶段 反转生产者消费者的角色,把可售产品提前放到队列里,然后让提交的订单来消费队列里的内容

    1,创建生产者并且在程序启动前调用其初始化程序

    复制代码
    public class ProductForSaleManager
        {
            /// <summary>
            /// 待售商品队列
            /// </summary>
            public static ConcurrentQueue<int> ProductsForSale = new ConcurrentQueue<int>();
            /// <summary>
            /// 初始化待售商品队列
            /// </summary>
            public static void Init()
            {
                using (RuanMou2020Entities db = new RuanMou2020Entities())
                {
                    db.product.Where(p => p.status == 0).Select(p => p.id).ToList().ForEach(p =>
                    {
                        ProductsForSale.Enqueue(p);
                    });
                }
            }
        }
    
    复制代码
    复制代码
     public class MvcApplication : System.Web.HttpApplication
        {
            protected void Application_Start()
            {
                AreaRegistration.RegisterAllAreas();
                GlobalConfiguration.Configure(WebApiConfig.Register);
                FilterConfig.RegisterGlobalFilters(GlobalFilters.Filters);
                RouteConfig.RegisterRoutes(RouteTable.Routes);
                BundleConfig.RegisterBundles(BundleTable.Bundles);
    
                //程序启动前,先初始化待售产品消息队列
                ProductForSaleManager.Init();
            }
        }
    复制代码

    2,创建消费者

    复制代码
    public class OrderController : Controller
        {
            /// <summary>
            /// 下订单
            /// </summary>
            /// <param name="userName">订单提交者</param>
            /// <returns></returns>
            public async Task<ContentResult> PlaceOrder(string userName)
            {
                if (ProductForSaleManager.ProductsForSale.TryDequeue(out int pid))
                {
                    await new TicketHelper2().PlaceOrderDataBase(userName, pid);
                    return Content($"下单成功,对应产品id为:{pid}");
                }
                else
                {
                    await Task.CompletedTask;
                    return Content($"商品已经被抢光");
                }
            }
        }
    复制代码

    3,当然还需要一个业务的实际执行者

    复制代码
    /// <summary>
        /// 订单业务的实际处理者
        /// </summary>
        public class TicketHelper2
        {
            /// <summary>
            /// 执行复杂的订单操作(如数据库)
            /// </summary>
            /// <param name="userName">下单用户</param>
            /// <param name="pid">产品id</param>
            /// <returns></returns>
            public async Task PlaceOrderDataBase(string userName, int pid)
            {
                using (RuanMou2020Entities db = new RuanMou2020Entities())
                {
                    var product = db.product.Where(p => p.id == pid).FirstOrDefault();
                    if (product != null)
                    {
                        product.status = 1;
                        product.username = userName;
                        await db.SaveChangesAsync();
                    }
                }
            }
        }
    复制代码

    这样我们同时访问下面三个地址,如果数据库里只有两个商品的话,会有一个请求结果为:商品已经被抢光。

    http://localhost:88/Order/PlaceOrder?userName=zhangsan

    http://localhost:88/Order/PlaceOrder?userName=lisi

    http://localhost:88/Order/PlaceOrder?userName=wangwu

    这种处理方式的优点为:执行效率快,相比第二种方式不需要第二个接口来返回查询结果。

    缺点:暂时没想到,欢迎大家补充。

    说明:该方式只是个人猜想,并非实际项目经验,大家只能作为参考,慎重用于项目。欢迎大家批评指正。

    https://www.cnblogs.com/chenxizhaolu/p/12543376.html 转载自此处觉得有用的请支持正版

  • 相关阅读:
    JS高级
    函数作用域面试题
    11.14
    11.13
    Redux知识
    react-router-dom
    react 的三大属性
    vuex
    数组的扩展
    函数作用域和 class
  • 原文地址:https://www.cnblogs.com/Gpw012/p/14279074.html
Copyright © 2011-2022 走看看