zoukankan      html  css  js  c++  java
  • C#实现请求唯一性校验支持高并发

    使用场景描述:

      网络请求中经常会遇到发送的请求,服务端响应是成功的,但是返回的时候出现网络故障,导致客户端无法接收到请求结果,那么客户端程序可能判断为网络故障,而重复发送同一个请求。当然如果接口中定义了请求结果查询接口,那么这种重复会相对少一些。特别是交易类的数据,这种操作更是需要避免重复发送请求。另外一种情况是用户过于快速的点击界面按钮,产生连续的相同内容请求,那么后端也需要进行过滤,这种一般出现在系统对接上,无法去控制第三方系统的业务逻辑,需要从自身业务逻辑里面去限定。

    其他需求描述:

      这类请求一般存在时间范围和高并发的特点,就是短时间内会出现重复的请求,因此对模块需要支持高并发性。

    技术实现:

      对请求的业务内容进行MD5摘要,并且将MD5摘要存储到缓存中,每个请求数据都通过这个一个公共的调用的方法进行判断。

    代码实现:

      公共调用代码 UniqueCheck 采用单例模式创建唯一对象,便于在多线程调用的时候,只访问一个统一的缓存库

    /*
             * volatile就像大家更熟悉的const一样,volatile是一个类型修饰符(type specifier)。
             * 它是被设计用来修饰被不同线程访问和修改的变量。
             * 如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。
             */
            private static readonly object lockHelper = new object();
    
            private volatile static UniqueCheck _instance;        
    
            /// <summary>
            /// 获取单一实例
            /// </summary>
            /// <returns></returns>
            public static UniqueCheck GetInstance()
            {
                if (_instance == null)
                {
                    lock (lockHelper)
                    {
                        if (_instance == null)
                            _instance = new UniqueCheck();
                    }
                }
                return _instance;
            }
    

      这里需要注意volatile的修饰符,在实际测试过程中,如果没有此修饰符,在高并发的情况下会出现报错。

      自定义一个可以进行并发处理队列,代码如下:ConcurrentLinkedQueue

      1 using System;
      2 using System.Collections.Generic;
      3 using System.Text;
      4 using System.Threading;
      5 
      6 namespace PackgeUniqueCheck
      7 {
      8     /// <summary>
      9     /// 非加锁并发队列,处理100个并发数以内
     10     /// </summary>
     11     /// <typeparam name="T"></typeparam>
     12     public class ConcurrentLinkedQueue<T>
     13     {
     14         private class Node<K>
     15         {
     16             internal K Item;
     17             internal Node<K> Next;
     18 
     19             public Node(K item, Node<K> next)
     20             {
     21                 this.Item = item;
     22                 this.Next = next;
     23             }
     24         }
     25 
     26         private Node<T> _head;
     27         private Node<T> _tail;
     28 
     29         public ConcurrentLinkedQueue()
     30         {
     31             _head = new Node<T>(default(T), null);
     32             _tail = _head;
     33         }
     34 
     35         public bool IsEmpty
     36         {
     37             get { return (_head.Next == null); }
     38         }
     39         /// <summary>
     40         /// 进入队列
     41         /// </summary>
     42         /// <param name="item"></param>
     43         public void Enqueue(T item)
     44         {
     45             Node<T> newNode = new Node<T>(item, null);
     46             while (true)
     47             {
     48                 Node<T> curTail = _tail;
     49                 Node<T> residue = curTail.Next;
     50 
     51                 //判断_tail是否被其他process改变
     52                 if (curTail == _tail)
     53                 {
     54                     //A 有其他process执行C成功,_tail应该指向新的节点
     55                     if (residue == null)
     56                     {
     57                         //C 其他process改变了tail节点,需要重新取tail节点
     58                         if (Interlocked.CompareExchange<Node<T>>(
     59                           ref curTail.Next, newNode, residue) == residue)
     60                         {
     61                             //D 尝试修改tail
     62                             Interlocked.CompareExchange<Node<T>>(ref _tail, newNode, curTail);
     63                             return;
     64                         }
     65                     }
     66                     else
     67                     {
     68                         //B 帮助其他线程完成D操作
     69                         Interlocked.CompareExchange<Node<T>>(ref _tail, residue, curTail);
     70                     }
     71                 }
     72             }
     73         }
     74         /// <summary>
     75         /// 队列取数据
     76         /// </summary>
     77         /// <param name="result"></param>
     78         /// <returns></returns>
     79         public bool TryDequeue(out T result)
     80         {
     81             Node<T> curHead;
     82             Node<T> curTail;
     83             Node<T> next;
     84             while (true)
     85             {
     86                 curHead = _head;
     87                 curTail = _tail;
     88                 next = curHead.Next;
     89                 if (curHead == _head)
     90                 {
     91                     if (next == null) //Queue为空
     92                     {
     93                         result = default(T);
     94                         return false;
     95                     }
     96                     if (curHead == curTail) //Queue处于Enqueue第一个node的过程中
     97                     {
     98                         //尝试帮助其他Process完成操作
     99                         Interlocked.CompareExchange<Node<T>>(ref _tail, next, curTail);
    100                     }
    101                     else
    102                     {
    103                         //取next.Item必须放到CAS之前
    104                         result = next.Item;
    105                         //如果_head没有发生改变,则将_head指向next并退出
    106                         if (Interlocked.CompareExchange<Node<T>>(ref _head,
    107                           next, curHead) == curHead)
    108                             break;
    109                     }
    110                 }
    111             }
    112             return true;
    113         }
    114         /// <summary>
    115         /// 尝试获取最后一个对象
    116         /// </summary>
    117         /// <param name="result"></param>
    118         /// <returns></returns>
    119         public bool TryGetTail(out T result)
    120         {
    121             result = default(T);
    122             if (_tail == null)
    123             {
    124                 return false;
    125             }
    126             result = _tail.Item;
    127             return true;
    128         }
    129     }
    130 }

    虽然是一个非常简单的唯一性校验逻辑,但是要做到高效率,高并发支持,高可靠性,以及低内存占用,需要实现这样的需求,需要做细致的模拟测试。

      1 using System;
      2 using System.Collections.Generic;
      3 using System.Text;
      4 using System.Threading;
      5 using System.Collections;
      6 
      7 namespace PackgeUniqueCheck
      8 {
      9     public class UniqueCheck
     10     {
     11         /*
     12          * volatile就像大家更熟悉的const一样,volatile是一个类型修饰符(type specifier)。
     13          * 它是被设计用来修饰被不同线程访问和修改的变量。
     14          * 如果没有volatile,基本上会导致这样的结果:要么无法编写多线程程序,要么编译器失去大量优化的机会。
     15          */
     16         private static readonly object lockHelper = new object();
     17 
     18         private volatile static UniqueCheck _instance;        
     19 
     20         /// <summary>
     21         /// 获取单一实例
     22         /// </summary>
     23         /// <returns></returns>
     24         public static UniqueCheck GetInstance()
     25         {
     26             if (_instance == null)
     27             {
     28                 lock (lockHelper)
     29                 {
     30                     if (_instance == null)
     31                         _instance = new UniqueCheck();
     32                 }
     33             }
     34             return _instance;
     35         }
     36 
     37         private UniqueCheck()
     38         {
     39             //创建一个线程安全的哈希表,作为字典缓存
     40             _DataKey = Hashtable.Synchronized(new Hashtable());
     41             Queue myqueue = new Queue();
     42             _DataQueue = Queue.Synchronized(myqueue);
     43             _Myqueue = new ConcurrentLinkedQueue<string>();
     44             _Timer = new Thread(DoTicket);
     45             _Timer.Start();
     46         }
     47 
     48         #region 公共属性设置
     49         /// <summary>
     50         /// 设定定时线程的休眠时间长度:默认为1分钟
     51         /// 时间范围:1-7200000,值为1毫秒到2小时
     52         /// </summary>
     53         /// <param name="value"></param>
     54         public void SetTimeSpan(int value)
     55         {
     56             if (value > 0&& value <=7200000)
     57             {
     58                 _TimeSpan = value;
     59             }
     60         }
     61         /// <summary>
     62         /// 设定缓存Cache中的最大记录条数
     63         /// 值范围:1-5000000,1到500万
     64         /// </summary>
     65         /// <param name="value"></param>
     66         public void SetCacheMaxNum(int value)
     67         {
     68             if (value > 0 && value <= 5000000)
     69             {
     70                 _CacheMaxNum = value;
     71             }
     72         }
     73         /// <summary>
     74         /// 设置是否在控制台中显示日志
     75         /// </summary>
     76         /// <param name="value"></param>
     77         public void SetIsShowMsg(bool value)
     78         {
     79             Helper.IsShowMsg = value;
     80         }
     81         /// <summary>
     82         /// 线程请求阻塞增量
     83         /// 值范围:1-CacheMaxNum,建议设置为缓存最大值的10%-20%
     84         /// </summary>
     85         /// <param name="value"></param>
     86         public void SetBlockNumExt(int value)
     87         {
     88             if (value > 0 && value <= _CacheMaxNum)
     89             {
     90                 _BlockNumExt = value;
     91             }
     92         }
     93         /// <summary>
     94         /// 请求阻塞时间
     95         /// 值范围:1-max,根据阻塞增量设置请求阻塞时间
     96         /// 阻塞时间越长,阻塞增量可以设置越大,但是请求实时响应就越差
     97         /// </summary>
     98         /// <param name="value"></param>
     99         public void SetBlockSpanTime(int value)
    100         {
    101             if (value > 0)
    102             {
    103                 _BlockSpanTime = value;
    104             }
    105         }
    106         #endregion
    107 
    108         #region 私有变量
    109         /// <summary>
    110         /// 内部运行线程
    111         /// </summary>
    112         private Thread _runner = null;
    113         /// <summary>
    114         /// 可处理高并发的队列
    115         /// </summary>
    116         private ConcurrentLinkedQueue<string> _Myqueue = null;
    117         /// <summary>
    118         /// 唯一内容的时间健值对
    119         /// </summary>
    120         private Hashtable _DataKey = null;
    121         /// <summary>
    122         /// 内容时间队列
    123         /// </summary>
    124         private Queue _DataQueue = null;
    125         /// <summary>
    126         /// 定时线程的休眠时间长度:默认为1分钟
    127         /// </summary>
    128         private int _TimeSpan = 3000;
    129         /// <summary>
    130         /// 定时计时器线程
    131         /// </summary>
    132         private Thread _Timer = null;
    133         /// <summary>
    134         /// 缓存Cache中的最大记录条数
    135         /// </summary>
    136         private int _CacheMaxNum = 500000;
    137         /// <summary>
    138         /// 线程请求阻塞增量
    139         /// </summary>
    140         private int _BlockNumExt = 10000;
    141         /// <summary>
    142         /// 请求阻塞时间
    143         /// </summary>
    144         private int _BlockSpanTime = 100;
    145         #endregion
    146 
    147         #region 私有方法
    148         private void StartRun()
    149         {
    150             _runner = new Thread(DoAction);
    151             _runner.Start();
    152             Helper.ShowMsg("内部线程启动成功!");
    153         }
    154 
    155         private string GetItem()
    156         {
    157             string tp = string.Empty;
    158             bool result = _Myqueue.TryDequeue(out tp);
    159             return tp;
    160         }
    161         /// <summary>
    162         /// 执行循环操作
    163         /// </summary>
    164         private void DoAction()
    165         {
    166             while (true)
    167             {
    168                 while (!_Myqueue.IsEmpty)
    169                 {
    170                     string item = GetItem();
    171                     _DataQueue.Enqueue(item);
    172                     if (!_DataKey.ContainsKey(item))
    173                     {
    174                         _DataKey.Add(item, DateTime.Now);
    175                     }
    176                 }
    177                 //Helper.ShowMsg("当前数组已经为空,处理线程进入休眠状态...");
    178                 Thread.Sleep(2);
    179             }
    180         }
    181         /// <summary>
    182         /// 执行定时器的动作
    183         /// </summary>
    184         private void DoTicket()
    185         {
    186             while (true)
    187             {
    188                 Helper.ShowMsg("当前数据队列个数:" + _DataQueue.Count.ToString());
    189                 if (_DataQueue.Count > _CacheMaxNum)
    190                 {
    191                     while (true)
    192                     {
    193                         Helper.ShowMsg(string.Format("当前队列数:{0},已经超出最大长度:{1},开始进行清理操作...", _DataQueue.Count, _CacheMaxNum.ToString()));
    194                         string item = _DataQueue.Dequeue().ToString();
    195                         if (!string.IsNullOrEmpty(item))
    196                         {
    197                             if (_DataKey.ContainsKey(item))
    198                             {
    199                                 _DataKey.Remove(item);
    200                             }
    201                             if (_DataQueue.Count <= _CacheMaxNum)
    202                             {
    203                                 Helper.ShowMsg("清理完成,开始休眠清理线程...");
    204                                 break;
    205                             }
    206                         }
    207                     }
    208                 }
    209                 Thread.Sleep(_TimeSpan);
    210             }
    211         }
    212 
    213         /// <summary>
    214         /// 线程进行睡眠等待
    215         /// 如果当前负载压力大大超出了线程的处理能力
    216         /// 那么需要进行延时调用
    217         /// </summary>
    218         private void BlockThread()
    219         {
    220             if (_DataQueue.Count > _CacheMaxNum + _BlockNumExt)
    221             {
    222                 Thread.Sleep(_BlockSpanTime);
    223             }
    224         }
    225         #endregion
    226 
    227         #region 公共方法
    228         /// <summary>
    229         /// 开启服务线程
    230         /// </summary>
    231         public void Start()
    232         {
    233             if (_runner == null)
    234             {
    235                 StartRun();
    236             }
    237             else
    238             {
    239                 if (_runner.IsAlive == false)
    240                 {
    241                     StartRun();
    242                 }
    243             }
    244 
    245         }
    246         /// <summary>
    247         /// 关闭服务线程
    248         /// </summary>
    249         public void Stop()
    250         {
    251             if (_runner != null)
    252             {
    253                 _runner.Abort();
    254                 _runner = null;
    255             }
    256         }
    257 
    258         /// <summary>
    259         /// 添加内容信息
    260         /// </summary>
    261         /// <param name="item">内容信息</param>
    262         /// <returns>true:缓存中不包含此值,队列添加成功,false:缓存中包含此值,队列添加失败</returns>
    263         public bool AddItem(string item)
    264         {
    265             BlockThread();
    266             item = Helper.MakeMd5(item);
    267             if (_DataKey.ContainsKey(item))
    268             {
    269                 return false;
    270             }
    271             else
    272             {
    273                 _Myqueue.Enqueue(item);
    274                 return true;
    275             }
    276         }
    277         /// <summary>
    278         /// 判断内容信息是否已经存在
    279         /// </summary>
    280         /// <param name="item">内容信息</param>
    281         /// <returns>true:信息已经存在于缓存中,false:信息不存在于缓存中</returns>
    282         public bool CheckItem(string item)
    283         {
    284             item = Helper.MakeMd5(item);
    285             return _DataKey.ContainsKey(item);
    286         }
    287         #endregion    
    288 
    289     }
    290 }

    模拟测试代码:

    private static string _example = Guid.NewGuid().ToString();
    
            private static UniqueCheck _uck = null;
    
            static void Main(string[] args)
            {
                _uck = UniqueCheck.GetInstance();
                _uck.Start();
                _uck.SetIsShowMsg(false);
                _uck.SetCacheMaxNum(20000000);
                _uck.SetBlockNumExt(1000000);
                _uck.SetTimeSpan(6000);
    
                _uck.AddItem(_example);
                Thread[] threads = new Thread[20];
    
                for (int i = 0; i < 20; i++)
                {
                    threads[i] = new Thread(AddInfo);
                    threads[i].Start();
                }
    
                Thread checkthread = new Thread(CheckInfo);
                checkthread.Start();
    
                string value = Console.ReadLine();
    
                checkthread.Abort();
                for (int i = 0; i < 50; i++)
                {
                    threads[i].Abort();
                }
                _uck.Stop();
            }
    
            static void AddInfo()
            {
                while (true)
                {
                    _uck.AddItem(Guid.NewGuid().ToString());
                }
            }
    
            static void CheckInfo()
            {
                while (true)
                {
                    Console.WriteLine("开始时间:{0}...", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff"));
                    Console.WriteLine("插入结果:{0}", _uck.AddItem(_example));
                    Console.WriteLine("结束时间:{0}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.ffff"));
              //调整进程休眠时间,可以测试高并发的情况 //Thread.Sleep(
    1000); } }

    测试截图:

  • 相关阅读:
    欧拉定理 (证明+在求逆元上的应用)
    【转】弱校的ACM奋斗史
    SDUT 2412 (单调队列 + dp)
    做SRM感想。。。
    平面图中最小割向最短路的转化
    HDU 4533
    黑书上的DP 30题
    POJ【数论/组合/博弈论】题目列表
    HDU 4534 郑厂长系列故事——新闻净化
    SRM 571
  • 原文地址:https://www.cnblogs.com/huangyoulong/p/11611145.html
Copyright © 2011-2022 走看看