zoukankan      html  css  js  c++  java
  • 生产消费者队列(TaskCompletionSource)的应用

    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace ConsoleApplication6
    {
        public class PCQueue : IDisposable
        {
            class WorkItem
            {
                public readonly TaskCompletionSource<object> TaskSource;
                public readonly Action Action;
                public readonly CancellationToken? CancelToken;
    
                public WorkItem(
                  TaskCompletionSource<object> taskSource,
                  Action action,
                  CancellationToken? cancelToken)
                {
                    
                    TaskSource = taskSource;
                    Action = action;
                    CancelToken = cancelToken;
                }
            }
    
            BlockingCollection<WorkItem> _taskQ = new BlockingCollection<WorkItem>();
    
            /// <summary>
            /// 为每个消费者创建并启动单独的任务: 这里我启动了2个任务,用于打印easy和easy2这两个的顺序不一定
            /// </summary>
            /// <param name="workerCount"></param>
            public PCQueue(int workerCount)
            {
                // 为每个消费者创建并启动单独的任务:
                for (int i = 0; i < workerCount; i++)
                    Task.Factory.StartNew(Consume);
            }
    
            public void Dispose() { _taskQ.CompleteAdding(); }
    
            /// <summary>
            /// 默认任务取消标识为null的任务进队方法
            /// </summary>
            /// <param name="action"></param>
            /// <returns></returns>
            public Task EnqueueTask(Action action)
            {
                return EnqueueTask(action, null);
            }
            /// <summary>
            /// 任务进队方法含标识
            /// </summary>
            /// <param name="action"></param>
            /// <param name="cancelToken"></param>
            /// <returns></returns>
            public Task EnqueueTask(Action action, CancellationToken? cancelToken)
            {
                var tcs = new TaskCompletionSource<object>();
                _taskQ.Add(new WorkItem(tcs, action, cancelToken));
                //通过TaskCompletionSource返回任务本身,可查看任务的响应信息如result,exception,status等等
                return tcs.Task;
            }
    
            void Consume()
            {
                foreach (WorkItem workItem in _taskQ.GetConsumingEnumerable())
                    if (workItem.CancelToken.HasValue &&
                        workItem.CancelToken.Value.IsCancellationRequested)
                    {
                        workItem.TaskSource.SetCanceled();
                    }
                    else
                        try
                        {
                            workItem.Action();
                            workItem.TaskSource.SetResult(1234);   // 表示完成
                        }
                        catch (OperationCanceledException ex)
                        {
                            if (ex.CancellationToken == workItem.CancelToken)
                                workItem.TaskSource.SetCanceled();
                            else
                                workItem.TaskSource.SetException(ex);
                        }
                        catch (Exception ex)
                        {
                            workItem.TaskSource.SetException(ex);
                        }
            }
        }
        class Program
        {
            static void Main(string[] args)
            {
                CancellationToken token1 = new CancellationToken(true);
                //
                var pcQ = new PCQueue(2);
                Task task = pcQ.EnqueueTask(() => Console.WriteLine("Easy!"),token1);//输出easy的任务不会执行
                task = pcQ.EnqueueTask(() => Console.WriteLine("Easy2!"));//会执行
             
                // ...
                Console.Read();
            }
        }
    }
  • 相关阅读:
    Java异常处理
    冒泡排序法
    21个项目-MNIST机器学习入门
    Hadoop集群搭建中ssh免密登录
    利用奇异值分解简化数据
    数据集中空值替换成对应特征的平均值
    PCA降维处理
    使用FP-growth算法高效发现频繁项集
    原生js---ajax---post方法传数据
    原生js---ajax---get方法传数据
  • 原文地址:https://www.cnblogs.com/kexb/p/6793154.html
Copyright © 2011-2022 走看看