zoukankan      html  css  js  c++  java
  • 创建自定义线程池

    1. 创建一个单例模式的自定义线程池类

    public class CustomThreadPool
    {
        //#region configurable items - for demo let's have these as constants
        private const int MAX = 8; // maximum no of threads in pool
        private const int MIN = 3; // minimum no of threads in pool
        private const int MIN_WAIT = 10; // milliseconds
        private const int MAX_WAIT = 15000; // milliseconds - threshold for simple task
        private const int CLEANUP_INTERVAL = 60000; // millisecond - to free waiting threads in pool
        private const int SCHEDULING_INTERVAL = 10; // millisecond - look for task in queue in loop
        //#endregion
    
        //#region singleton instance of threadpool
        private static readonly CustomThreadPool _instance = new CustomThreadPool();
    
        private CustomThreadPool() {
            InitializeThreadPool();
        }
    
        public static CustomThreadPool Instance
        {
            get
            {
                return _instance;
            }
        }
        //#endregion
    
        private void InitializeThreadPool() { 
        //TODO: write initialization code here 
        }
    }

    2. 定义一些基本类型来和线程池进行通讯

    public delegate void UserTask();
    public class ClientHandle
    {
        public Guid ID;
        public bool IsSimpleTask = false;
    }
    public class TaskStatus
    {
        public bool Success = true;
        public Exception InnerException = null;
    }

    看到上面的代码了吗?UserTask是一个代理,代表了线程池中线程将要执行的任务。

    3. 接下来我们给我们的自定义线程池类增加一些公共接口的方法

    public ClientHandle QueueUserTask(UserTask task, Action<TaskStatus> callback)
    {
        throw new Exception("not implemented yet.");
    }
    
    public static void CancelUserTask(ClientHandle handle)
    { 
        //TODO: write implementation code here
    }

     4. 下面是线程池需要使用的一些内部类和类型

    //#region nested private types
    enum TaskState // to represent current state of a usertask
    {
        notstarted,
        processing,
        completed,
        aborted
    }
    class TaskHandle // Item in waiting queue
    {
        public ClientHandle Token; // generate this everytime an usertask is queued and return to the caller as a reference. 
        public UserTask task; // the item to be queued - supplied by the caller
        public Action<TaskStatus> callback; // optional - in case user want's a notification of completion
    }
    
    class TaskItem // running items in the pool - TaskHandle gets a thread to execute it 
    {
        public TaskHandle taskHandle;
        public Thread handler;
        public TaskState taskState = TaskState.notstarted; 
        public DateTime startTime = DateTime.MaxValue;
    }
    //#endregion  

    5. 下面我们需要做的是线程池的初始化工作,并且初始化任务队列

    //private instance members
    private Queue<TaskHandle> ReadyQueue = null;
    private List<TaskItem> Pool = null;
    private Thread taskScheduler = null;
    
    private void InitializeThreadPool()
    {
        ReadyQueue = new Queue<TaskHandle>();
        Pool = new List<TaskItem>();
        taskScheduler = new Thread(() =>
            {
                //TODO: write scheduling logic here
            });
        taskScheduler.Start();
    }  

      上面需要特别注意的是,taskScheduler这个线程类对象。
      这是始终贯穿线程池生命周期的一个额外的线程,也可以说是主线程。

    它的任务是监视用户任务队列,并且尽快地把它们带去执行,另外它还负责强制最大和最小

    线程数的限制,做一些清理工作。

    6. 接着就是实现线程初始化了,我们使用的是就近完成算法

    private void InitializeThreadPool()
    {
        ReadyQueue = new Queue<TaskHandle>();
        Pool = new List<TaskItem>();
    
        InitPoolWithMinCapacity(); // initialize Pool with Minimum capacity - that much thread must be kept ready
    
        DateTime LastCleanup = DateTime.Now; // monitor this time for next cleaning activity
    
        taskScheduler = new Thread(() =>
            {
                do
                {
                    while (ReadyQueue.Count > 0 && ReadyQueue.Peek().task == null)
                        ReadyQueue.Dequeue();
                        // remove cancelled item/s - cancelled item will have it's task set to null
                            
                    int itemCount = ReadyQueue.Count;
                    for (int i = 0; i < itemCount; i++)
                    {
                        TaskHandle readyItem = ReadyQueue.Peek(); // the Top item of queue
                        bool Added = false;
    
                        foreach (TaskItem ti in Pool)
                        {
                            if (ti.taskState == TaskState.completed)
                            {
                                // if in the Pool task state is completed then a different
                                // task can be handed over to that thread
                                ti.taskHandle = readyItem;
                                ti.taskState = TaskState.notstarted;
                                Added = true;
                                ReadyQueue.Dequeue();
                                break;
                            }
                        }
                        if (!Added && Pool.Count < MAX)
                        {
                        // if all threads in pool are busy and the count is still less than the
                        // Max limit set then create a new thread and add that to pool
                            TaskItem ti = new TaskItem() { taskState = TaskState.notstarted };
                            ti.taskHandle = readyItem;
                            // add a new TaskItem in the pool
                            AddTaskToPool(ti);
                            Added = true;
                            ReadyQueue.Dequeue();
                        }
                        if (!Added) break; // It's already crowded so try after sometime
                    }
                    if ((DateTime.Now - LastCleanup) > TimeSpan.FromMilliseconds(CLEANUP_INTERVAL))
                    // It's long time - so try to cleanup Pool once.
                    {
                        CleanupPool();
                        LastCleanup = DateTime.Now;
                    }
                    else
                    {
                        // either of these two can work - the combination is also fine for our demo. 
                        Thread.Yield();
                        Thread.Sleep(SCHEDULING_INTERVAL); // Dont run madly in a loop - wait for sometime for things to change.
                        // the wait should be minimal - close to zero
                    }
                } while (true);
            });
        taskScheduler.Priority = ThreadPriority.AboveNormal;
        taskScheduler.Start();
    }
    
    private void InitPoolWithMinCapacity()
    {
        for (int i = 0; i <= MIN; i++)
        {
            TaskItem ti = new TaskItem() { taskState = TaskState.notstarted };
            ti.taskHandle = new TaskHandle() { task = () => { } };
            ti.taskHandle.callback = (taskStatus) => { };
            ti.taskHandle.Token = new ClientHandle() { ID = Guid.NewGuid() };
            AddTaskToPool(ti);
        }
    }
    
    private void AddTaskToPool(TaskItem taskItem)
    {
        taskItem.handler = new Thread(() =>
        {
            do
            {
                bool Enter = false;
    
                // if aborted then allow it to exit the loop so that it can complete and free-up thread resource.
                // this state means it has been removed from Pool already.
                if (taskItem.taskState == TaskState.aborted) break; 
    
                if (taskItem.taskState == TaskState.notstarted)
                {
                    taskItem.taskState = TaskState.processing;
                    taskItem.startTime = DateTime.Now;
                    Enter = true;
                }
                if (Enter)
                {
                    TaskStatus taskStatus = new TaskStatus();
                    try
                    {
                        taskItem.taskHandle.task.Invoke(); // execute the UserTask
                        taskStatus.Success = true;
                    }
                    catch (Exception ex)
                    {
                        taskStatus.Success = false;
                        taskStatus.InnerException = ex;
                    }
                    if (taskItem.taskHandle.callback != null && taskItem.taskState != TaskState.aborted)
                    {
                        try
                        {
                            taskItem.taskState = TaskState.completed;
                            taskItem.startTime = DateTime.MaxValue;
    
                            taskItem.taskHandle.callback(taskStatus); // notify callback with task-status
                        }
                        catch
                        {
    
                        }
                    }
                }
                // give other thread a chance to execute as it's current execution completed already
                Thread.Yield(); Thread.Sleep(MIN_WAIT); //TODO: need to see if Sleep is required here
            } while (true); // it's a continuous loop until task gets abort request
        });
        taskItem.handler.Start();
        Pool.Add(taskItem);
    }
    
    private void CleanupPool()
    {
        throw new NotImplementedException();
    }


    7. 用户任务队列实现

    public ClientHandle QueueUserTask(UserTask task, Action<taskstatus> callback)
    {
        TaskHandle th = new TaskHandle() 
            { 
                task = task, 
                Token = new ClientHandle() 
                    { 
                        ID = Guid.NewGuid() 
                    }, 
                callback = callback 
            };
        ReadyQueue.Enqueue(th);
        return th.Token;
    }


    8. 最后便是测试代码

    CustomThreadPool MyPool;
    
    private void Form1_Load(object sender, EventArgs e)
    {
        MyPool = CustomThreadPool.Instance;
    }
    
    void showMessage(string message)
    {
        MessageBox.Show(message);
    }
    int x = 0;
    
    private void btnStart_Click(object sender, EventArgs e)
    {
        x++;
        int arg = x;
        MyPool.QueueUserTask(() => 
            { 
                showMessage(arg.ToString()); 
            }, 
            (ts) => 
            { 
                showMessage(ts.Success.ToString()); 
            });
    } 
    技术改变世界
  • 相关阅读:
    完成卸载vs2010后再安装
    图片集合,可用作商品列表
    无可奈何花落去
    Uncaught TypeError: Cannot read property 'msie' of undefined
    CodeGenerator.cs
    年月日控件
    SQL GETDATE()日期格式化函数
    股票操作要点
    Rust 错误处理, 包裹错误
    使用 Rust 实现并查集
  • 原文地址:https://www.cnblogs.com/davidgu/p/2955561.html
Copyright © 2011-2022 走看看