zoukankan      html  css  js  c++  java
  • 线程队列

    在web应用中,单个进程或者机器的响应速度有限,类似大量数据导入导出的操作的数量如果不加限制,会导致服务器cpu被吃满,导致其他一些很简单的请求无法及时响应的问题。针对这个限制提出了如下要求。
    1. 先到达的请求先执行: 先入先出原则
    2. 只能同时执行若干请求:避免cpu被吃满
    3. 异步执行:如果长时间执行会长期占用iis的工作线程

    基于上述的要求我设计了一个队列。这个队列我们需要稍微提一个组件,ParallelExtensionsExtras

    这是微软提供的一个线程的扩展,具体的自行搜索下相关资料,这里开始的时候我并没有用这个组件,而是自己对task的封装,但是实际上task还是利用的线程池,线程池默认的线程数10个,并不能满足某些场景对多个现成的要求,于是在漫长的搜索过程中才发现了这个组建。

    这里我主要用到两个对TaskScheduler的扩展

    QueuedTaskScheduler:对task进行排队执行,执行时在Thread环境中,并且可以控制线程的数量,类似自定义的线程池。

    ThreadPerTaskScheduler: 顾名思义就是在线程中执行每个task。

    针对两种区别在于,QueuedTaskScheduler 的线程是可复用的,在线程数量固定的情况下推荐使用。

    ThreadPerTaskScheduler 只创建线程而不进行销毁,每次执行一个task都是使用一个new一个thread执行。适用于在某些线程数量动态变化的情况。

    下面是实现代码:

    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Configuration;
    using System.Diagnostics;
    using System.Linq;
    using System.Runtime.CompilerServices;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Schedulers;
    using Coralcode.Framework.Data;
    using Coralcode.Framework.Extensions;
    using ThreadState = System.Threading.ThreadState;
    
    namespace Coralcode.Framework.Task
    {
        /// <summary>
        /// 线程队列
        /// 注意如果使用ScheduleType.Defaut 存在线程池数量限制
        /// </summary>
        public class TaskQueue
        {
            public static readonly TaskQueue Instance = new TaskQueue(1, ScheduleType.Thread);
    
            /// <summary>
            /// 获取队列
            /// </summary>
            /// <param name="maxCount">最大并发数</param>
            /// <param name="type">执行计划</param>
            /// <returns></returns>
            public static TaskQueue GetQueue(int maxCount,ScheduleType type=ScheduleType.Thread)
            {
                return new TaskQueue(maxCount, type);
            }
    
            /// <summary>
            /// 获取队列
            /// </summary>
            /// <param name="maxCount">最大并发数</param>
            /// <param name="name">队列名称</param>
            /// <param name="type">执行计划</param>
            /// <returns></returns>
            public static TaskQueue GetQueue(int maxCount, string name, ScheduleType type = ScheduleType.Thread)
            {
                return new TaskQueue(maxCount, name, type);
            }
    
            /// <summary>
            /// 获取队列
            /// </summary>
            /// <param name="maxCount">最大并发数</param>
            /// <param name="name">队列名称</param>
            /// <param name="token">取消令牌</param>
            /// <param name="type">执行计划</param>
            public static TaskQueue GetQueue(int maxCount, string name, CancellationToken token, ScheduleType type = ScheduleType.Thread)
            {
                return new TaskQueue(maxCount, name, token, type);
            }
    
            /// <summary>
            /// 获取队列
            /// </summary>
            /// <param name="maxCount">最大并发数</param>
            /// <param name="token">取消令牌</param>
            /// <param name="type">执行计划</param>
            public static TaskQueue GetQueue(int maxCount, CancellationToken token, ScheduleType type = ScheduleType.Thread)
            {
                return new TaskQueue(maxCount, IdentityGenerator.NewGuidString(), token, type);
            }
    
            private ConcurrentQueue<System.Threading.Tasks.Task> _tasks;
            private readonly int _limitedTaskCount;
            private int _runningTaskCount;
            private Thread _mainExcuteThread;
            private CancellationToken _token;
            private TaskScheduler _scheduler;
    
            private TaskQueue(int maxCount, ScheduleType type) :
                this(maxCount, IdentityGenerator.NewGuidString(), type)
            {
            }
    
            private TaskQueue(int maxCount, string name, ScheduleType type) :
                this(maxCount, name, CancellationToken.None, type)
    
            {
            }
    
            private TaskQueue(int maxCount, string name, CancellationToken token, ScheduleType type)
            {
                _limitedTaskCount = maxCount;
                _tasks = new ConcurrentQueue<System.Threading.Tasks.Task>();
                Name = name;
                _token = token;
                switch (type)
                {
                    case ScheduleType.Default:
                        _scheduler = new QueuedTaskScheduler(maxCount);
                        break;
                    case ScheduleType.Thread:
                        _scheduler = new ThreadPerTaskScheduler();
                        break;
                    default:
                        throw new ArgumentOutOfRangeException(nameof(type), type, null);
                }
    
            }
    
            /// <summary>
            /// 执行不带返回结果的方法
            /// </summary>
            /// <param name="func"></param>
            /// <returns></returns>
            public System.Threading.Tasks.Task Execute(Action func)
            {
                var task = new System.Threading.Tasks.Task(func, _token);
                _tasks.Enqueue(task);
                if (_mainExcuteThread == null || _mainExcuteThread.ThreadState.HasFlag(ThreadState.Stopped))
                {
                    _mainExcuteThread?.DisableComObjectEagerCleanup();
                    _mainExcuteThread = new Thread(NotifyThreadPendingWork);
                    _mainExcuteThread.Start();
                }
                return task;
            }
    
            /// <summary>
            /// 执行带返回结果的方法
            /// </summary>
            /// <typeparam name="TResult"></typeparam>
            /// <param name="func"></param>
            /// <returns></returns>
    
            [MethodImpl(MethodImplOptions.Synchronized)]
            public Task<TResult> Execute<TResult>(Func<TResult> func)
            {
                var task = new Task<TResult>(func, _token);
                _tasks.Enqueue(task);
                if (_mainExcuteThread == null || _mainExcuteThread.ThreadState.HasFlag(ThreadState.Stopped))
                {
                    _mainExcuteThread?.DisableComObjectEagerCleanup();
                    _mainExcuteThread = new Thread(NotifyThreadPendingWork);
                    _mainExcuteThread.Priority = ThreadPriority.Highest;
                    _mainExcuteThread.Start();
                }
                return task;
            }
    
            private void NotifyThreadPendingWork()
            {
                try
                {
                    while (true)
                    {
                        if (_token.IsCancellationRequested)
                        {
                            _tasks = new ConcurrentQueue<System.Threading.Tasks.Task>();
                            break;
                        }
    
                        System.Threading.Tasks.Task task;
                        if (!_tasks.TryDequeue(out task))
                            break;
    
                        task.Start(_scheduler);
                        Interlocked.Increment(ref _runningTaskCount);
                        task.ContinueWith(item =>
                        {
                            Interlocked.Decrement(ref _runningTaskCount);
                        });
                        //Debug.WriteLine("队列{0},允许执行 {1} 条,等待线程为 {2} ,执行中 {3} 条,时间为 {4} ", _name, _limitedTaskCount, _tasks.Count, _runningTaskCount, DateTime.Now);
                        while (_runningTaskCount >= _limitedTaskCount)
                        {
                            Thread.Sleep(500);
                        }
                    }
                }
                finally
                {
                    _runningTaskCount = 0;
                }
            }
    
            /// <summary>
            /// 线程队列的名字
            /// </summary>
            public string Name { get; }
    
            /// <summary>
            /// 根据返回的task的id获取到当前task排队的位置
            /// </summary>
            /// <param name="taskId"></param>
            /// <returns>返回-1表示正在执行,或者task没有加进去,返回大于等于0则表示其顺序</returns>
            public int GetCurrentTaskIndex(int taskId)
            {
                lock (_tasks)
                {
                    return _tasks.IndexOf(item => item.Id == taskId);
                }
            }
    
            /// <summary>
            /// 等待执行的线程数量
            /// </summary>
            public int WaitingTaskCount => _tasks.Count;
            /// <summary>
            /// 正在执行的线程数量
            /// </summary>
            public int RunningTaskCount => _runningTaskCount;
            /// <summary>
            /// 并发数
            /// </summary>
            public int LimitedTaskCount => _limitedTaskCount;
        }
    
        /// <summary>
        /// 
        /// </summary>
        public enum ScheduleType {
            /// <summary>
            /// 线程池,默认方式
            /// </summary>
            Default,
            /// <summary>
            /// 自定义线程,ThreadPerTaskScheduler
            /// </summary>
            Thread
        }
        
    }
    

      

    重点部分说明
    1. ConcurrentQueue 本身在处理多线程环境所以采用线程安全的队列。
    2. ContinueWith 在任务执行完毕之后需要对执行的数量-1。
    3. NotifyThreadPendingWork 这里就是启动另外一个主线程来对任务进行分发。
    4. 主线程分发来满足异步的需求。
    5. CancellationToken 提供取消执行的功能。
    6. 可以采用默认也可以自己实例化,而默认是一个线程,即考虑常用情况,也提供扩展的功能。

    测试代码


    写出来这个队列的部分可能只用了1小时,但是写测试代码和调试用了差不多半天时间才搞定。

    线程的测试在单元测试中一直是难以控制的,在这个case中多个线程并发的情况下,实时获取排队数量,执行中数量也是个很难测试的部分。

    下面是测试代码:

    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using Coralcode.Framework.Task;
    using Coralcode.Framework.Utils;
    using iTextSharp.text;
    using Microsoft.VisualStudio.TestTools.UnitTesting;
    
    namespace FrameworkTest.Task
    {
        [TestClass]
        public class TaskQueueTest
        {
            [TestMethod]
            public void TwoTimesExcuteWithFreeBetweenTwoTimsTest()
            {
                var wantExecuteTaskCount = 10;
                var maxCanExcuteTaskCount = 1;
                var queue = TaskQueue.GetQueue(maxCanExcuteTaskCount);
                var addList = new List<int>();
                var executedList = new List<int>();
                for (int i = 0; i < wantExecuteTaskCount; i++)
                {
                    addList.Add(i);
                }
                /*
                 * 测试原理:
                 * 添加一批任务,等待执行完成以后在执行一次
                 * 第二次也能执行完毕,无报错的话认为,
                 * 队列在执行完毕后自动暂停,等有新任务进来的时候可以重新启动
                 */
                addList.ForEach(index =>
                {
                    var list = executedList;
                    queue.Execute(() =>
                    {
                        var item = index / maxCanExcuteTaskCount;
                        list.Add(item);
                        return true;
                    });
                });
                while (addList.Count != executedList.Count)
                {
                    Thread.Sleep(1000);
    
                }
                Assert.AreEqual(queue.WaitingTaskCount, 0);
                executedList.Clear();
                for (int i = 0; i < wantExecuteTaskCount; i++)
                {
                    addList.Add(i);
                }
                addList.ForEach(index =>
                {
                    var list = executedList;
                    queue.Execute(() =>
                    {
                        var item = index / maxCanExcuteTaskCount;
                        list.Add(item);
                        return true;
                    });
                });
                while (addList.Count != executedList.Count)
                {
                    Thread.Sleep(1000);
                }
                Assert.AreEqual(queue.WaitingTaskCount, 0);
            }
    
    
    
    
            [TestMethod]
            public void ExecuteAsAddSequenceTest()
            {
                var wantExecuteTaskCount = 10;
                var maxCanExcuteTaskCount = 1;
                var queue = TaskQueue.GetQueue(maxCanExcuteTaskCount);
                var addList = new List<int>();
                var executedList = new List<int>();
                for (int i = 0; i < wantExecuteTaskCount; i++)
                {
                    addList.Add(i);
                }
    
    
                /*
                 * 测试原理:
                 * 按需添加,然后记录执行,结果也必须是顺序的
                 * 如果一次执行多条,那么多条被认为同一个批次
                 * 同一个批次的顺序应该是一致的
                 */
                addList.ForEach(index =>
                {
                    var list = executedList;
                    queue.Execute(() =>
                    {
                        var item = index / maxCanExcuteTaskCount;
                        list.Add(item);
                        return true;
                    });
                });
    
                while (addList.Count != executedList.Count)
                {
                    Thread.Sleep(1000);
    
                }
                addList = addList.Select(item => item / maxCanExcuteTaskCount).Distinct().ToList();
                executedList = executedList.Distinct().ToList();
                for (int i = 0; i < addList.Count; i++)
                {
                    Assert.AreEqual(addList[i], executedList[i]);
                }
            }
    
    
    
            [TestMethod]
            public void WaitCountTest()
            {
                var wantExecuteTaskCount = 10;
                var maxCanExecuteTaskCount = 2;
                var queue = TaskQueue.GetQueue(maxCanExecuteTaskCount);
                var executedOrExecutingCount = 0;
    
                /*
                 * 测试原理:
                 * 记录执行了的数量
                 * 队列中等待的数量 =最开始放入的数量-队列运行执行的最大线程数-已经执行的数量
                 * 当所有的线程都在被执行时,队列中等待数量为0,但是还未执行完的话,直接用0来判断
                 */
                var taskList = new List<System.Threading.Tasks.Task>();
                for (int i = 0; i < wantExecuteTaskCount; i++)
                {
                    var task = queue.Execute(() =>
                    {
                        Thread.Sleep(2000);
                        Debug.WriteLine("已执行的数量" + executedOrExecutingCount);
                        Interlocked.Increment(ref executedOrExecutingCount);
                    });
                    taskList.Add(task);
                }
                // ReSharper disable once LoopVariableIsNeverChangedInsideLoop
                while (executedOrExecutingCount < wantExecuteTaskCount)
                {
                    Thread.Sleep(700);
                    var queueWaitCount = queue.WaitingTaskCount;
                    var wantWaitCount = wantExecuteTaskCount - maxCanExecuteTaskCount - executedOrExecutingCount;
                    Debug.WriteLine("************");
                    Debug.WriteLine("队列等待执行的数量" + queueWaitCount);
                    Debug.WriteLine("队列期望等待执行的数量" + wantWaitCount);
                    Debug.WriteLine("************");
                    if (wantWaitCount < 0)
                        Assert.AreEqual(queueWaitCount, 0);
                    else
                        Assert.AreEqual(queueWaitCount, wantWaitCount);
                }
            }
    
            [TestMethod]
            public void WaitTest()
            {
    
                //System.Threading.Tasks.Task.Factory.StartNew(() =>
                //{
                //    try
                //    {
                //        var str = HttpUtil.Get<string>("http://172.16.2.3:10004",
                //        "api/Acl/PermissionAuthorityApi/GetPermissionCountByApp?app=1");
                //    }
                //    catch (Exception e)
                //    {
                //        Console.WriteLine(e);
                //        throw;
                //    }
    
                //});
                var wantExecuteTaskCount = 10;
                var maxCanExcuteTaskCount = 1;
                var queue = TaskQueue.GetQueue(maxCanExcuteTaskCount);
                var addList = new List<int>();
                var executedList = new List<int>();
                for (int i = 0; i < wantExecuteTaskCount; i++)
                {
                    addList.Add(i);
                }
    
                var tasks = new List<System.Threading.Tasks.Task>();
                /*
                 * 测试原理:
                 * 执行异步请求看最后是否成功执行
                 */
                addList.ForEach(index =>
                {
                    var list = executedList;
                    var task = queue.Execute(() =>
                    {
                        try
                        {
                            var str = HttpUtil.Get<string>("http://172.16.2.3:10004",
                                "api/Acl/PermissionAuthorityApi/GetPermissionCountByApp?app=1");
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine(e);
                        }
                        var item = index / maxCanExcuteTaskCount;
                        list.Add(item);
                    });
                    tasks.Add(task);
                });
                System.Threading.Tasks.Task.WaitAll(tasks.ToArray());
    
                //System.Threading.Tasks.Task.WaitAll(tasks.ToArray());
                addList = addList.Select(item => item / maxCanExcuteTaskCount).Distinct().ToList();
                executedList = executedList.Distinct().ToList();
                for (int i = 0; i < addList.Count; i++)
                {
                    Assert.AreEqual(addList[i], executedList[i]);
                }
            }
    
            [TestMethod]
            public void ExceptionTestTest()
            {
                var wantExecuteTaskCount = 10;
                var maxCanExecuteTaskCount = 2;
                var queue = TaskQueue.GetQueue(maxCanExecuteTaskCount);
                var executedOrExecutingCount = 0;
    
                /*
                 * 发现部分情况下队列里面的异常检测不到
                 * 这里做了测试
                 */
    
                queue.Execute(() =>
                {
                    try
                    {
                        throw new Exception("测试异常");
                    }
                    catch (Exception ex)
                    {
                        Assert.IsNotNull(ex);
                    }
                });
    
            }
    
    
            [TestMethod]
            public void GetCurrentTaskIndexTeset()
            {
                var wantExecuteTaskCount = 10;
                var maxCount = 10;
                var waitSeconds = 1000;
                var queue = TaskQueue.GetQueue(maxCount);
    
                List< System.Threading.Tasks.Task> executeingTasks = new List<System.Threading.Tasks.Task>();
                /*
                 * 测试方法:
                 * 假设并发为10,那么首先塞进去10个线程执行,
                 * 那么后面的线程都在等待,
                 * 在放进去n个,则后面的正在排队,返回的位置和放进去的位置一致
                 * 而之前放进去的10个都在执行,返回-1
                 */
                for (int i = 0; i < maxCount; i++)
                {
                   var task= queue.Execute(() =>
                    {
                        try
                        {
                            Thread.Sleep(waitSeconds * 1000);
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine(e);
                        }
                    });
                    executeingTasks.Add(task);
                }
                Dictionary<int,System.Threading.Tasks.Task> tasks = new Dictionary<int, System.Threading.Tasks.Task>();
                for (int i = 0; i < wantExecuteTaskCount; i++)
                {
                    var task = queue.Execute(() =>
                    {
                        try
                        {
                            Thread.Sleep(waitSeconds*1000);
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine(e);
                        }
                    });
                    tasks.Add(i,task);
                }
                foreach (var task in executeingTasks)
                {
                    Assert.AreEqual(queue.GetCurrentTaskIndex(task.Id), -1);
                }
                foreach (var task in tasks)
                {
                    Assert.AreEqual(queue.GetCurrentTaskIndex(task.Value.Id),task.Key);
                }
    
            }
    
        }
    }
    

      


    具体的代码中都有每个测试用例的设计思考。


    注意事项

    注意事项就是我在这里踩过的坑

    Session丢失


    在web应用中session的存储实际以线程为基础,类似ThreadLocal实现的线程隔离和静态使用。
    这里再执行之前需要预先取出来要使用的对象然后再在action/func中使用

    DbContext


    1. 在ef中如果获取一个dbcontext在另外一个线程中保存就会报错。
    2. 如果一个从ef取出来的对象从一个线程传递到另外一个线程,修改提交就会报错。
    3. 如果主线程和队列中线程同时操作例如一个读一个写,或者同时写,此时也会报错


    Ioc的问题

    如果ioc采用prethread或者preresolve这两种方式来管理生命周期,理论上在http请求结束的时候都要对线程中使用的对象进行释放,

    因为队列为异步那么当请求结束时候线程却还在执行,此时就会出现空指针的问题,或者ef中对象已经释放。


    最佳实践

            /// <summary>
            /// 调用示例
            /// 
            /// </summary>
            /// <typeparam name="TService"></typeparam>
            public  static void AsyncExecuteServiceAction<TService>(this TService service,string actionName,params  object[] parameters)
                where TService:CoralService
            {
                TaskQueue.Instance.Execute(() =>
                {
                    using (var newservice = UnityService.Resolve<TService>())
                    {
                        newservice.InitContext(service.AppContext,service.UserContext,service.SessionContext,service.PageContext);
                        newservice.ExecuteMethod<object>(actionName, parameters);
                    }
                });
            }
    

      

    1. service.AppContext,service.UserContext,service.SessionContext,service.PageContext 这是四种不同级别的缓存,理解为session
    2. using 的目的就是为了释放对象
    3. 反射参数的方式只是为了方便调用


    总结
    1. 代码不难,直接拷贝就可以使用,单元测试也是一样
    2. TPL我觉得是在.net技术中很好用的一部分,需要熟悉
    3. 线程中有很多对象,锁等问题,这个要随着经验不断的挖坑填坑来增长经验


    展望

    1. 前面说过两种线程计划,我这里放入prethread是在某个工具中有压力测试的部分,需要以最快的速度来创建线程,所以没有采用复用的方式,具体后面有一个接口测试工具的系列文章来介绍。
    2. 这里说的线程队列,和前面动态类型序列化是后面一个作业调度系统的基础,这部分还在设计和实现中,预计月底可以在文章中和大家见面。
    3. 其实上一篇和这一篇都是在为作业系统做铺垫。
    4. 下一篇将介绍一个定时执行任务的设计。
    5. 虽然我一直觉得设计是最重要的,实现其次,但是要落地还是要依赖于实现,所以这些基本的组件和帮助类还是需要的。

  • 相关阅读:
    emacs写cnblog博客
    emacs写cnblog博客
    linux安装jdk
    linux远程服务器启动mysql时显示:/tmp/mysql.sock 不存在的解决方法
    最新Linux系统下安装MySql 5.7.17全过程及注意事项
    Xshell实现Windows上传文件到Linux主机
    4种java定时器
    微信的redirect_uri参数错误解决办法
    要善于借势破局——宁向东的清华管理学课第4课
    Java内存区域
  • 原文地址:https://www.cnblogs.com/Skyven/p/7819277.html
Copyright © 2011-2022 走看看