zoukankan      html  css  js  c++  java
  • 基于异步队列的生产者消费者C#并发设计

    继上文<<基于阻塞队列的生产者消费者C#并发设计>>的并发队列版本的并发设计,原文code是基于<<.Net中的并行编程-4.实现高性能异步队列>>修改过来的,前面的几篇文章也详细介绍了并发实现的其它方案及实现。直接给code:

    public class MyAsyncQueue<T>
        {
            //队列是否正在处理数据
            private int isProcessing;
            //有线程正在处理数据
            private const int Processing = 1;
            //没有线程处理数据
            private const int UnProcessing = 0;
            //队列是否可用 单线程下用while来判断,多线程下用if来判断,随后用while来循环队列的数量
            private volatile bool enabled = true;
            // 消费者线程
            private Task currentTask;
            // 消费者线程处理事件
            public event Action<T> ProcessItemFunction;
            //
            public event EventHandler<EventArgs<Exception>> ProcessException;
            // 并发队列
            private ConcurrentQueue<T> queue;
            // 消费者的数量
            private int _internalTaskCount;
            // 存储消费者队列
            List<Task> tasks = new List<Task>();
    
            public MyAsyncQueue()
            {
                _internalTaskCount = 3;
                queue = new ConcurrentQueue<T>();
                Start();
            }
    
            public int Count
            {
                get
                {
                    return queue.Count;
                }
            }
            // 开启监听线程
            private void Start()
            {
                Thread process_Thread = new Thread(PorcessItem);
                process_Thread.IsBackground = true;
                process_Thread.Start();
            }
    
            // 生产者生产
            public void Enqueue(T items)
            {
                if (items == null)
                {
                    throw new ArgumentException("items");
                }
    
                queue.Enqueue(items);
                DataAdded();
            }
    
            //数据添加完成后通知消费者线程处理
            private void DataAdded()
            {
                if (enabled)
                {
                    if (!IsProcessingItem())
                    {
                        // 开启消费者消费队列
                        ProcessRangeItem();
                    }
                }
            }
    
            //判断是否队列有线程正在处理 
            private bool IsProcessingItem()
            {
                return !(Interlocked.CompareExchange(ref isProcessing, Processing, UnProcessing) == 0);
            }
    
            private void ProcessRangeItem()
            {
                for(int i=0; i< _internalTaskCount; i++)
                {
                    currentTask = Task.Factory.StartNew(() => ProcessItemLoop());
                    tasks.Add(currentTask);
                }
            }
            // 消费者处理事件
            private void ProcessItemLoop()
            {
                Console.WriteLine("正在执行的Task的Id: {0}", Task.CurrentId);
                // 队列为空,并且队列不可用
                if (!enabled && queue.IsEmpty)
                {
                    Interlocked.Exchange(ref isProcessing, 0);
                    return;
                }
                //处理的线程数 是否小于当前最大任务数
                //if (Thread.VolatileRead(ref runingCore) <= this.MaxTaskCount)
                //{
                T publishFrame;
    
                while(enabled)
                {
                    if (queue.TryDequeue(out publishFrame))
                    {
                        try
                        {
                            // 消费者处理事件
                            ProcessItemFunction(publishFrame);
                        }
                        catch (Exception ex)
                        {
                            OnProcessException(ex);
                        }
                    }
                    else
                    {
                        Console.WriteLine("线程Id{0}取队列失败,跳出循环", Task.CurrentId);
                        break;
                    }
                }
            }
    
            /// <summary>
            ///定时处理线程调用函数  
            ///主要是监视入队的时候线程 没有来的及处理的情况
            /// </summary>
            private void PorcessItem(object state)
            {
                int sleepCount = 0;
                int sleepTime = 1000;
                while (enabled)
                {
                    //如果队列为空则根据循环的次数确定睡眠的时间
                    if (queue.IsEmpty)
                    {
                        // Task消费者消费完了队列中的数据....注销掉消费者线程
                        if(tasks.Count==_internalTaskCount)
                        {
                            Flush();
                        }
                        if (sleepCount == 0)
                        {
                            sleepTime = 1000;
                        }
                        else if (sleepCount <= 3)
                        {
                            sleepTime = 1000 * 3;
                        }
                        else
                        {
                            sleepTime = 1000 * 50;
                        }
                        sleepCount++;
                        Thread.Sleep(sleepTime);
                    }
                    else
                    {
                        //判断是否队列有线程正在处理 
                        if (enabled && Interlocked.CompareExchange(ref isProcessing, Processing, UnProcessing) == 0)
                        {
                            if (!queue.IsEmpty)
                            {
                                currentTask = Task.Factory.StartNew(ProcessItemLoop);
                                tasks.Add(currentTask);
                            }
                            else
                            {
                                //队列为空,已经取完了
                                Interlocked.Exchange(ref isProcessing, 0);
                            }
                            sleepCount = 0;
                            sleepTime = 1000;
                        }
                    }
                }
            }
    
            //更新并关闭消费者
            public void Flush()
            {
                Stop();
                foreach(var t in tasks)
                {
                    if (t != null)
                    {
                        t.Wait();
                        Console.WriteLine("Task已经完成");
                    }
                }
    
                // 消费者未消费完
                while (!queue.IsEmpty)
                {
                    try
                    {
                        T publishFrame;
                        if (queue.TryDequeue(out publishFrame))
                        {
                            ProcessItemFunction(publishFrame);
                        }
                    }
                    catch (Exception ex)
                    {
                        OnProcessException(ex);
                    }
                }
                currentTask = null;
                tasks.Clear();
            }
    
            public void Stop()
            {
                this.enabled = false;
            }
    
            private void OnProcessException(System.Exception ex)
            {
                var tempException = ProcessException;
                Interlocked.CompareExchange(ref ProcessException, null, null);
    
                if (tempException != null)
                {
                    ProcessException(ex, new EventArgs<Exception>(ex));
                }
            }
    }

    调用code:

    class ComInfo
        {
            public int ComId { get; set; }
    
            public DateTime Date { get; set; }
        }
        class Program
        {
            static MyAsyncQueue<ComInfo> queue = new MyAsyncQueue<ComInfo>();
            static void Main(string[] args)
            {
                Console.WriteLine("开始======");
                queue.ProcessItemFunction += A;
                queue.ProcessException += C; //new EventHandler<EventArgs<Exception>>(C);
    
                ComInfo info = new ComInfo();
    
                for (int i = 1; i < 50; i++)
                {
                    Task.Factory.StartNew((param) =>
                    {
                        info = new ComInfo();
                        info.ComId = int.Parse(param.ToString());
                        info.Date = DateTime.Now.Date;
                        queue.Enqueue(info);
                    }, i);
                }
    
                Console.WriteLine("结束======");
                
                Console.ReadKey();
            }
    
            static void A(ComInfo info)
            {
                Console.WriteLine(info.ComId + "====" + queue.Count);
            }
    
            static void C(object ex, EventArgs<Exception> args)
            {
                Console.WriteLine("出错了");
            }
        }

    并发系列应该就这样完了,回头整理成目录,自己查起来也方便

  • 相关阅读:
    C#的System.Diagnostics.Trace.WriteLine 写入到文件中案例
    ubuntu开放指定端口
    mysql 启报错报 The server quit without updating PID file
    【WebMisCentral WMC】基于Extjs 4.2x的企业级用户授权认证中心系统(SSO+AM+SM),多租户SAAS应用
    Ajaxupload.js在最新版chrome 83版浏览器oncomplete失效问题解决方法
    SQLServer 父子结构group汇总显示
    jqweui 关于$(document.body).infinite的bug
    EntityFramework 动态构造排序 Func<IQueryable<T>, IOrderedQueryable<T>> Dynamic
    Safari 3D transform变换z-index层级渲染异常的研究
    Asp.net Core中使用NLog,并封装成公共的日志方法
  • 原文地址:https://www.cnblogs.com/zhiyong-ITNote/p/8315621.html
Copyright © 2011-2022 走看看