zoukankan      html  css  js  c++  java
  • 基于阻塞队列的生产者消费者C#并发设计

    这是从上文的<<图文并茂的生产者消费者应用实例demo>>整理总结出来的,具体就不说了,直接给出代码,注释我已经加了,原来的code请看<<.Net中的并行编程-7.基于BlockingCollection实现高性能异步队列>>,我改成适合我的版本了,直接给code:

    调用code:

    static void Main(string[] args)
            {
                ProcessQueue<int> processQueue = new ProcessQueue<int>();
                processQueue.ProcessExceptionEvent += ProcessQueue_ProcessExceptionEvent;
                processQueue.ProcessItemEvent += ProcessQueue_ProcessItemEvent;
                
    
                for (int i = 0; i < 50; i++)
                {
                    processQueue.Enqueue(i);
                }
    
                Console.WriteLine("阻塞队列的数量: {0}", processQueue.GetInternalItemCount());
    
                processQueue.Flush();
    
                Console.Read();
            }
    
            /// <summary>
            /// 该方法对入队的每个元素进行处理
            /// </summary>
            /// <param name="value"></param>
            private static void ProcessQueue_ProcessItemEvent(int value)
            {
                Console.WriteLine("输出: {0}", value);
            }
    
            /// <summary>
            ///  处理异常
            /// </summary>
            /// <param name="obj">队列实例</param>
            /// <param name="ex">异常对象</param>
            /// <param name="value">出错的数据</param>
            private static void ProcessQueue_ProcessExceptionEvent(dynamic obj, Exception ex, int value)
            {
                Console.WriteLine(ex.ToString());
            }

    封装的队列:

    public class ProcessQueue<T>
        {
            private BlockingCollection<T> _queue;
            private CancellationTokenSource _cancellationTokenSource;
            private CancellationToken _cancellToken;
            //内部线程池
            private List<Thread> _threadCollection;
    
            //队列是否正在处理数据
            private int _isProcessing;
            //有线程正在处理数据
            private const int Processing = 1;
            //没有线程处理数据
            private const int UnProcessing = 0;
            //队列是否可用   单个线程下用while来判断,多个线程下用if判断,随后while循环队列的数量
            private volatile bool _enabled = true;
            //内部处理线程数量
            private int _internalThreadCount;
            // 消费者处理事件
            public event Action<T> ProcessItemEvent;
            //处理异常,需要三个参数,当前队列实例,异常,当时处理的数据
            public event Action<dynamic, Exception, T> ProcessExceptionEvent;
    
            public ProcessQueue()
            {
                _queue = new BlockingCollection<T>();
                _cancellationTokenSource = new CancellationTokenSource();
                _internalThreadCount = 3;
                _cancellToken = _cancellationTokenSource.Token;
                _threadCollection = new List<Thread>();
            }
    
            public ProcessQueue(int internalThreadCount) : this()
            {
                this._internalThreadCount = internalThreadCount;
            }
    
            /// <summary>
            /// 队列内部元素的数量 
            /// </summary>
            public int GetInternalItemCount()
            {
                //return _queue.Count;
                return _threadCollection.Count;
            }
            //生产者生产
            public void Enqueue(T items)
            {
                if (items == null)
                {
                    throw new ArgumentException("items");
                }
    
                _queue.Add(items);
                DataAdded();
            }
    
            public void Flush()
            {
                StopProcess();
    
                while (_queue.Count != 0)
                {
                    T item = default(T);
                    if (_queue.TryTake(out item))
                    {
                        try
                        {
                            ProcessItemEvent(item);
                        }
                        catch (Exception ex)
                        {
                            OnProcessException(ex, item);
                        }
                    }
                }
            }
            // 通知消费者消费队列元素
            private void DataAdded()
            {
                if (_enabled)
                {
                    if (!IsProcessingItem())
                    {
                        Console.WriteLine("DataAdded");
                        ProcessRangeItem();
                        StartProcess();
                    }
                }
            }
    
            //判断是否队列有线程正在处理 
            private bool IsProcessingItem()
            {
                // 替换第一个参数, 如果相等
                //int x = Interlocked.CompareExchange(ref _isProcessing, Processing, UnProcessing);
                return !(Interlocked.CompareExchange(ref _isProcessing, Processing, UnProcessing) == UnProcessing);
            }
            // 多消费者消费
            private void ProcessRangeItem()
            {
                for (int i = 0; i < this._internalThreadCount; i++)
                {
                    ProcessItem();
                }
            }
            // 开启消费处理
            private void ProcessItem()
            {
                Thread currentThread = new Thread((state) =>
                {
                    T item = default(T);
                    while (_enabled)
                    {
                        try
                        {
                            try
                            {
                                if (!_queue.TryTake(out item))
                                {
                                    //Console.WriteLine("阻塞队列为0时的item: {0}", item);
                                    //Console.WriteLine("ok!!!");
                                    break;
                                }
                                // 处理事件
                                ProcessItemEvent(item);
                            }
                            catch (OperationCanceledException ex)
                            {
                                DebugHelper.DebugView(ex.ToString());
                            }
    
                        }
                        catch (Exception ex)
                        {
                            OnProcessException(ex, item);
                        }
                    }
                });
                _threadCollection.Add(currentThread);
            }
            // 开启消费者
            private void StartProcess()
            {
                //Console.WriteLine("线程的数量: {0}", _threadCollection.Count);
                foreach (var thread in _threadCollection)
                {
                    thread.Start();
                    thread.IsBackground = true;
                }
            }
            // 终止运行
            private void StopProcess()
            {
                this._enabled = false;
                foreach (var thread in _threadCollection)
                {
                    if (thread.IsAlive)
                    {
                        thread.Join();
                    }
                }
                _threadCollection.Clear();
            }
    
            private void OnProcessException(Exception ex, T item)
            {
                var tempException = ProcessExceptionEvent;
                Interlocked.CompareExchange(ref ProcessExceptionEvent, null, null);
    
                if (tempException != null)
                {
                    ProcessExceptionEvent(this, ex, item);
                }
            }
    
        }
  • 相关阅读:
    Windows Azure Web Site (19) Azure Web App链接到VSTS
    Windows Azure Virtual Machine (35) Azure VM通过Linked DB,执行SQL Job
    Azure PowerShell (16) 并行开关机Azure ARM VM
    Windows Azure Virtual Network (12) 虚拟网络之间点对点连接VNet Peering
    Azure ARM (21) Azure订阅的两种管理模式
    Windows Azure Platform Introduction (14) 申请海外的Windows Azure账户
    Azure ARM (20) 将非托管磁盘虚拟机(Unmanage Disk),迁移成托管磁盘虚拟机(Manage Disk)
    Azure ARM (19) 将传统的ASM VM迁移到ARM VM (2)
    Azure ARM (18) 将传统的ASM VM迁移到ARM VM (1)
    Azure Automation (6) 执行Azure SQL Job
  • 原文地址:https://www.cnblogs.com/zhiyong-ITNote/p/8312965.html
Copyright © 2011-2022 走看看