zoukankan      html  css  js  c++  java
  • .Net中的并行编程-7.基于BlockingCollection实现高性能异步队列

      三年前写过基于ConcurrentQueue的异步队列,今天在整理代码的时候发现当时另外一种实现方式-使用BlockingCollection实现,这种方式目前依然在实际项目中使用。关于BlockingCollection的基本使用请查阅MSDN源码实现

    下面直接上代码:(代码已经放到了我的github上)

    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading;
    using Danny.Infrastructure.Helper;
    
    namespace Danny.Infrastructure.Collections
    {
        /// <summary>
        /// 一个基于BlockingCollection实现的多线程的处理队列
        /// </summary>
        public class ProcessQueue<T>
        {
            private  BlockingCollection<T> _queue;
            private CancellationTokenSource _cancellationTokenSource;
            private CancellationToken _cancellToken;
            //内部线程池
            private List<Thread> _threadCollection;
    
            //队列是否正在处理数据
            private int _isProcessing;
            //有线程正在处理数据
            private const int Processing = 1;
            //没有线程处理数据
            private const int UnProcessing = 0;
            //队列是否可用
            private volatile bool _enabled = true;
            //内部处理线程数量
            private int _internalThreadCount;
         
            public event Action<T> ProcessItemEvent;
            //处理异常,需要三个参数,当前队列实例,异常,当时处理的数据
            public event Action<dynamic,Exception,T> ProcessExceptionEvent;
    
            public ProcessQueue()
            {
                _queue=new BlockingCollection<T>();
                _cancellationTokenSource = new CancellationTokenSource();
                _internalThreadCount = 1;
                _cancellToken = _cancellationTokenSource.Token;
                _threadCollection = new List<Thread>();
            }
    
            public ProcessQueue(int internalThreadCount):this()
            {
                this._internalThreadCount = internalThreadCount;
            }
    
            /// <summary>
            /// 队列内部元素的数量 
            /// </summary>
            public int GetInternalItemCount()
            {
                return _queue.Count;
            }
    
            public void Enqueue(T items)
            {
                if (items == null)
                {
                    throw new ArgumentException("items");
                }
    
                _queue.Add(items);
                DataAdded();
            }
    
            public void Flush()
            {
                StopProcess();
    
                while (_queue.Count != 0)
                {
                    T item=default(T);
                    if (_queue.TryTake(out item))
                    {
                        try
                        {
                            ProcessItemEvent(item);
                        }
                        catch (Exception ex)
                        {
                            OnProcessException(ex,item);
                        }
                    }
                }
            }
    
            private void DataAdded()
            {
                if (_enabled)
                {
                    if (!IsProcessingItem())
                    {
                        ProcessRangeItem();
                        StartProcess();
                    }
                }
            }
    
            //判断是否队列有线程正在处理 
            private bool IsProcessingItem()
            {
                return !(Interlocked.CompareExchange(ref _isProcessing, Processing, UnProcessing) == UnProcessing);
            }
    
            private void ProcessRangeItem()
            {
                for (int i = 0; i < this._internalThreadCount; i++)
                {
                    ProcessItem();
                }
            }
    
            private void ProcessItem()
            {
                Thread currentThread = new Thread((state) =>
                {
                    T item=default(T);
                    while (_enabled)
                    {
                        try
                        {
                            try
                            {
                                item = _queue.Take(_cancellToken);
                                ProcessItemEvent(item);
                            }
                            catch (OperationCanceledException ex)
                            {
                                DebugHelper.DebugView(ex.ToString());
                            }
    
                        }
                        catch (Exception ex)
                        {
                            OnProcessException(ex,item);
                        }
                    }
    
                });
    
                _threadCollection.Add(currentThread);
            }
    
            private void StartProcess()
            {
                foreach (var thread in _threadCollection)
                {
                    thread.Start();
                }
            }
    
            private void StopProcess()
            {
                this._enabled = false;
                foreach (var thread in _threadCollection)
                {
                    if (thread.IsAlive)
                    {
                        thread.Join();
                    }
                }
                _threadCollection.Clear();
            }
    
            private void OnProcessException(Exception ex,T item)
            {
                var tempException = ProcessExceptionEvent;
                Interlocked.CompareExchange(ref ProcessExceptionEvent, null, null);
    
                if (tempException != null)
                {
                    ProcessExceptionEvent(this,ex,item);
                }
            }
    
        }
    }

    使用方法:

    class Program
        {
            static void Main(string[] args)
            {
                ProcessQueue<int> processQueue = new ProcessQueue<int>();
                processQueue.ProcessExceptionEvent += ProcessQueue_ProcessExceptionEvent;
                processQueue.ProcessItemEvent += ProcessQueue_ProcessItemEvent;
    
                processQueue.Enqueue(1);
                processQueue.Enqueue(2);
                processQueue.Enqueue(3);
    
            }
    
            /// <summary>
            /// 该方法对入队的每个元素进行处理
            /// </summary>
            /// <param name="value"></param>
            private static void ProcessQueue_ProcessItemEvent(int value)
            {
                Console.WriteLine(value);
            }
    
            /// <summary>
            ///  处理异常
            /// </summary>
            /// <param name="obj">队列实例</param>
            /// <param name="ex">异常对象</param>
            /// <param name="value">出错的数据</param>
            private static void ProcessQueue_ProcessExceptionEvent(dynamic obj, Exception ex, int value)
            {
                Console.WriteLine(ex.ToString());
            }
        }
  • 相关阅读:
    Linux 内核开发—内核简单介绍
    strcmp函数和strcpy函数
    POJ 3734
    怎样使用SetTimer MFC 够具体
    java 递归函数
    海量数据存储
    使用WinINet和WinHTTP实现Http訪问
    getline函数
    UDP编程
    数据文件传输通道技术解决方式
  • 原文地址:https://www.cnblogs.com/zw369/p/6675251.html
Copyright © 2011-2022 走看看