zoukankan      html  css  js  c++  java
  • 线程池的原理与实现

      java中的线程池框架为Executors,但是这里我们将自己实现简单的线程池,主要目的是理解它的原理。

    线程池主要由两个部分组成:

    (1)线程数组,用于执行任务。

    (2)任务队列。

    下面的两个实现都是按照这种思路来做的。

    一.简单的线程池,有点问题

    package com.chuiyuan.utils;
    
    import java.util.LinkedList;
    
    /**
     * Created by chuiyuan on 2/21/16.
     * start:
     * create a new thread and run the job in run()
     * run:
     * not create any new thread, just run job in current thread
     *
     * TO DO:
     * (1)how to stop the thread ?
     */
    public class MyWorkQueue {
        //Executors
        /**
         * thread pool size
         */
        private final int n ;
        /**
         * Runnable job queue,LinkedList
         */
        private final LinkedList queue ;
        /**
         *
         */
        private final PoolWorker [] threads ;
        /**
         * init n threads for workers,
         * start all work threads
         * @param n  thread pool size
         */
        public MyWorkQueue(int n){
            this.n = n;
            queue = new LinkedList();
            threads = new PoolWorker[n];
    
            for (int i=0;i<n ;i++){
                threads[i] = new PoolWorker();//if no,NullPointerException
                threads[i].start();//start all work threads
            }
        }
    
        /**
         * work thread
         */
        private class PoolWorker extends Thread{
            public void run(){
                Runnable r =null ;\if not null, may has not initialized error 
                while(true){
                    synchronized (queue){
                        //if queue is empty, wait
                        while (queue.isEmpty()){
                            try {
                                queue.wait(10);
                            }catch (InterruptedException e){
    
                            }
                        }
                        //if queue not empty,get job from queue and do it
                        //if change LinkedList to blocking queue?
                        //this must in synchronized
                        if (!queue.isEmpty()) {
                            r = (Runnable) queue.removeFirst();
                        }
                    }
                    //out of synchronized
                    try {
                        if (r!= null) r.run();
                    }catch (RuntimeException e){}
    
                }
            }
        }
    
        /**
         * add and execute Runnable job
         * @param r
         */
        public void execute(Runnable r ){
            synchronized (queue){
                queue.addLast(r);
                queue.notify();
            }
        }
    
    }
    

    存在的问题:

    没有进行线程关闭。

    值得注意的是工作线程中的run方法逻辑。在从任务队列中取任务的时候,要对队列进行加锁,但是run的时候是不加锁的。

    二.简单的线程池改进

      这里我们使用了单例模式,可以指定线程池中线程数,且在添加时,可以以单任务,任务List,任务数组的方式添加。

    package com.chuiyuan.utils;
    
    import java.util.LinkedList;
    import java.util.List;
    
    /**
     * Created by chuiyuan on 2/21/16.
     * Main:
     * WorkThread [] workThreads
     * List<Runnable> taskQueue
     */
    public final class ThreadPool {
        //default 5 thread
        private static int worker_num =5;
        //worker thread
        private WorkThread [] workThreads;
        //tasks done
        private static volatile int finished_task=0;
        //task queue, as a buffer, List not thread safe
        private List<Runnable> taskQueue = new LinkedList<Runnable>() ;
        private static ThreadPool threadPool ;
    
        private ThreadPool(){
            this(5);
        }
    
        private ThreadPool(int worker_num){
            this.worker_num = worker_num;
            workThreads = new WorkThread[worker_num];
            for (int i=0;i<worker_num;i++){
                workThreads[i] = new WorkThread();
                workThreads[i].start();//start thread in pool
            }
        }
        /**
         * singleton
         */
        public static ThreadPool getThreadPool(){
            return getThreadPool(worker_num);
        }
    
        public static ThreadPool getThreadPool(int worker_num1){
            if (worker_num1<=0){
                worker_num1 = ThreadPool.worker_num;
            }
            if (threadPool ==null){
                threadPool = new ThreadPool(worker_num1);
            }
            return threadPool;
        }
    
        /**
         * Just add task to TaskQueue, when to start task is
         * decided by ThreadPool
         * @param task
         */
        public void execute(Runnable task){
            synchronized (taskQueue){
                taskQueue.add(task);
                taskQueue.notify();
            }
        }
    
        /**
         * Add task to TaskQueue in batch
         * @param tasks
         */
        public void execute(Runnable [] tasks){
            synchronized (taskQueue){
                for (Runnable task :tasks){
                    taskQueue.add(task);
                }
                taskQueue.notify();
            }
        }
    
        /**
         * Add task in List
         * @param tasks
         */
        public void execute(List<Runnable> tasks){
            synchronized (taskQueue){
                for (Runnable task: tasks){
                    taskQueue.add(task);
                }
                taskQueue.notify();
            }
        }
    
        /**
         * shutdown all the thread if all tasks are done,if not,
         * wait till all done
         */
        public void shutdown(){
            //if not done ,sleep for a while??
            while (!taskQueue.isEmpty()){
                try {
              System.out.println("Thread "+Thread.cuurentThread().getName()+"want to shudown ThreadPool"); Thread.sleep(10); }catch (InterruptedException e){ e.printStackTrace(); } } for (int i=0;i<worker_num;i++){ workThreads[i].stopWorker(); workThreads[i] = null; } threadPool = null ; taskQueue.clear();//clear taskQueue } public int getWorkThreadNumber(){ return worker_num; } /** * tasks pop out of TaskQueue, it may haven't * done * @return */ public int getFinishedTaskNumber(){ return finished_task; } /** * tasks left in TaskQueue * @return */ public int getWaitTaskNumber(){ return taskQueue.size(); } @Override public String toString(){ return "WorkThreadNumber:"+getWorkThreadNumber()+",FinishedTaskNumber"+ getFinishedTaskNumber()+",WaitTaskNumber:"+getWaitTaskNumber(); } private class WorkThread extends Thread{ //use to stop thread private boolean isRunning = true ; /** * key in run() * if TaskQueue is not null, get task and run. * if TaskQueue is null, wait */ @Override public void run(){ Runnable r = null; //inner class has a reference of outclass.this //so it can read outclass.this.taskQueue while(isRunning){ synchronized (taskQueue){ while(isRunning&& taskQueue.isEmpty()){ try { taskQueue.wait(20); }catch (InterruptedException e){ e.printStackTrace(); } } if (!taskQueue.isEmpty()){ r = taskQueue.remove(0);//get out task } } if (r!=null){ r.run();//run task in this thread } finished_task++; r = null ; } } public void stopWorker(){ isRunning = false; } } }

     值得注意的地方有:

    (1)对象数组的初始化

    对于基本数据类型的数组,在new时,同时也对元素进行了初始化。

    对于数组对象,使用new只是对数组本身分配空间,但是数组元素并没有初始化,也就是数组元素都为空。

    还要对数组的每个元素进行初始化。

    (2)单例模式的使用

      将构造函数私有化,再通过公共的静态getThreadPool对私有的构造函数进行调用。

    (3)execute方法

      execute方法只是添加了Runnable任务,但是任务的调度则是由ThreadPool进行的。在添加任务的时候,要对TaskQueue进行加锁,添加完成后要notify()。

    notify(),notifyAll(),wait()为Object方法,只能在synchronized块中使用,而sleep()为Thread()方法,可以在非同步块中使用。

    (4)shutdown方法

      里面的意思是,当外部线程(一般是主线程)想关闭ThreadPool,如果任务队列中还有任务没有执行,则主线程sleep(10),线程池则接着工作,过10后再看任务队列是否为空,如此循环。

    wait():causes the current thread to wait until either another thread invokes notify() or notifyAll(), or the specified time has passed. The current thread must own this object's monitor.

    (5)工作线程

      核心。

  • 相关阅读:
    jQuery使用(十三):工具方法
    jQuery使用(十二):工具方法之type()之类型判断
    马化腾成中国新首富:一个多月身家增长77亿美元
    滴滴:设立1000万美元专项基金,援助海外司机骑手
    疫情查询
    自动获取时间html代码
    搜索引擎你还用百度吗?为什么?
    实现QQ内打开链接跳转至浏览器
    QQ靓号资料空白且空间开通教程
    斐波那契数列计算html代码
  • 原文地址:https://www.cnblogs.com/chuiyuan/p/5215403.html
Copyright © 2011-2022 走看看