zoukankan      html  css  js  c++  java
  • 实现延迟消息队列

    • 交流

      个人博客交流群:580749909 , 顺便推广一下自己和伙伴一起建立wpf交流群:130108655。

    • 简要

     因为在偶然的一次机会下,公司让我着手开发一个数据分发端基于socket通讯的一个中间件。主要用来解决向客户端分发数据的问题,后来多了一个需求就是未付费的用户拿到的数据是有延迟的。

    而付费用户则是正常的。这个时候在网上搜了很久没有找到合适的解决方案,其实能解决这个问题的方案有很多比如说用到一些大厂贡献的xxMQ中间件之类的,确实能解决问题。但是目前项目比较小

    根本用不上这么重的框架,然后又搜索了半天没有暂时没有发现有人用c#来实现,所以才动手写了这个方案。

    附上github源码地址

    • 思路

    这个方案是借鉴了另一位博主的开发思路,受到这位博主的启发然后根据自己的理解写了这个方案。附上该博主的链接地址:  1分钟实现“延迟消息”功能

    在此我就不多赘述里面的内容了。

    • 代码

    首先写一个方案要理清楚自己的项目结构,我做了如下分层。

    • Interfaces , 这层里主要约束延迟消息队列的队列和消息任务行。
     1   public interface IRingQueue<T>
     2     {
     3         /// <summary>
     4         /// Add tasks [add tasks will automatically generate: task Id, task slot location, number of task cycles]
     5         /// </summary>
     6         /// <param name="delayTime">The specified task is executed after N seconds.</param>
     7         /// <param name="action">Definitions of callback</param>
     8         void Add(long delayTime,Action<T> action);
     9 
    10         /// <summary>
    11         /// Add tasks [add tasks will automatically generate: task Id, task slot location, number of task cycles]
    12         /// </summary>
    13         /// <param name="delayTime">The specified task is executed after N seconds.</param>
    14         /// <param name="action">Definitions of callback.</param>
    15         /// <param name="data">Parameters used in the callback function.</param>
    16         void Add(long delayTime, Action<T> action, T data);
    17 
    18         /// <summary>
    19         /// Add tasks [add tasks will automatically generate: task Id, task slot location, number of task cycles]
    20         /// </summary>
    21         /// <param name="delayTime"></param>
    22         /// <param name="action">Definitions of callback</param>
    23         /// <param name="data">Parameters used in the callback function.</param>
    24         /// <param name="id">Task ID, used when deleting tasks.</param>
    25         void Add(long delayTime, Action<T> action, T data, long id);
    26 
    27         /// <summary>
    28         /// Remove tasks [need to know: where the task is, which specific task].
    29         /// </summary>
    30         /// <param name="index">Task slot location</param>
    31         /// <param name="id">Task ID, used when deleting tasks.</param>
    32         void Remove(long id);
    33 
    34         /// <summary>
    35         /// Launch queue.
    36         /// </summary>
    37         void Start();
    38     }
    1 public interface ITask
    2     {
    3     }
    • Achieves,这层里实现之前定义的接口,这里写成抽象类是为了后面方便扩展。
      1    public abstract class BaseQueue<T> : IRingQueue<T>
      2     {
      3         private long _pointer = 0L;
      4         private ConcurrentBag<BaseTask<T>>[] _arraySlot;
      5         private int ArrayMax;
      6 
      7         /// <summary>
      8         /// Ring queue.
      9         /// </summary>
     10         public ConcurrentBag<BaseTask<T>>[] ArraySlot
     11         {
     12             get { return _arraySlot ?? (_arraySlot = new ConcurrentBag<BaseTask<T>>[ArrayMax]); }
     13         }
     14         
     15         public BaseQueue(int arrayMax)
     16         {
     17             if (arrayMax < 60 && arrayMax % 60 == 0)
     18                 throw new Exception("Ring queue length cannot be less than 60 and is a multiple of 60 .");
     19 
     20             ArrayMax = arrayMax;
     21         }
     22 
     23         public void Add(long delayTime, Action<T> action)
     24         {
     25             Add(delayTime, action, default(T));
     26         }
     27 
     28         public void Add(long delayTime,Action<T> action,T data)
     29         {
     30             Add(delayTime, action, data,0);
     31         }
     32 
     33         public void Add(long delayTime, Action<T> action, T data,long id)
     34         {
     35             NextSlot(delayTime, out long cycle, out long pointer);
     36             ArraySlot[pointer] =  ArraySlot[pointer] ?? (ArraySlot[pointer] = new ConcurrentBag<BaseTask<T>>());
     37             var baseTask = new BaseTask<T>(cycle, action, data,id);
     38             ArraySlot[pointer].Add(baseTask);
     39         }
     40 
     41         /// <summary>
     42         /// Remove tasks based on ID.
     43         /// </summary>
     44         /// <param name="id"></param>
     45         public void Remove(long id)
     46         {
     47             try
     48             {
     49                 Parallel.ForEach(ArraySlot, (ConcurrentBag<BaseTask<T>> collection, ParallelLoopState state) =>
     50                 {
     51                     var resulTask = collection.FirstOrDefault(p => p.Id == id);
     52                     if (resulTask != null)
     53                     {
     54                         collection.TryTake(out resulTask);
     55                         state.Break();
     56                     }
     57                 });
     58             }
     59             catch (Exception e)
     60             {
     61                 Console.WriteLine(e);
     62             }
     63         }
     64         
     65         public void Start()
     66         {
     67             while (true)
     68             {
     69                 RightMovePointer();
     70                 Thread.Sleep(1000);
     71                 Console.WriteLine(DateTime.Now.ToString());
     72             }
     73         }
     74 
     75         /// <summary>
     76         /// Calculate the information of the next slot.
     77         /// </summary>
     78         /// <param name="delayTime">Delayed execution time.</param>
     79         /// <param name="cycle">Number of turns.</param>
     80         /// <param name="index">Task location.</param>
     81         private void NextSlot(long delayTime, out long cycle,out long index)
     82         {
     83             try
     84             {
     85                 var circle = delayTime / ArrayMax;
     86                 var second = delayTime % ArrayMax;
     87                 var current_pointer = GetPointer();
     88                 var queue_index = 0L;
     89 
     90                 if (delayTime - ArrayMax > ArrayMax)
     91                 {
     92                     circle = 1;
     93                 }
     94                 else if (second > ArrayMax)
     95                 {
     96                     circle += 1;
     97                 }
     98 
     99                 if (delayTime - circle * ArrayMax < ArrayMax)
    100                 {
    101                     second = delayTime - circle * ArrayMax;
    102                 }
    103 
    104                 if (current_pointer + delayTime >= ArrayMax)
    105                 {
    106                     cycle = (int)((current_pointer + delayTime) / ArrayMax);
    107                     if (current_pointer + second - ArrayMax < 0)
    108                     {
    109                         queue_index = current_pointer + second;
    110                     }
    111                     else if (current_pointer + second - ArrayMax > 0)
    112                     {
    113                         queue_index = current_pointer + second - ArrayMax;
    114                     }
    115                 }
    116                 else
    117                 {
    118                     cycle = 0;
    119                     queue_index = current_pointer + second;
    120                 }
    121                 index = queue_index;
    122             }
    123             catch (Exception e)
    124             {
    125                 Console.WriteLine(e);
    126                 throw;
    127             }
    128         }
    129 
    130         /// <summary>
    131         /// Get the current location of the pointer.
    132         /// </summary>
    133         /// <returns></returns>
    134         private long GetPointer()
    135         {
    136             return Interlocked.Read(ref _pointer);
    137         }
    138 
    139         /// <summary>
    140         /// Reset pointer position.
    141         /// </summary>
    142         private void ReSetPointer()
    143         {
    144             Interlocked.Exchange(ref _pointer, 0);
    145         }
    146 
    147         /// <summary>
    148         /// Pointer moves clockwise.
    149         /// </summary>
    150         private void RightMovePointer()
    151         {
    152             try
    153             {
    154                 if (GetPointer() >= ArrayMax - 1)
    155                 {
    156                     ReSetPointer();
    157                 }
    158                 else
    159                 {
    160                     Interlocked.Increment(ref _pointer);
    161                 }
    162 
    163                 var pointer = GetPointer();
    164                 var taskCollection = ArraySlot[pointer];
    165                 if (taskCollection == null || taskCollection.Count == 0) return;
    166 
    167                 Parallel.ForEach(taskCollection, (BaseTask<T> task) =>
    168                 {
    169                     if (task.Cycle > 0)
    170                     {
    171                         task.SubCycleNumber();
    172                     }
    173 
    174                     if (task.Cycle <= 0)
    175                     {
    176                         taskCollection.TryTake(out task);
    177                         task.TaskAction(task.Data);
    178                     }
    179                 });
    180             }
    181             catch (Exception e)
    182             {
    183                 Console.WriteLine(e);
    184                 throw;
    185             }
    186         }
    187     }
    BaseQueue
     1   public class BaseTask<T> : ITask
     2     {
     3         private long _cycle;
     4         private long _id;
     5         private T _data;
     6 
     7         public Action<T> TaskAction { get; set; }
     8 
     9         public long Cycle
    10         {
    11             get { return Interlocked.Read(ref _cycle); }
    12             set { Interlocked.Exchange(ref _cycle, value); }
    13         }
    14 
    15         public long Id
    16         {
    17             get { return _id; }
    18             set { _id = value; }
    19         }
    20 
    21         public T Data
    22         {
    23             get { return _data; }
    24             set { _data = value; }
    25         }
    26 
    27         public BaseTask(long cycle, Action<T> action, T data,long id)
    28         {
    29             Cycle = cycle;
    30             TaskAction = action;
    31             Data = data;
    32             Id = id;
    33         }
    34 
    35         public BaseTask(long cycle, Action<T> action,T data)
    36         {
    37             Cycle = cycle;
    38             TaskAction = action;
    39             Data = data;
    40         }
    41 
    42         public BaseTask(long cycle, Action<T> action)
    43         {
    44             Cycle = cycle;
    45             TaskAction = action;
    46         }
    47         
    48         public void SubCycleNumber()
    49         {
    50             Interlocked.Decrement(ref _cycle);
    51         }
    52     }
    BaseTask
    • Logic,这层主要实现调用逻辑,调用者最终只需要关心把任务放进队列并指定什么时候执行就行了,根本不需要关心其它的任何信息。
     1  public static void Start()
     2         {
     3             //1.Initialize queues of different granularity.
     4             IRingQueue<NewsModel> minuteRingQueue = new MinuteQueue<NewsModel>();
     5 
     6             //2.Open thread.
     7             var lstTasks = new List<Task>
     8             {
     9                 Task.Factory.StartNew(minuteRingQueue.Start)
    10             };
    11 
    12             //3.Add tasks performed in different periods.
    13             minuteRingQueue.Add(5, new Action<NewsModel>((NewsModel newsObj) =>
    14             {
    15                 Console.WriteLine(newsObj.News);
    16             }), new NewsModel() { News = "Trump's visit to China!" });
    17 
    18             minuteRingQueue.Add(10, new Action<NewsModel>((NewsModel newsObj) =>
    19             {
    20                 Console.WriteLine(newsObj.News);
    21             }), new NewsModel() { News = "Putin Pu's visit to China!" });
    22 
    23             minuteRingQueue.Add(60, new Action<NewsModel>((NewsModel newsObj) =>
    24             {
    25                 Console.WriteLine(newsObj.News);
    26             }), new NewsModel() { News = "Eisenhower's visit to China!" });
    27 
    28             minuteRingQueue.Add(120, new Action<NewsModel>((NewsModel newsObj) =>
    29             {
    30                 Console.WriteLine(newsObj.News);
    31             }), new NewsModel() { News = "Xi Jinping's visit to the US!" });
    32 
    33             //3.Waiting for all tasks to complete is usually not completed. Because there is an infinite loop.
    34             //F5 Run the program and see the effect.
    35             Task.WaitAll(lstTasks.ToArray());
    36             Console.Read();
    37         }
    • Models,这层就是用来在延迟任务中带入的数据模型类而已了。自己用的时候换成任意自定义类型都可以。
    • 截图

  • 相关阅读:
    Recommended Books for Algo Trading in 2020
    Market Making is simpler than you think!
    Top Crypto Market Makers of 2020
    Top Crypto Market Makers, Rated and Reviewed
    爬取伯乐在线文章(五)itemloader
    爬取伯乐在线文章(四)将爬取结果保存到MySQL
    爬取伯乐在线文章(三)爬取所有页面的文章
    爬取伯乐在线文章(二)通过xpath提取源文件中需要的内容
    爬取伯乐在线文章(一)
    爬虫去重策略
  • 原文地址:https://www.cnblogs.com/justzhuzhu/p/9692857.html
Copyright © 2011-2022 走看看