zoukankan      html  css  js  c++  java
  • C#无锁内存队列

      1 public delegate void Consumer(IList<object> monitors);
      2 
      3     public enum ConsumeErrorAction
      4     {
      5         AbandonAndLogException
      6     }
      7     public enum NotReachBatchCountConsumeAction
      8     {
      9         ConsumeAllItems
     10     }
     11     public enum ReachMaxItemCountAction
     12     {
     13         AbandonOldItems
     14     }
     15     public class MyQueueConfig
     16     {
     17         private string queueName;
     18         public Consumer Consumer { get; private set; }
     19         public MyQueueConfig(string name, Consumer c)
     20         {
     21             queueName = name;
     22             Consumer = c;
     23         }
     24 
     25         public int MaxItemCount { get; set; }
     26         public ConsumeErrorAction ConsumeErrorAction { get; set; }
     27         public int ConsumeIntervalMilliseconds { get; set; }
     28         public int ConsumeItemCountInOneBatch { get; set; }
     29         public int ConsumeThreadCount { get; set; }
     30         public NotReachBatchCountConsumeAction NotReachBatchCountConsumeAction { get; set; }
     31         public ReachMaxItemCountAction ReachMaxItemCountAction { get; set; }
     32     }
     33     public class MyQueue
     34     {
     35         private MyQueueConfig c;
     36         private Queue<object> queue;
     37         private Dictionary<int, Queue<List<object>>> queueData;
     38         private Dictionary<int, Thread> threads;
     39         public MyQueue()
     40         {
     41             queue = new Queue<object>();
     42             queueData = new Dictionary<int, Queue<List<object>>>();
     43             threads = new Dictionary<int, Thread>();
     44         }
     45 
     46         public void Init(MyQueueConfig config)
     47         {
     48             c = config;
     49             for (int i = 0; i < c.ConsumeThreadCount; i++)
     50             {
     51                 Thread thread = new Thread(consumer);
     52                 thread.Start(i);
     53                 threads.Add(i, thread);
     54                 queueData.Add(i, new Queue<List<object>>());
     55             }
     56             Thread threadBack = new Thread(consumer);
     57             threadBack.Start(c.ConsumeThreadCount);
     58             queueData.Add(c.ConsumeThreadCount, new Queue<List<object>>());
     59 
     60             new Thread(sendConsumer).Start();
     61         }
     62 
     63         private void sendConsumer()
     64         {
     65             try
     66             {
     67                 while (true)
     68                 {
     69                     List<object> forConsumer = new List<object>();
     70                     int queueCount = queue.Count;
     71                     //队列空时
     72                     if (queueCount == 0)
     73                     {
     74                         Thread.Sleep(1000);
     75                         continue;
     76                     }
     77                     //队列过大时
     78                     if (queueCount > c.MaxItemCount - 1000)
     79                     {
     80                         queue.Clear();
     81                         continue;
     82                     }
     83                     int consumerCount = c.ConsumeItemCountInOneBatch;
     84                     //队列不满每次消费数量时
     85                     if (queueCount < consumerCount)
     86                     {
     87                         consumerCount = queueCount;
     88                     }
     89                     for (int i = 0; i < consumerCount; i++)
     90                     {
     91                         forConsumer.Add(queue.Dequeue());
     92                     }
     93 
     94                     List<object[]> batchs = forConsumer.BatchesOf(consumerCount / c.ConsumeThreadCount).ToList();
     95                     if (batchs.Count < c.ConsumeThreadCount)
     96                     {
     97                         batchs.ForEach(t =>
     98                         {
     99                             queueData[0].Enqueue(t.ToList());
    100                         });
    101                     }
    102                     else
    103                     {
    104                         for (int i = 0; i < batchs.Count; i++)
    105                         {
    106                             queueData[i].Enqueue(batchs[i].ToList());
    107                         }
    108                     }
    109                     //获取大队列数据
    110                     //分发到线程数量的小队列中
    111                     Thread.Sleep(c.ConsumeIntervalMilliseconds);
    112                 }
    113             }
    114             catch (Exception ex)
    115             {
    116                 //异常117             }
    118 
    119         }
    120 
    121         private void consumer(object index)
    122         {
    123             try
    124             {
    125                 int queueIndex = Convert.ToInt32(index);
    126                 while (true)
    127                 {
    128                     if (queueData[queueIndex].Count > 0)
    129                     {
    130                         List<object> forConsumerQueue = queueData[queueIndex].Dequeue();
    131                         if (forConsumerQueue.Count > 0)
    132                         {
    133                             c.Consumer(forConsumerQueue);
    134                         }
    135                         else
    136                         {
    137                             Thread.Sleep(c.ConsumeIntervalMilliseconds);
    138                         }
    139                     }
    140                     else
    141                     {
    142                         Thread.Sleep(c.ConsumeIntervalMilliseconds);
    143                     }
    144                     //Thread.Sleep(c.ConsumeIntervalMilliseconds);
    145                 }
    146             }
    147             catch (Exception ex)
    148             {
    149                 //异常150             }
    151 
    152         }
    153 
    154         public void Enqueue(object obj)
    155         {
    156             queue.Enqueue(obj);
    157         }
    158     }

    消费过程如下:

    1.启动构建主队列,主队列线程;消费队列,消费队列线程

    2.主队列线程负责拉取指定数量的数据到待消费队列

    3.待消费队列每个单独启动一个线程进行消费,

    过程每个队列单线程消费,保证消费多线程的同时又无锁,我是这么认为的。

    中间有个拓展方法:

    BatchesOf

    public static class EnumerableExtensions
        {
            public static IEnumerable<T[]> BatchesOf<T>(this IEnumerable<T> sequence, int batchSize)
            {
                List<T> iteratorVariable0 = new List<T>(batchSize);
                foreach (T iteratorVariable1 in sequence)
                {
                    iteratorVariable0.Add(iteratorVariable1);
                    if (iteratorVariable0.Count >= batchSize)
                    {
                        yield return iteratorVariable0.ToArray();
                        iteratorVariable0.Clear();
                    }
                }
                if (iteratorVariable0.Count > 0)
                {
                    yield return iteratorVariable0.ToArray();
                    iteratorVariable0.Clear();
                }
            }
        }

    调用如下:

    public static MyQueue queueService = new MyQueue();
            private static bool _enable;
            private static int _threadCount = 2;
            private static int _mSecond = 1000;
            private static int _oneBatch = 500;
    
            private static object _root = new object();
    
            public static QueueAsycAction Instance
            {
                get
                {
                    if (_instance == null)
                        lock (_root)
                            if (_instance == null)
                                _instance = new QueueAsycAction();
    
                    return _instance;
                }
            }
    
            private QueueAsycAction()
            {
    
                InitMemoryQueueService();
            }
            /// <summary>
            /// 初始化队列
            /// </summary>
            private void InitMemoryQueueService()
            {
                try
                {
                    queueService.Init(new MyQueueConfig("Member_AsycAction", Consumer)
                    {
                        ConsumeIntervalMilliseconds = _mSecond,
                        ConsumeItemCountInOneBatch = _oneBatch,
                        ConsumeThreadCount = _threadCount,
                        MaxItemCount = 100000,
                        NotReachBatchCountConsumeAction = NotReachBatchCountConsumeAction.ConsumeAllItems,
                        ReachMaxItemCountAction = ReachMaxItemCountAction.AbandonOldItems,
                    });
                }
                catch (Exception ex)
                {
                   //异常处理
                }
    
            }
    
            /// <summary>
            /// 消费者
            /// </summary>
            /// <param name="monitors"></param>
            private void Consumer(IList<object> monitors)
            {
                try
                {
                    foreach (object item in monitors)
                    {
                        //消费方法
                    }
                }
                catch (Exception ex)
                {
                    //异常处理
                }
    
            }
            /// <summary>
            /// 添加队列项
            /// </summary>
            /// <param name="log"></param>
            public void AddItem(object action)
            {
                try
                {
                    queueService.Enqueue(action);
                }
                catch (Exception ex)
                {
                    //异常处理
                }
            }

     采用单例模式调用,可以实例化多条队列做不同的操作。

     望指正

  • 相关阅读:
    Codeforces Beta Round #92 (Div. 2 Only) B. Permutations 模拟
    POJ 3281 Dining 最大流 Dinic算法
    POJ 2441 Arrange the BUlls 状压DP
    URAL 1152 Faise Mirrors 状压DP 简单题
    URAL 1039 Anniversary Party 树形DP 水题
    URAL 1018 Binary Apple Tree 树形DP 好题 经典
    pytorch中的forward前向传播机制
    .data()与.detach()的区别
    Argparse模块
    pytorch代码调试工具
  • 原文地址:https://www.cnblogs.com/isingel/p/4919892.html
Copyright © 2011-2022 走看看