zoukankan      html  css  js  c++  java
  • 创建自定义线程池

    1. 创建一个单例模式的自定义线程池类

    /// <summary>
    /// Custom thread pool that is only reachable through one shared instance
    /// (<see cref="Instance"/>); the constructor is private.
    /// </summary>
    public class CustomThreadPool
    {
        // The one shared instance, created by the static field initializer.
        private static readonly CustomThreadPool _instance = new CustomThreadPool();

        // Configurable items - kept as constants for the demo.
        private const int MIN = 3;                   // minimum number of threads in the pool
        private const int MAX = 8;                   // maximum number of threads in the pool
        private const int MIN_WAIT = 10;             // milliseconds
        private const int MAX_WAIT = 15000;          // milliseconds - threshold for a simple task
        private const int CLEANUP_INTERVAL = 60000;  // milliseconds - to free waiting threads in the pool
        private const int SCHEDULING_INTERVAL = 10;  // milliseconds - look for a task in the queue in a loop

        /// <summary>Gets the shared pool instance.</summary>
        public static CustomThreadPool Instance
        {
            get { return _instance; }
        }

        // Private: the only construction site is the _instance initializer above.
        private CustomThreadPool()
        {
            InitializeThreadPool();
        }

        /// <summary>Initialization hook run once from the constructor.</summary>
        private void InitializeThreadPool()
        {
            // TODO: write initialization code here
        }
    }

    2. 定义一些基本类型来和线程池进行通讯

    /// <summary>
    /// A unit of work for the pool: a parameterless method with no return value.
    /// </summary>
    public delegate void UserTask();
    /// <summary>
    /// Token handed back to the caller when a task is queued, identifying that task.
    /// </summary>
    public class ClientHandle
    {
        /// <summary>Identifier of the queued task; the pool code fills it with Guid.NewGuid().</summary>
        public Guid ID;

        /// <summary>
        /// Starts out false. NOTE(review): not read by the code shown here - presumably
        /// tied to the MAX_WAIT "simple task" threshold; confirm.
        /// </summary>
        public bool IsSimpleTask;
    }
    /// <summary>
    /// Outcome of one executed task, passed to the completion callback.
    /// </summary>
    public class TaskStatus
    {
        /// <summary>True unless the task threw; starts out true.</summary>
        public bool Success = true;

        /// <summary>The exception thrown by the task, or null when there was none.</summary>
        public Exception InnerException;
    }

    看到上面的代码了吗?UserTask是一个委托(delegate),代表了线程池中的线程将要执行的任务。

    3. 接下来我们给我们的自定义线程池类增加一些公共接口的方法

    /// <summary>
    /// Placeholder for the public entry point that queues a task for execution.
    /// </summary>
    /// <param name="task">The work to run on a pool thread.</param>
    /// <param name="callback">Receives the task's outcome.</param>
    /// <returns>Nothing yet - this stub always throws.</returns>
    /// <exception cref="NotImplementedException">Always, until the method is implemented.</exception>
    public ClientHandle QueueUserTask(UserTask task, Action<TaskStatus> callback)
    {
        // NotImplementedException is the specific type for an unwritten member and is
        // still caught by any handler that catches Exception.
        throw new NotImplementedException("not implemented yet.");
    }
    
    /// <summary>
    /// Placeholder for cancelling a previously queued task; currently does nothing.
    /// </summary>
    /// <param name="handle">The token that identifies the task to cancel.</param>
    public static void CancelUserTask(ClientHandle handle)
    { 
        //TODO: write implementation code here
    }

     4. 下面是线程池需要使用的一些内部类和类型

    //#region nested private types
    // Current state of a user task that has been assigned to a pool thread.
    enum TaskState
    {
        notstarted, // assigned to a thread but not picked up yet
        processing, // the worker thread is executing the task
        completed,  // the task finished; the thread can be handed a different task
        aborted     // the worker thread should leave its loop and end
    }
    // An item in the waiting queue: the task plus what is needed to report on it.
    class TaskHandle
    {
        public ClientHandle Token; // generated every time a user task is queued and returned to the caller as a reference
        public UserTask task; // the item to be queued - supplied by the caller
        public Action<TaskStatus> callback; // optional - in case the user wants a notification of completion
    }
    
    // A slot in the pool: a TaskHandle paired with the thread that executes it.
    class TaskItem
    {
        public TaskHandle taskHandle; // the task currently assigned to this slot
        public Thread handler; // the worker thread that runs the assigned task
        public TaskState taskState = TaskState.notstarted; 
        public DateTime startTime = DateTime.MaxValue; // DateTime.MaxValue while no task is running
    }
    //#endregion  

    5. 下面我们需要做的是线程池的初始化工作,并且初始化任务队列

    //private instance members
    // Tasks waiting for a thread; created in InitializeThreadPool.
    private Queue<TaskHandle> ReadyQueue;
    // The pool's slots, each pairing a task with its thread; created in InitializeThreadPool.
    private List<TaskItem> Pool;
    // The extra thread that runs the scheduling logic; created in InitializeThreadPool.
    private Thread taskScheduler;
    
    /// <summary>
    /// Allocates the pool's collections and starts the (still empty) scheduler thread.
    /// </summary>
    private void InitializeThreadPool()
    {
        Pool = new List<TaskItem>();
        ReadyQueue = new Queue<TaskHandle>();

        ThreadStart schedulerBody = () =>
        {
            //TODO: write scheduling logic here
        };

        taskScheduler = new Thread(schedulerBody);
        taskScheduler.Start();
    }  

      上面需要特别注意的是taskScheduler这个线程对象。
      它是贯穿线程池整个生命周期的一个额外线程,也可以说是线程池的主控(调度)线程。

    它的任务是监视用户任务队列,并且尽快地把其中的任务交给线程去执行,另外它还负责强制执行最大和最小线程数的限制,并做一些清理工作。

    6. 接着就是实现线程初始化了,我们使用的是就近完成算法

    /// <summary>
    /// Creates the ready queue and the pool, pre-starts the minimum number of worker
    /// threads, and launches the scheduler thread that hands queued tasks to workers.
    /// </summary>
    /// <remarks>
    /// The scheduler runs an endless loop at AboveNormal priority. On each pass it
    /// dispatches as many queued tasks as the pool can take, then either runs the
    /// periodic cleanup or sleeps for SCHEDULING_INTERVAL milliseconds.
    /// NOTE(review): the thread is not marked IsBackground, so its endless loop keeps
    /// the process alive - confirm whether that is intended.
    /// </remarks>
    private void InitializeThreadPool()
    {
        ReadyQueue = new Queue<TaskHandle>();
        Pool = new List<TaskItem>();

        InitPoolWithMinCapacity(); // that many threads must be kept ready

        // Measured in UTC so a local clock shift (e.g. daylight saving) cannot
        // stretch or shrink the cleanup interval.
        DateTime lastCleanup = DateTime.UtcNow;

        taskScheduler = new Thread(() =>
            {
                do
                {
                    // Queue<T> is not safe for concurrent use and tasks are enqueued from
                    // other threads, so the whole dispatch pass runs under the queue's monitor.
                    lock (ReadyQueue)
                    {
                        int itemCount = ReadyQueue.Count;
                        for (int i = 0; i < itemCount; i++)
                        {
                            TaskHandle readyItem = ReadyQueue.Peek(); // the top item of the queue

                            // A cancelled item has its task set to null. Drop it wherever it
                            // turns up in the pass, not only at the head before the pass starts,
                            // so it is never handed to a worker thread.
                            if (readyItem.task == null)
                            {
                                ReadyQueue.Dequeue();
                                continue;
                            }

                            bool added = false;

                            foreach (TaskItem ti in Pool)
                            {
                                if (ti.taskState == TaskState.completed)
                                {
                                    // This thread has finished its previous task, so it can
                                    // take over the waiting one. The handle is assigned before
                                    // the state because the worker reacts to the state change.
                                    ti.taskHandle = readyItem;
                                    ti.taskState = TaskState.notstarted;
                                    added = true;
                                    ReadyQueue.Dequeue();
                                    break;
                                }
                            }

                            if (!added && Pool.Count < MAX)
                            {
                                // Every thread is busy but the pool is below its limit:
                                // start one more thread for this task.
                                TaskItem ti = new TaskItem() { taskState = TaskState.notstarted };
                                ti.taskHandle = readyItem;
                                AddTaskToPool(ti);
                                added = true;
                                ReadyQueue.Dequeue();
                            }

                            if (!added) break; // the pool is full - try again on a later pass
                        }
                    }

                    if ((DateTime.UtcNow - lastCleanup) > TimeSpan.FromMilliseconds(CLEANUP_INTERVAL))
                    {
                        // It has been a long time - clean up the pool once.
                        CleanupPool();
                        lastCleanup = DateTime.UtcNow;
                    }
                    else
                    {
                        // Do not spin: give up the time slice and wait briefly for things to change.
                        Thread.Yield();
                        Thread.Sleep(SCHEDULING_INTERVAL);
                    }
                } while (true);
            });
        taskScheduler.Priority = ThreadPriority.AboveNormal;
        taskScheduler.Start();
    }
    
    /// <summary>
    /// Pre-starts exactly MIN worker threads so the pool begins at its minimum capacity.
    /// </summary>
    /// <remarks>
    /// Each thread is started with a do-nothing task and a do-nothing callback, so it
    /// finishes at once and is then available for real work.
    /// </remarks>
    private void InitPoolWithMinCapacity()
    {
        // "<" rather than "<=": the loop must start MIN threads, not MIN + 1.
        for (int i = 0; i < MIN; i++)
        {
            TaskItem ti = new TaskItem() { taskState = TaskState.notstarted };
            ti.taskHandle = new TaskHandle()
            {
                task = () => { },
                callback = (taskStatus) => { },
                Token = new ClientHandle() { ID = Guid.NewGuid() }
            };
            AddTaskToPool(ti);
        }
    }
    
    /// <summary>
    /// Starts a dedicated worker thread for <paramref name="taskItem"/> and adds the
    /// item to the pool.
    /// </summary>
    /// <param name="taskItem">
    /// The pool slot to serve. Its taskHandle must already be set and its state must be
    /// notstarted for the first task to run.
    /// </param>
    /// <remarks>
    /// The worker loops until the item's state becomes aborted. Whenever the state is
    /// notstarted it runs the assigned task, reports the outcome through the optional
    /// callback, and marks the item completed so it can be given another task.
    /// </remarks>
    private void AddTaskToPool(TaskItem taskItem)
    {
        taskItem.handler = new Thread(() =>
        {
            do
            {
                // Aborted means the item has already been removed from the pool:
                // leave the loop so the thread can end and free its resources.
                if (taskItem.taskState == TaskState.aborted) break;

                if (taskItem.taskState == TaskState.notstarted)
                {
                    // Work on a snapshot of the handle. Once the item is marked completed
                    // its taskHandle may be replaced with the next task, and this run must
                    // not read the replacement.
                    TaskHandle handle = taskItem.taskHandle;

                    taskItem.taskState = TaskState.processing;
                    taskItem.startTime = DateTime.Now;

                    TaskStatus taskStatus = new TaskStatus();
                    try
                    {
                        handle.task.Invoke(); // execute the UserTask
                        taskStatus.Success = true;
                    }
                    catch (Exception ex)
                    {
                        taskStatus.Success = false;
                        taskStatus.InnerException = ex;
                    }

                    if (taskItem.taskState != TaskState.aborted)
                    {
                        if (handle.callback != null)
                        {
                            try
                            {
                                handle.callback(taskStatus); // notify the callback with the task status
                            }
                            catch
                            {
                                // Deliberately ignored: a failing callback must not end
                                // the worker thread.
                            }
                        }

                        // Mark the item reusable whether or not a callback was supplied;
                        // otherwise a task without a callback would leave this thread in
                        // the processing state forever. The state is set last so the item
                        // is not handed a new task while this run is still reporting.
                        taskItem.startTime = DateTime.MaxValue;
                        taskItem.taskState = TaskState.completed;
                    }
                }

                // Give other threads a chance to run before checking the state again.
                Thread.Yield(); Thread.Sleep(MIN_WAIT); //TODO: need to see if Sleep is required here
            } while (true); // continuous loop until the item is aborted
        });
        taskItem.handler.Start();
        Pool.Add(taskItem);
    }
    
    /// <summary>
    /// Shrinks the pool back towards MIN by retiring idle worker threads.
    /// </summary>
    /// <remarks>
    /// Only items in the completed state (no task running) are retired, and never so many
    /// that fewer than MIN items remain. A retired item is removed from the pool and marked
    /// aborted, which is the state on which its worker thread leaves its loop.
    /// Pool is not synchronized; the visible caller is the scheduler thread.
    /// </remarks>
    private void CleanupPool()
    {
        // Walk backwards so RemoveAt never shifts an entry that is still to be visited.
        for (int i = Pool.Count - 1; i >= 0 && Pool.Count > MIN; i--)
        {
            TaskItem candidate = Pool[i];
            if (candidate.taskState != TaskState.completed)
            {
                continue; // still waiting to run or running - leave it alone
            }

            Pool.RemoveAt(i);
            candidate.taskState = TaskState.aborted;
        }
    }


    7. 用户任务队列实现

    /// <summary>
    /// Queues <paramref name="task"/> for execution on a pool thread.
    /// </summary>
    /// <param name="task">The work to run. Must not be null.</param>
    /// <param name="callback">Optional; receives the task's outcome when it has run.</param>
    /// <returns>A handle with a freshly generated ID that identifies the queued task.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="task"/> is null.</exception>
    public ClientHandle QueueUserTask(UserTask task, Action<TaskStatus> callback)
    {
        // A null task is how the scheduler recognises a cancelled entry, so accepting
        // one here would queue work that can never run.
        if (task == null)
        {
            throw new ArgumentNullException("task");
        }

        TaskHandle th = new TaskHandle()
        {
            task = task,
            Token = new ClientHandle() { ID = Guid.NewGuid() },
            callback = callback
        };

        // Queue<T> is not safe for concurrent use and the scheduler thread dequeues from
        // it, so the enqueue is done under the queue's own monitor. Every other accessor
        // of ReadyQueue has to take the same lock for this to be fully effective.
        lock (ReadyQueue)
        {
            ReadyQueue.Enqueue(th);
        }
        return th.Token;
    }


    8. 最后便是测试代码

    // The pool used by this form; assigned when the form loads.
    CustomThreadPool MyPool;
    
    // Load handler: fetches the shared pool instance.
    private void Form1_Load(object sender, EventArgs e)
    {
        MyPool = CustomThreadPool.Instance;
    }
    
    // Shows the given text in a message box.
    void showMessage(string message)
    {
        MessageBox.Show(message);
    }
    // Counts how many times the start button has been clicked.
    int x = 0;

    // Click handler: queues a task that shows this click's number, with a callback
    // that shows whether the task succeeded.
    private void btnStart_Click(object sender, EventArgs e)
    {
        // Copy the incremented counter so each queued task keeps its own number.
        int clickNumber = ++x;

        MyPool.QueueUserTask(
            () => showMessage(clickNumber.ToString()),
            status => showMessage(status.Success.ToString()));
    } 
    技术改变世界
  • 相关阅读:
    巴洛克式和哥特式的区别
    推荐阅读书籍,是时候再行动起来了。
    AtCoder ABC 159F Knapsack for All Segments
    AtCoder ABC 159E Dividing Chocolate
    AtCoder ABC 158F Removing Robots
    AtCoder ABC 158E Divisible Substring
    AtCoder ABC 157F Yakiniku Optimization Problem
    AtCoder ABC 157E Simple String Queries
    AtCoder ABC 157D Friend Suggestions
    AtCoder ABC 156F Modularness
  • 原文地址:https://www.cnblogs.com/davidgu/p/2955561.html
Copyright © 2011-2022 走看看