zoukankan      html  css  js  c++  java
  • 基于阻塞队列的生产者消费者C#并发设计

    这是从上文的<<图文并茂的生产者消费者应用实例demo>>整理总结出来的,具体就不说了,直接给出代码,注释我已经加了,原来的code请看<<.Net中的并行编程-7.基于BlockingCollection实现高性能异步队列>>,我改成适合我的版本了,直接给code:

    调用code:

    static void Main(string[] args)
            {
                ProcessQueue<int> processQueue = new ProcessQueue<int>();
                processQueue.ProcessExceptionEvent += ProcessQueue_ProcessExceptionEvent;
                processQueue.ProcessItemEvent += ProcessQueue_ProcessItemEvent;
                
    
                for (int i = 0; i < 50; i++)
                {
                    processQueue.Enqueue(i);
                }
    
                Console.WriteLine("阻塞队列的数量: {0}", processQueue.GetInternalItemCount());
    
                processQueue.Flush();
    
                Console.Read();
            }
    
            /// <summary>
            /// 该方法对入队的每个元素进行处理
            /// </summary>
            /// <param name="value"></param>
            private static void ProcessQueue_ProcessItemEvent(int value)
            {
                Console.WriteLine("输出: {0}", value);
            }
    
            /// <summary>
            ///  处理异常
            /// </summary>
            /// <param name="obj">队列实例</param>
            /// <param name="ex">异常对象</param>
            /// <param name="value">出错的数据</param>
            private static void ProcessQueue_ProcessExceptionEvent(dynamic obj, Exception ex, int value)
            {
                Console.WriteLine(ex.ToString());
            }

    封装的队列:

    public class ProcessQueue<T>
        {
            private BlockingCollection<T> _queue;
            private CancellationTokenSource _cancellationTokenSource;
            private CancellationToken _cancellToken;
            //内部线程池
            private List<Thread> _threadCollection;
    
            //队列是否正在处理数据
            private int _isProcessing;
            //有线程正在处理数据
            private const int Processing = 1;
            //没有线程处理数据
            private const int UnProcessing = 0;
            //队列是否可用   单个线程下用while来判断,多个线程下用if判断,随后while循环队列的数量
            private volatile bool _enabled = true;
            //内部处理线程数量
            private int _internalThreadCount;
            // 消费者处理事件
            public event Action<T> ProcessItemEvent;
            //处理异常,需要三个参数,当前队列实例,异常,当时处理的数据
            public event Action<dynamic, Exception, T> ProcessExceptionEvent;
    
            public ProcessQueue()
            {
                _queue = new BlockingCollection<T>();
                _cancellationTokenSource = new CancellationTokenSource();
                _internalThreadCount = 3;
                _cancellToken = _cancellationTokenSource.Token;
                _threadCollection = new List<Thread>();
            }
    
            public ProcessQueue(int internalThreadCount) : this()
            {
                this._internalThreadCount = internalThreadCount;
            }
    
            /// <summary>
            /// 队列内部元素的数量 
            /// </summary>
            public int GetInternalItemCount()
            {
                //return _queue.Count;
                return _threadCollection.Count;
            }
            //生产者生产
            public void Enqueue(T items)
            {
                if (items == null)
                {
                    throw new ArgumentException("items");
                }
    
                _queue.Add(items);
                DataAdded();
            }
    
            public void Flush()
            {
                StopProcess();
    
                while (_queue.Count != 0)
                {
                    T item = default(T);
                    if (_queue.TryTake(out item))
                    {
                        try
                        {
                            ProcessItemEvent(item);
                        }
                        catch (Exception ex)
                        {
                            OnProcessException(ex, item);
                        }
                    }
                }
            }
            // 通知消费者消费队列元素
            private void DataAdded()
            {
                if (_enabled)
                {
                    if (!IsProcessingItem())
                    {
                        Console.WriteLine("DataAdded");
                        ProcessRangeItem();
                        StartProcess();
                    }
                }
            }
    
            //判断是否队列有线程正在处理 
            private bool IsProcessingItem()
            {
                // 替换第一个参数, 如果相等
                //int x = Interlocked.CompareExchange(ref _isProcessing, Processing, UnProcessing);
                return !(Interlocked.CompareExchange(ref _isProcessing, Processing, UnProcessing) == UnProcessing);
            }
            // 多消费者消费
            private void ProcessRangeItem()
            {
                for (int i = 0; i < this._internalThreadCount; i++)
                {
                    ProcessItem();
                }
            }
            // 开启消费处理
            private void ProcessItem()
            {
                Thread currentThread = new Thread((state) =>
                {
                    T item = default(T);
                    while (_enabled)
                    {
                        try
                        {
                            try
                            {
                                if (!_queue.TryTake(out item))
                                {
                                    //Console.WriteLine("阻塞队列为0时的item: {0}", item);
                                    //Console.WriteLine("ok!!!");
                                    break;
                                }
                                // 处理事件
                                ProcessItemEvent(item);
                            }
                            catch (OperationCanceledException ex)
                            {
                                DebugHelper.DebugView(ex.ToString());
                            }
    
                        }
                        catch (Exception ex)
                        {
                            OnProcessException(ex, item);
                        }
                    }
                });
                _threadCollection.Add(currentThread);
            }
            // 开启消费者
            private void StartProcess()
            {
                //Console.WriteLine("线程的数量: {0}", _threadCollection.Count);
                foreach (var thread in _threadCollection)
                {
                    thread.Start();
                    thread.IsBackground = true;
                }
            }
            // 终止运行
            private void StopProcess()
            {
                this._enabled = false;
                foreach (var thread in _threadCollection)
                {
                    if (thread.IsAlive)
                    {
                        thread.Join();
                    }
                }
                _threadCollection.Clear();
            }
    
            private void OnProcessException(Exception ex, T item)
            {
                var tempException = ProcessExceptionEvent;
                Interlocked.CompareExchange(ref ProcessExceptionEvent, null, null);
    
                if (tempException != null)
                {
                    ProcessExceptionEvent(this, ex, item);
                }
            }
    
        }
  • 相关阅读:
    分页工具类
    ajax乱码的问题
    ibatis配置文件中的XML解析错误The content of elements must consist of well-formed character data or markup.
    nginx 反向代理导致的session丢失的问题
    后台返回的值ajax接收不到
    C/C++中vector与list的区别
    C/C++中内存泄漏、内存溢出与野指针的解释与说明
    C++中深拷贝与浅拷贝
    C++中的构造函数与析构函数及组合类的调用
    Linux中request_irq()中断申请与处理说明
  • 原文地址:https://www.cnblogs.com/zhiyong-ITNote/p/8312965.html
Copyright © 2011-2022 走看看