zoukankan      html  css  js  c++  java
  • [超简洁]EasyQ框架-应对WEB高并发业务(秒杀、抽奖)等业务

    背景介绍

        这几年一直在摸索一种框架,足够简单,又能应付很多高并发高性能的需求。研究过一些框架思想如DDD DCI,也实践过CQRS框架。

    但是总觉得复杂度高,门槛也高,自己学都吃力,如果团队新人更难接受。所以自从写了最简单的BaseContext类之后很长一段时间内都没有加任何代码。(basecontext只有10行内代码)

    之前有个秒杀业务要做,用了MVC的异步Action队列处理请求,感觉还是蛮不错,所以跟另外一位同事一同把这个功能整合进这个baseContext里面,既没有用第三方的Queue(如 RabbitMQ )也没有另外开一个宿主进程Exe。

    总之“simple is good!”

    EasyQ

    EasyQ是一个轻量级的专门用来处理高并发HTTP请求的框架。
    应用MVC异步Action机制实现了相同业务单线程排队处理,没有用任何读写锁。
    可以指定某一种业务创建一条队列,也可以指定某一种业务+某一种数据ID作为一条队列。
    如秒杀商品业务,可以指定所有秒杀业务都使用单线程排队处理,避免脏读 脏写。
    但是这样做的话,所有秒杀商品都会进入排队,显然是不科学的。
    所以扩展一种方式是: 秒杀业务+商品ID 作为队列名。
    当然不止商品ID,也可以是用户ID,商品分类等任意字符串作为队列名的后缀。
      GITHUB地址:https://github.com/BTteam/EasyQ

    如能占用您一点时间,提出一点改进的意见,不胜感激!


    使用说明

    HomeController 是入口页面,需要继承AsyncController,使用MVC的异步Action
    BT.Contexts项目放置业务代码,所有Context需要继承抽象类QueueBaseContext,并且实现3个方法
    1,InitData 初始化数据,数据库获取数据的方法应该写在此处
    2,Interact 交互操作,数据模型之间的交互,业务代码的各种计算、判断等
    3,Persist 持久化操作,数据保存到数据库的操作应当写在此处。
    这3个方法的默认执行步骤非常简单 1=》2=》3
      这个类是封装了队列、线程的操作,是EasyQ的核心类。
    在HomeController使用Context时,首先应该分开2个Action 如 TestAsync TestCompleted。这是MVC异步Action的机制决定
    TestAsync用来启动异步,TestCompleted是异步完成后的回调操作。这2个方法必须成对出现。具体原理请参考MSDN

    调用是URL为:{host}/home/text 注意Async后缀在路由时会被去掉。
    SetAsync方法必须传入AsyncManager对象,key是可选参数,如上所述是用来细分队列的。
    如果想根据商品ID生成队列,不同商品的秒杀行为在不同的队列中排队,就在此处用SetAsync传入key是商品ID

    public void TestAsync(string key)
    {
    //GET DATA
    TestContext context = new TestContext(1);
    context.SetAsync(AsyncManager, key);//参数为产品队列标识
    context.Execute();
    }

    再看回调方法

    public ActionResult TestCompleted()
    {
    var result = AsyncManager.Parameters["response"];
    
    return Content(JsonConvert.SerializeObject( result));
    }

     所有Context执行后的结果以Parameters["response"]返回

    核心解析

    QueueBaseContext类

    public abstract class  QueueBaseContext:BaseContext
        {
           private ILog log = LogManager.GetLogger(typeof(QueueBaseContext));
           private static ConcurrentDictionary<string, ConcurrentQueue<AsyncManager>> killQueues = new ConcurrentDictionary<string, ConcurrentQueue<AsyncManager>>();
            private static ConcurrentDictionary<string, Task> taskDic = new ConcurrentDictionary<string, Task>();
           //场景使用步骤  编写好 1.Interact() 2.Persist() 3.在api调用初始场景后,调用QueueContextAsync()
            private AsyncManager AsyncManager;
            private string quenekey;
    
           public void SetAsync(AsyncManager _AsyncManager)
           {
               SetAsync(_AsyncManager, "");
           }
           public void SetAsync(AsyncManager _AsyncManager, string _quenekey)
           {
               quenekey = _quenekey;
               this.AsyncManager = _AsyncManager;
           }
    
           public abstract void InitData();
           public override string Execute()
           {
               if (AsyncManager == null)
               {
                   throw new Exception("必须调用SetAsync 设置AsyncManager对象");
    
               }
             var runtimeType=  this.GetType();
               var qKey = runtimeType.FullName + quenekey;
              // typeof().
               AsyncManager.OutstandingOperations.Increment();
               //开一个队列 判断是否有队列 
               if (killQueues.ContainsKey(qKey) == false)
               {
                   killQueues.TryAdd(qKey, new ConcurrentQueue<AsyncManager>(new[] {AsyncManager}));
               }
               else
               {
                   killQueues[qKey].Enqueue(AsyncManager);
    
               }
               Action ac = () =>
               {
                  
                   while (killQueues[qKey].IsEmpty == false)
                   {
                     //  Thread.Sleep(15000);
                       log.DebugFormat("while 进来了  killQueueitemCount length:{0} ,Q num{1}", killQueues[qKey].Count, killQueues.Count);
                       AsyncManager item;
                       killQueues[qKey].TryDequeue(out item);//取出队列的一个进行处理
                       try
                       {
    
                           InitData();
                           if (Interact())//对应业务逻辑
                               Persist();
    
                           AsyncManager.Parameters["response"] = new { Code = this.StatusCode};
                           AsyncManager.OutstandingOperations.Decrement();
                       }
                       catch (Exception e)
                       {
                           log.ErrorFormat("出错,e msg:{0} ,trace:{1}", e.Message, e.StackTrace);
                           AsyncManager.Parameters["response"] = new { Code = ResponseCode.DataError, Description = "服务器错误,请重试" };
                           AsyncManager.OutstandingOperations.Decrement();
                       }
                   }
                   //remove q
               };
               if (taskDic.ContainsKey(qKey) == false)
               {
                   taskDic.TryAdd(qKey, Task.Factory.StartNew(ac));
               }
               if (taskDic[qKey].IsCompleted || taskDic[qKey].IsFaulted)
               {
                   taskDic[qKey] = Task.Factory.StartNew(ac);
               }
               return "";
           }
         
    
        }

    构建了2个字典 

    killQueues :队列名作为KEY 队列实例作为Value
    taskDic:队列名作为KEY 队列指定的执行Task作为Value

    处理逻辑是创建一个Task 循环队列每次取出一个项,执行业务操作。直到队列为空。
    如果在上一个Task运行过程中有新的请求加入,则不需要新建Task,只需要继续加入队列尾部。上一个Task会执行对应的业务操作。
    队列中的每一个元素代表一次业务操作,操作完毕之后会调用
    AsyncManager.OutstandingOperations.Decrement();
    用来返回异步结果给请求线程。这样HTTP请求结果就返回给用户了。不需要等到队列完毕。
    qKey可以设置任意字符串,用来细分队列名称。

    队列最好用
    ConcurrentQueue
    因为在入列出列操作时处于多线程共享队列,必须要用线程安全的队列类。


    存在缺陷

    1 目前是单点设计,只能在单机上运行,还在研究横向扩展。
    2 性能还需要优化
    3 由于使用异步Action 导致每个Action必须一分为二。

  • 相关阅读:
    飞入飞出效果
    【JSOI 2008】星球大战 Starwar
    POJ 1094 Sorting It All Out
    POJ 2728 Desert King
    【ZJOI 2008】树的统计 Count
    【SCOI 2009】生日快乐
    POJ 3580 SuperMemo
    POJ 1639 Picnic Planning
    POJ 2976 Dropping Tests
    SPOJ QTREE
  • 原文地址:https://www.cnblogs.com/7rhythm/p/5391953.html
Copyright © 2011-2022 走看看