zoukankan      html  css  js  c++  java
  • C#无锁内存队列

      /// <summary>
      /// Callback that receives one batch of queued items for processing.
      /// </summary>
      /// <param name="monitors">The items that make up the batch.</param>
      public delegate void Consumer(IList<object> monitors);
      2 
      /// <summary>
      /// Policy selector for what to do when consuming a batch fails.
      /// NOTE(review): the MyQueue code in this listing never reads this value.
      /// </summary>
      public enum ConsumeErrorAction
      {
          /// <summary>Presumably: drop the failed batch and log the exception (inferred from the name only).</summary>
          AbandonAndLogException = 0
      }
      /// <summary>
      /// Policy selector for rounds in which fewer items are queued than one full batch.
      /// NOTE(review): the MyQueue code in this listing never reads this value.
      /// </summary>
      public enum NotReachBatchCountConsumeAction
      {
          /// <summary>Presumably: consume whatever is queued (inferred from the name only).</summary>
          ConsumeAllItems = 0
      }
     /// <summary>
     /// Policy selector for what to do when the queue reaches its maximum size.
     /// NOTE(review): the MyQueue code in this listing never reads this value.
     /// </summary>
     public enum ReachMaxItemCountAction
     {
         /// <summary>Presumably: discard the oldest queued items (inferred from the name only).</summary>
         AbandonOldItems = 0
     }
     /// <summary>
     /// Settings handed to <see cref="MyQueue"/>.Init: the consumer callback plus the
     /// sizing and timing values the queue's threads read.
     /// </summary>
     public class MyQueueConfig
     {
         private readonly string queueName;

         /// <summary>Name given to the queue at construction time.</summary>
         public string QueueName
         {
             get { return queueName; }
         }

         /// <summary>Callback invoked with each batch of items.</summary>
         public Consumer Consumer { get; private set; }

         /// <summary>
         /// Creates a configuration for a named queue.
         /// </summary>
         /// <param name="name">Name of the queue.</param>
         /// <param name="c">Callback that processes each batch; must not be null.</param>
         /// <exception cref="ArgumentNullException"><paramref name="c"/> is null.</exception>
         public MyQueueConfig(string name, Consumer c)
         {
             // Fail here rather than with a NullReferenceException inside a worker thread later.
             if (c == null)
             {
                 throw new ArgumentNullException("c");
             }

             queueName = name;
             Consumer = c;
         }

         /// <summary>
         /// Size limit of the main queue. MyQueue discards its backlog once the item count
         /// exceeds this value minus 1000.
         /// </summary>
         public int MaxItemCount { get; set; }

         /// <summary>Error policy. NOTE(review): not read by the MyQueue code in this listing.</summary>
         public ConsumeErrorAction ConsumeErrorAction { get; set; }

         /// <summary>Sleep, in milliseconds, between dispatch rounds and while a worker has nothing to do.</summary>
         public int ConsumeIntervalMilliseconds { get; set; }

         /// <summary>Maximum number of items taken from the main queue in one dispatch round.</summary>
         public int ConsumeItemCountInOneBatch { get; set; }

         /// <summary>
         /// Number of worker threads each dispatch round is divided across. MyQueue starts
         /// one additional worker on top of this count.
         /// </summary>
         public int ConsumeThreadCount { get; set; }

         /// <summary>Short-batch policy. NOTE(review): not read by the MyQueue code in this listing.</summary>
         public NotReachBatchCountConsumeAction NotReachBatchCountConsumeAction { get; set; }

         /// <summary>Overflow policy. NOTE(review): not read by the MyQueue code in this listing.</summary>
         public ReachMaxItemCountAction ReachMaxItemCountAction { get; set; }
     }
     /// <summary>
     /// In-memory producer/consumer queue. Producers call <see cref="Enqueue"/>; a dispatcher
     /// thread periodically moves up to ConsumeItemCountInOneBatch items from the main queue
     /// into per-worker batch queues; each worker thread passes its batches to the configured
     /// Consumer callback.
     /// </summary>
     /// <remarks>
     /// Every queue here is written by one thread and read by another, and
     /// <see cref="Queue{T}"/> does not support that, so the queues are ConcurrentQueue
     /// instances. The dictionaries are filled in <see cref="Init"/> before any thread starts
     /// and are only read afterwards.
     /// </remarks>
     public class MyQueue
     {
         /// <summary>The backlog is discarded once the item count exceeds MaxItemCount minus this margin.</summary>
         private const int OverflowMargin = 1000;

         /// <summary>Pause while the main queue is empty, and back-off after a dispatcher error.</summary>
         private const int IdleSleepMilliseconds = 1000;

         private MyQueueConfig c;
         private readonly System.Collections.Concurrent.ConcurrentQueue<object> queue;
         private readonly Dictionary<int, System.Collections.Concurrent.ConcurrentQueue<List<object>>> queueData;
         private readonly Dictionary<int, Thread> threads;

         /// <summary>Creates an empty queue. No thread runs until <see cref="Init"/> is called.</summary>
         public MyQueue()
         {
             queue = new System.Collections.Concurrent.ConcurrentQueue<object>();
             queueData = new Dictionary<int, System.Collections.Concurrent.ConcurrentQueue<List<object>>>();
             threads = new Dictionary<int, Thread>();
         }

         /// <summary>
         /// Stores the configuration and starts ConsumeThreadCount + 1 worker threads plus
         /// the dispatcher thread.
         /// </summary>
         /// <param name="config">Queue settings; must not be null.</param>
         /// <exception cref="ArgumentNullException"><paramref name="config"/> is null.</exception>
         /// <exception cref="ArgumentOutOfRangeException">
         /// ConsumeThreadCount is below 1 or ConsumeIntervalMilliseconds is negative.
         /// </exception>
         public void Init(MyQueueConfig config)
         {
             if (config == null)
             {
                 throw new ArgumentNullException("config");
             }
             // The dispatcher divides by ConsumeThreadCount, so zero cannot be accepted.
             if (config.ConsumeThreadCount < 1)
             {
                 throw new ArgumentOutOfRangeException("config", "ConsumeThreadCount must be at least 1.");
             }
             // Every loop passes this value to Thread.Sleep.
             if (config.ConsumeIntervalMilliseconds < 0)
             {
                 throw new ArgumentOutOfRangeException("config", "ConsumeIntervalMilliseconds must not be negative.");
             }

             c = config;

             // One queue per configured worker plus a spare that takes the remainder batch
             // left over by the integer division in dispatch().
             int workerCount = c.ConsumeThreadCount + 1;

             // Create every batch queue BEFORE starting any thread, so a worker can never
             // look up a key that has not been added yet.
             for (int i = 0; i < workerCount; i++)
             {
                 queueData.Add(i, new System.Collections.Concurrent.ConcurrentQueue<List<object>>());
             }

             for (int i = 0; i < workerCount; i++)
             {
                 Thread thread = new Thread(consumer);
                 threads.Add(i, thread);
                 thread.Start(i);
             }

             new Thread(sendConsumer).Start();
         }

         /// <summary>
         /// Dispatcher loop: sleeps while the main queue is empty, discards the backlog when
         /// it is close to MaxItemCount, otherwise pulls one round of items and distributes
         /// them to the worker queues.
         /// </summary>
         private void sendConsumer()
         {
             while (true)
             {
                 // The try sits inside the loop so that a single failure cannot stop
                 // dispatching for the rest of the process lifetime.
                 try
                 {
                     int queueCount = queue.Count;

                     // Main queue is empty.
                     if (queueCount == 0)
                     {
                         Thread.Sleep(IdleSleepMilliseconds);
                         continue;
                     }

                     // Main queue has grown too large.
                     if (queueCount > c.MaxItemCount - OverflowMargin)
                     {
                         discard(queueCount);
                         continue;
                     }

                     // Take a full round, or everything that is queued if that is less.
                     int consumerCount = Math.Min(queueCount, c.ConsumeItemCountInOneBatch);
                     List<object> forConsumer = new List<object>();
                     for (int i = 0; i < consumerCount; i++)
                     {
                         object item;
                         if (!queue.TryDequeue(out item))
                         {
                             break;
                         }
                         forConsumer.Add(item);
                     }

                     if (forConsumer.Count > 0)
                     {
                         dispatch(forConsumer);
                     }

                     Thread.Sleep(c.ConsumeIntervalMilliseconds);
                 }
                 catch (Exception ex)
                 {
                     System.Diagnostics.Trace.TraceError("MyQueue dispatcher error: {0}", ex);
                     // Back off so a persistent failure does not turn into a busy loop.
                     Thread.Sleep(IdleSleepMilliseconds);
                 }
             }
         }

         /// <summary>
         /// Removes <paramref name="count"/> items from the front of the main queue, i.e. the
         /// backlog that was counted; items enqueued after the count was taken are kept.
         /// </summary>
         private void discard(int count)
         {
             object dropped;
             for (int i = 0; i < count; i++)
             {
                 if (!queue.TryDequeue(out dropped))
                 {
                     return;
                 }
             }
         }

         /// <summary>
         /// Splits one round of items into batches and appends them to the worker queues.
         /// </summary>
         private void dispatch(List<object> forConsumer)
         {
             // Integer division yields 0 when there are fewer items than threads; use 1 so
             // each item becomes its own batch.
             int batchSize = Math.Max(1, forConsumer.Count / c.ConsumeThreadCount);
             List<object[]> batchs = forConsumer.BatchesOf(batchSize).ToList();

             if (batchs.Count < c.ConsumeThreadCount)
             {
                 // Fewer batches than configured threads: everything goes to worker 0.
                 foreach (object[] batch in batchs)
                 {
                     queueData[0].Enqueue(batch.ToList());
                 }
                 return;
             }

             // Integer division can produce more batches than there are worker queues
             // (e.g. 5 items across 3 threads gives 5 batches for 4 queues), so wrap the index.
             for (int i = 0; i < batchs.Count; i++)
             {
                 queueData[i % queueData.Count].Enqueue(batchs[i].ToList());
             }
         }

         /// <summary>
         /// Worker loop: takes batches from the queue at <paramref name="index"/> and passes
         /// each non-empty batch to the Consumer callback, sleeping when there is nothing to do.
         /// </summary>
         /// <param name="index">Key of this worker's batch queue in queueData.</param>
         private void consumer(object index)
         {
             int queueIndex = Convert.ToInt32(index);
             System.Collections.Concurrent.ConcurrentQueue<List<object>> batches = queueData[queueIndex];

             while (true)
             {
                 List<object> forConsumerQueue;
                 if (batches.TryDequeue(out forConsumerQueue) && forConsumerQueue.Count > 0)
                 {
                     try
                     {
                         c.Consumer(forConsumerQueue);
                     }
                     catch (Exception ex)
                     {
                         // Abandon the failed batch but keep the worker alive for the next one.
                         System.Diagnostics.Trace.TraceError("MyQueue worker {0}: consumer failed: {1}", queueIndex, ex);
                     }
                 }
                 else
                 {
                     Thread.Sleep(c.ConsumeIntervalMilliseconds);
                 }
             }
         }

         /// <summary>Adds an item to the main queue.</summary>
         /// <param name="obj">The item to queue.</param>
         public void Enqueue(object obj)
         {
             queue.Enqueue(obj);
         }
     }

    消费过程如下:

    1.启动构建主队列,主队列线程;消费队列,消费队列线程

    2.主队列线程负责拉取指定数量的数据到待消费队列

    3.每个待消费队列单独启动一个线程进行消费。

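    整个过程中每个待消费队列只由一个线程消费,既保证了多线程消费,又不需要显式加锁,我是这么认为的。需要注意:每个队列仍然是一个线程写入、另一个线程读取,而 Queue<T> 本身并不是线程安全的,只有换用 ConcurrentQueue<T> 这类线程安全的队列,这种做法才真正可靠。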

    中间用到一个扩展方法:

    BatchesOf

    /// <summary>
    /// Extension methods for <see cref="IEnumerable{T}"/>.
    /// </summary>
    public static class EnumerableExtensions
    {
        /// <summary>
        /// Lazily splits <paramref name="sequence"/> into consecutive arrays of
        /// <paramref name="batchSize"/> elements; the last array holds whatever is left over.
        /// </summary>
        /// <param name="sequence">The elements to split.</param>
        /// <param name="batchSize">
        /// Number of elements per array. A value of 0 produces one array per element,
        /// because the buffer is flushed as soon as its count reaches the batch size.
        /// </param>
        /// <returns>The batches, in source order.</returns>
        public static IEnumerable<T[]> BatchesOf<T>(this IEnumerable<T> sequence, int batchSize)
        {
            List<T> pending = new List<T>(batchSize);

            using (IEnumerator<T> cursor = sequence.GetEnumerator())
            {
                while (cursor.MoveNext())
                {
                    pending.Add(cursor.Current);
                    if (pending.Count < batchSize)
                    {
                        continue;
                    }

                    // Buffer is full: emit a copy, then reuse the buffer.
                    yield return pending.ToArray();
                    pending.Clear();
                }
            }

            if (pending.Count == 0)
            {
                yield break;
            }

            // Trailing partial batch.
            yield return pending.ToArray();
            pending.Clear();
        }
    }

    调用如下:

    // Queue instance that InitMemoryQueueService starts and AddItem feeds.
    public static MyQueue queueService = new MyQueue();
            // NOTE(review): not referenced by any code visible in this listing.
            private static bool _enable;
            // Passed to MyQueueConfig.ConsumeThreadCount.
            private static int _threadCount = 2;
            // Passed to MyQueueConfig.ConsumeIntervalMilliseconds.
            private static int _mSecond = 1000;
            // Passed to MyQueueConfig.ConsumeItemCountInOneBatch.
            private static int _oneBatch = 500;
    
            // Lock object taken by the Instance getter while it creates the singleton.
            private static object _root = new object();
    
            /// <summary>
            /// Returns the shared QueueAsycAction, creating it on first access. The instance
            /// is checked once without the lock and again while holding <c>_root</c>.
            /// </summary>
            public static QueueAsycAction Instance
            {
                get
                {
                    // Already created: return it without taking the lock.
                    if (_instance != null)
                    {
                        return _instance;
                    }

                    lock (_root)
                    {
                        // Check again: another thread may have created it while we waited.
                        if (_instance == null)
                        {
                            _instance = new QueueAsycAction();
                        }
                    }

                    return _instance;
                }
            }
    
            /// <summary>
            /// Private constructor, called from the Instance getter; initializes the queue
            /// service as part of construction.
            /// </summary>
            private QueueAsycAction()
            {
                InitMemoryQueueService();
            }
            /// <summary>
            /// Initializes the queue: builds the configuration for the "Member_AsycAction"
            /// queue from the static settings and passes it to <c>queueService.Init</c>.
            /// A failure is traced and not rethrown.
            /// </summary>
            private void InitMemoryQueueService()
            {
                try
                {
                    MyQueueConfig config = new MyQueueConfig("Member_AsycAction", Consumer)
                    {
                        ConsumeIntervalMilliseconds = _mSecond,
                        ConsumeItemCountInOneBatch = _oneBatch,
                        ConsumeThreadCount = _threadCount,
                        MaxItemCount = 100000,
                        NotReachBatchCountConsumeAction = NotReachBatchCountConsumeAction.ConsumeAllItems,
                        ReachMaxItemCountAction = ReachMaxItemCountAction.AbandonOldItems,
                    };

                    queueService.Init(config);
                }
                catch (Exception ex)
                {
                    // Record the failure; a silently swallowed exception here would leave a
                    // queue that accepts items but never consumes them, with no trace of why.
                    System.Diagnostics.Trace.TraceError("QueueAsycAction: queue initialization failed: {0}", ex);
                }
            }
    
            /// <summary>
            /// Consumer callback: processes the items of one batch. An exception is traced
            /// and the remainder of that batch is skipped.
            /// </summary>
            /// <param name="monitors">The items of the batch to process.</param>
            private void Consumer(IList<object> monitors)
            {
                try
                {
                    foreach (object item in monitors)
                    {
                        // Per-item processing goes here.
                    }
                }
                catch (Exception ex)
                {
                    // Record the failure instead of discarding it silently.
                    System.Diagnostics.Trace.TraceError("QueueAsycAction: consuming a batch failed: {0}", ex);
                }
            }
            /// <summary>
            /// Adds an item to the queue. A failure to enqueue is traced and not rethrown.
            /// </summary>
            /// <param name="action">The item to enqueue.</param>
            public void AddItem(object action)
            {
                try
                {
                    queueService.Enqueue(action);
                }
                catch (Exception ex)
                {
                    // Record the failure instead of discarding it silently.
                    System.Diagnostics.Trace.TraceError("QueueAsycAction: enqueue failed: {0}", ex);
                }
            }

     采用单例模式调用,可以实例化多条队列做不同的操作。

     望指正

  • 相关阅读:
    软件工程实践个人编程作业
    实验 2:Mininet 实验——拓扑的命令脚本生成
    软工实践个人总结
    第08组 每周小结 (3/3)
    第08组 每周小结 (2/3)
    第08组 每周小结 (1/3)
    第08组 Beta冲刺 总结
    第08组 Beta冲刺 (5/5)
    第08组 Beta冲刺 (4/5)
    第08组 Beta冲刺 (3/5)
  • 原文地址:https://www.cnblogs.com/isingel/p/4919892.html
Copyright © 2011-2022 走看看