zoukankan      html  css  js  c++  java
  • Creating a blocking Queue<T> in .NET

    Creating a blocking Queue<T> in .NET

     这里的代码来自我们的StackOverflow 中的代码示例,我添加了一些中文注释;

     //there I will create blocking quene;
        class SizeQueue<T>
        {
            private readonly Queue<T> queue = new Queue<T>();
            private readonly int _maxSize;
            private readonly object _locker = new object();
            private static bool closing;
    
            public SizeQueue(int size)
            {
                this._maxSize = size;
            }
    
            public void Enquen(T item)
            {
    
                lock (_locker)
                {
                    //这里要重点理解使用while 而不是 我们的if
                    //应为当另外一个线程Pulse 之后,又可能另外一个线程获得锁,而不是当前等待的线程;
                    while (this._maxSize <= this.queue.Count)  //为什么这里要用while 而不是if呢;  应为当 pulse 之后释放锁,很有可能另外一个线程活到锁; 然后进入到队列,添加,出来之后返现,队列还是慢慢的;
                    {
                        Monitor.Wait(_locker); //进入等待队列;
                    }
                    //否则就进入我们的 队列中;
                    queue.Enqueue(item);
                    Monitor.PulseAll(_locker); // wake up any blocked dequeue 同时所有的线程去争夺锁,然后 取出数据;
    
                }
    
            }
    
            public T Eequeue()
            {
                lock (_locker)
                {
                    while (queue.Count == 0)
                    {
                        Monitor.Wait(queue);
                    }
                    T item = queue.Dequeue();
                    if (queue.Count == _maxSize - 1) //队列装满了之后,
                    {
                        // wake up any blocked enqueue
                        Monitor.PulseAll(queue); //通知线程继续写入;
                    }
                    return item;
    
                }
            }
    
           //empty queue just returns(rather than blocking):
           //锁,独占锁,确保一次只有一个线程能够操作我们的队列;
           //特别是在我们的读写的时候
           //读的时候,不允许写
           //写的时候,不允许读;
    
           //这里我们添加一个close 和 tryDequeue的方法;
    
            public void Close()
            {
                lock (_locker)
                {
                    closing = true;
                    Monitor.PulseAll(_locker);
                }
            }
    
            /// <summary>
            /// 考虑到有关系的情况;
            /// </summary>
            /// <param name="value"></param>
            /// <returns></returns>
            public bool TryDequeue(out T value)
            {
                lock (_locker)
                {
    
                    while (queue.Count == 0)
                    {
                        if (closing)
                        {
                            value = default(T);
                            return false;
                        }
                        Monitor.Wait(_locker);
                    }
                    value = value = queue.Dequeue();
                    if (queue.Count == this._maxSize - 1)
                    {
                        Monitor.PulseAll(_locker);
                    }
                    return true;
                }
    
            }
    
        }

    当然这里还有我们的第二个版本的使用;

        class BlockingQueue<T>
        {
            /// <summary>
            ///队列最大值
            /// </summary>
            private readonly int _maxSize;
    
            /// <summary>
            /// 我们的队列
            /// </summary>
            private Queue<T> _Queue = new Queue<T>();
    
            /// <summary>
            /// 我们的独占锁
            /// </summary>
            private readonly object _locker = new object();
    
    
            /// <summary>
            /// 是否停止操作;
            /// </summary>
            private bool _Quit = false;
    
    
            /// <summary>
            /// 初始化队列的大小;
            /// </summary>
            public BlockingQueue(int maxSize)
            {
                this._maxSize = maxSize;
            }
    
            /// <summary>
            /// 取消操作
            /// </summary>
            public void Quit()
            {
                lock (_locker)
                {
                    _Quit = true;
    
                    Monitor.PulseAll(_locker);
                }
            }
    
            /// <summary>
            /// 队列的进入;
            /// </summary>
            /// <param name="t"></param>
            public bool Enqueue(T t)
            {
    
                lock (_locker) //确保线程的安全;
                {
                    while (!this._Quit && this._Queue.Count >= this._maxSize) Monitor.Wait(_locker); //进入等待的队列中;
    
                    if (_Quit) return false;
                    //进入队列
                    _Queue.Enqueue(t);
    
                    //释放信号;
                    Monitor.PulseAll(_locker);  //一旦队列中有了,就通知 去去;
    
                }
                return true;
    
            }
    
            /// <summary>
            ///取出队列中的值;
            /// </summary>
            /// <param name="t"></param>
            /// <returns></returns>
            public bool Dequeue(out T t)
            {
                t = default(T);
    
                lock (_locker)
                {
                    while (!this._Quit && this._Queue.Count == 0) Monitor.Wait(_locker);
    
                    if (_Queue.Count == 0) return false;  //这个大概是以多余的安全机制吧;
    
                    t = this._Queue.Dequeue();
    
                    Monitor.PulseAll(_locker);
                }
                return true;
    
            }
            //整体来说,这个方法还挺好使用使用的,效果不错的感觉;
    
    
            //线程的执行是没有太多额顺序的的; 如果三个线程读,两个线程写; 可能window 线程如何去协调这些线程执行呢;
    
    
        }
    
    
        class Program
        {
            //with one fast producer and two slow consumers:
            static void Test()
            {
                var q = new BlockingQueue<int>(4);
                //一个线程生成;
                new Thread(() =>
                {
                    for (int x = 0; ; x++)
                    {
                        if (!q.Enqueue(x)) break;
                        Trace.WriteLine(x.ToString("0000") + " >");
    
                    }
                    Trace.WriteLine("Producer quitting");
                }).Start();
    
    
                //消费者;
                // Consumers
                for (int i = 0; i < 2; i++)
                {
                    new Thread(() =>
                    {
                        for (;;)
                        {
                            Thread.Sleep(100);
                            int x = 0;
                            if (!q.Dequeue(out x)) break;
                            Trace.WriteLine("     < " + x.ToString("0000"));
                        }
                        Trace.WriteLine("Consumer quitting");
                    }).Start();
                }
    
                Thread.Sleep(3000);
    
                Trace.WriteLine("Quitting");
    
                q.Quit();
    
    
            }
    
            static void Main(string[] args)
            {
    
    
                Test();
    
    
            }
        }

    要锻炼自己写源代码的能力,所以要多看,多学习,看多额,才能写点东西出来的呀;

    这样的效果还是挺好;

  • 相关阅读:
    make -j 8参数的作用
    使用请求头认证来测试需要授权的 API 接口
    查看Linux系统的平均负载
    服务器负载均衡的基本功能和实现原理
    Oracle RAC学习笔记:基本概念及入门
    详解物化视图(汇总比较有用的资料)
    程序优化注意的一些点
    PR 审批界面增加显示项方法
    Most Common Solutions to FRM-41839 and .tmp Files Not Being Deleted
    APPCORE Routine APIs
  • 原文地址:https://www.cnblogs.com/mc67/p/7569004.html
Copyright © 2011-2022 走看看