zoukankan      html  css  js  c++  java
  • c# 异步任务队列(可选是否使用单线程执行任务,以及自动取消任务)

    
    

    使用demo,(.net framework 4.0 自行添加async wait 扩展库)

        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("主线程"+Thread.CurrentThread.ManagedThreadId);
                var asyncTaskQueue = new AsyncTaskQueue
                {
                    AutoCancelPreviousTask = true, // 自动取消之前的任务
                    UseSingleThread = true // 使用单线程执行任务
                };
    
                // 快速启动20个任务
                for (var i = 1; i < 20; i++)
                {
                    Test(asyncTaskQueue, i);
                }
                Console.WriteLine("运行结束");
                Console.ReadKey();
            }
    
            public static async void Test(AsyncTaskQueue taskQueue, int num)
            {
                var result = await taskQueue.Run(() =>
                {
                    // 长时间耗时任务
                    Thread.Sleep(5000);
                    Console.WriteLine("输入的是" + num);
                    return num * 100;
                });
                Console.WriteLine("当前线程" + Thread.CurrentThread.ManagedThreadId + "输出的的" + result);
            }
        }

    这里是实现代码

    #region summary
    
    //   ------------------------------------------------------------------------------------------------
    //   <copyright file="AsyncTaskQueue.cs" >
    //     作者:mokeyish
    //   </copyright>
    //   ------------------------------------------------------------------------------------------------
    
    #endregion
    
    using System;
    using System.Collections.Concurrent;
    using System.Diagnostics;
    using System.Security.Permissions;
    using System.Runtime.CompilerServices;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace Test
    {
        /// <summary>
        /// 异步任务队列
        /// </summary>
        public class AsyncTaskQueue : IDisposable
        {
            private bool _isDisposed;
            private readonly ConcurrentQueue<AwaitableTask> _queue = new ConcurrentQueue<AwaitableTask>();
            private Thread _thread;
            private AutoResetEvent _autoResetEvent;
    
            /// <summary>
            /// 异步任务队列
            /// </summary>
            public AsyncTaskQueue()
            {
                _autoResetEvent = new AutoResetEvent(false);
                _thread = new Thread(InternalRuning) {IsBackground = true};
                _thread.Start();
            }
    
            private bool TryGetNextTask(out AwaitableTask task)
            {
                task = null;
                while (_queue.Count > 0)
                {
                    if (_queue.TryDequeue(out task) && (!AutoCancelPreviousTask || _queue.Count == 0)) return true;
                    task.Cancel();
                }
                return false;
            }
    
            private AwaitableTask PenddingTask(AwaitableTask task)
            {
                lock (_queue)
                {
                    Debug.Assert(task != null);
                    _queue.Enqueue(task);
                    _autoResetEvent.Set();
                }
                return task;
            }
    
            private void InternalRuning()
            {
                while (!_isDisposed)
                {
                    if (_queue.Count == 0)
                    {
                        _autoResetEvent.WaitOne();
                    }
                    while (TryGetNextTask(out var task))
                    {
                        if (task.IsCancel) continue;
    
                        if (UseSingleThread)
                        {
                            task.RunSynchronously();
                        }
                        else
                        {
                            task.Start();
                        }
                    }
                }
            }
    
            /// <summary>
            /// 是否使用单线程完成任务.
            /// </summary>
            public bool UseSingleThread { get; set; } = true;
    
            /// <summary>
            /// 自动取消以前的任务。
            /// </summary>
            public bool AutoCancelPreviousTask { get; set; } = false;
    
            /// <summary>
            /// 执行任务
            /// </summary>
            /// <param name="action"></param>
            /// <returns></returns>
            public AwaitableTask Run(Action action)
                => PenddingTask(new AwaitableTask(new Task(action, new CancellationToken(false))));
    
            /// <summary>
            /// 执行任务
            /// </summary>
            /// <typeparam name="TResult"></typeparam>
            /// <param name="function"></param>
            /// <returns></returns>
            public AwaitableTask<TResult> Run<TResult>(Func<TResult> function)
                => (AwaitableTask<TResult>) PenddingTask(new AwaitableTask<TResult>(new Task<TResult>(function)));
    
    
            /// <inheritdoc />
            public void Dispose()
            {
                Dispose(true);
                GC.SuppressFinalize(this);
            }
            
            /// <summary>
            /// 析构任务队列
            /// </summary>
            ~AsyncTaskQueue() => Dispose(false);
    
            private void Dispose(bool disposing)
            {
                if (_isDisposed) return;
                if (disposing)
                {
                    _autoResetEvent.Dispose();
                }
                _thread = null;
                _autoResetEvent = null;
                _isDisposed = true;
            }
    
            /// <summary>
            /// 可等待的任务
            /// </summary>
            public class AwaitableTask
            {
                private readonly Task _task;
    
                /// <summary>
                /// 初始化可等待的任务。
                /// </summary>
                /// <param name="task"></param>
                public AwaitableTask(Task task) => _task = task;
    
                /// <summary>
                /// 任务的Id
                /// </summary>
                public int TaskId => _task.Id;
    
                /// <summary>
                /// 任务是否取消
                /// </summary>
                public bool IsCancel { get; private set; }
    
                /// <summary>
                /// 开始任务
                /// </summary>
                public void Start() => _task.Start();
    
                /// <summary>
                /// 同步执行开始任务
                /// </summary>
                public void RunSynchronously() => _task.RunSynchronously();
    
                /// <summary>
                /// 取消任务
                /// </summary>
                public void Cancel() => IsCancel = true;
    
                /// <summary>
                /// 获取任务等待器
                /// </summary>
                /// <returns></returns>
                public TaskAwaiter GetAwaiter() => new TaskAwaiter(this);
    
                /// <summary>Provides an object that waits for the completion of an asynchronous task. </summary>
                [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
                public struct TaskAwaiter : INotifyCompletion
                {
                    private readonly AwaitableTask _task;
    
                    /// <summary>
                    /// 任务等待器
                    /// </summary>
                    /// <param name="awaitableTask"></param>
                    public TaskAwaiter(AwaitableTask awaitableTask) => _task = awaitableTask;
    
                    /// <summary>
                    /// 任务是否完成.
                    /// </summary>
                    public bool IsCompleted => _task._task.IsCompleted;
    
                    /// <inheritdoc />
                    public void OnCompleted(Action continuation)
                    {
                        var This = this;
                        _task._task.ContinueWith(t =>
                        {
                            if (!This._task.IsCancel) continuation?.Invoke();
                        });
                    }
                    /// <summary>
                    /// 获取任务结果
                    /// </summary>
                    public void GetResult() => _task._task.Wait();
                }
            }
    
            /// <summary>
            /// 可等待的任务
            /// </summary>
            /// <typeparam name="TResult"></typeparam>
            public class AwaitableTask<TResult> : AwaitableTask
            {
                /// <summary>
                /// 初始化可等待的任务
                /// </summary>
                /// <param name="task">需要执行的任务</param>
                public AwaitableTask(Task<TResult> task) : base(task) => _task = task;
    
    
                private readonly Task<TResult> _task;
    
                /// <summary>
                /// 获取任务等待器
                /// </summary>
                /// <returns></returns>
                public new TaskAwaiter GetAwaiter() => new TaskAwaiter(this);
    
                /// <summary>
                /// 任务等待器
                /// </summary>
                [HostProtection(SecurityAction.LinkDemand, ExternalThreading = true, Synchronization = true)]
                public new struct TaskAwaiter : INotifyCompletion
                {
                    private readonly AwaitableTask<TResult> _task;
    
                    /// <summary>
                    /// 初始化任务等待器
                    /// </summary>
                    /// <param name="awaitableTask"></param>
                    public TaskAwaiter(AwaitableTask<TResult> awaitableTask) => _task = awaitableTask;
    
                    /// <summary>
                    /// 任务是否已完成。
                    /// </summary>
                    public bool IsCompleted => _task._task.IsCompleted;
    
                    /// <inheritdoc />
                    public void OnCompleted(Action continuation)
                    {
                        var This = this;
                        _task._task.ContinueWith(t =>
                        {
                            if (!This._task.IsCancel) continuation?.Invoke();
                        });
                    }
    
                    /// <summary>
                    /// 获取任务结果。
                    /// </summary>
                    /// <returns></returns>
                    public TResult GetResult() => _task._task.Result;
                }
            }
        }
    }
    View Code
  • 相关阅读:
    六度分离_hdu_1869(floyd算法).java
    MFC版的Hello World
    Google Protobuf的安装、配置、以及简单demo编译
    在Android工程中加入AIDL文件时,gen目录生成的文件报错-问题解决
    SharePoint 2013的100个新功能之网站管理(二)
    SharePoint 2013的100个新功能之网站管理(一)
    SharePoint 2013的100个新功能之社交
    SharePoint 2013的100个新功能之内容管理(四)
    SharePoint 2010 Ribbon with wrong style in Chrome and Safari
    SharePoint 2013的100个新功能之内容管理(三)
  • 原文地址:https://www.cnblogs.com/mokeyish/p/6986560.html
Copyright © 2011-2022 走看看