java中的线程池框架为Executors,但是这里我们将自己实现简单的线程池,主要目的是理解它的原理。
线程池主要由两个部分组成:
(1)线程数组,用于执行任务。
(2)任务队列。
下面的两个实现都是按照这种思路来做的。
一.简单的线程池,有点问题
package com.chuiyuan.utils;
import java.util.LinkedList;
/**
* Created by chuiyuan on 2/21/16.
* start:
* create a new thread and run the job in run()
* run:
* not create any new thread, just run job in current thread
*
* TO DO:
* (1)how to stop the thread ?
*/
public class MyWorkQueue {
//Executors
/**
* thread pool size
*/
private final int n ;
/**
* Runnable job queue,LinkedList
*/
private final LinkedList queue ;
/**
*
*/
private final PoolWorker [] threads ;
/**
* init n threads for workers,
* start all work threads
* @param n thread pool size
*/
public MyWorkQueue(int n){
this.n = n;
queue = new LinkedList();
threads = new PoolWorker[n];
for (int i=0;i<n ;i++){
threads[i] = new PoolWorker();//if no,NullPointerException
threads[i].start();//start all work threads
}
}
/**
* work thread
*/
private class PoolWorker extends Thread{
public void run(){
Runnable r =null ;\if not null, may has not initialized error
while(true){
synchronized (queue){
//if queue is empty, wait
while (queue.isEmpty()){
try {
queue.wait(10);
}catch (InterruptedException e){
}
}
//if queue not empty,get job from queue and do it
//if change LinkedList to blocking queue?
//this must in synchronized
if (!queue.isEmpty()) {
r = (Runnable) queue.removeFirst();
}
}
//out of synchronized
try {
if (r!= null) r.run();
}catch (RuntimeException e){}
}
}
}
/**
* add and execute Runnable job
* @param r
*/
public void execute(Runnable r ){
synchronized (queue){
queue.addLast(r);
queue.notify();
}
}
}
存在的问题:
没有进行线程关闭。
值得注意的是工作线程中的run方法逻辑。在从任务队列中取任务的时候,要对队列进行加锁,但是run的时候是不加锁的。
二.简单的线程池改进
这里我们使用了单例模式,可以指定线程池中线程数,且在添加时,可以以单任务,任务List,任务数组的方式添加。
package com.chuiyuan.utils;
import java.util.LinkedList;
import java.util.List;
/**
* Created by chuiyuan on 2/21/16.
* Main:
* WorkThread [] workThreads
* List<Runnable> taskQueue
*/
public final class ThreadPool {
//default 5 thread
private static int worker_num =5;
//worker thread
private WorkThread [] workThreads;
//tasks done
private static volatile int finished_task=0;
//task queue, as a buffer, List not thread safe
private List<Runnable> taskQueue = new LinkedList<Runnable>() ;
private static ThreadPool threadPool ;
private ThreadPool(){
this(5);
}
private ThreadPool(int worker_num){
this.worker_num = worker_num;
workThreads = new WorkThread[worker_num];
for (int i=0;i<worker_num;i++){
workThreads[i] = new WorkThread();
workThreads[i].start();//start thread in pool
}
}
/**
* singleton
*/
public static ThreadPool getThreadPool(){
return getThreadPool(worker_num);
}
public static ThreadPool getThreadPool(int worker_num1){
if (worker_num1<=0){
worker_num1 = ThreadPool.worker_num;
}
if (threadPool ==null){
threadPool = new ThreadPool(worker_num1);
}
return threadPool;
}
/**
* Just add task to TaskQueue, when to start task is
* decided by ThreadPool
* @param task
*/
public void execute(Runnable task){
synchronized (taskQueue){
taskQueue.add(task);
taskQueue.notify();
}
}
/**
* Add task to TaskQueue in batch
* @param tasks
*/
public void execute(Runnable [] tasks){
synchronized (taskQueue){
for (Runnable task :tasks){
taskQueue.add(task);
}
taskQueue.notify();
}
}
/**
* Add task in List
* @param tasks
*/
public void execute(List<Runnable> tasks){
synchronized (taskQueue){
for (Runnable task: tasks){
taskQueue.add(task);
}
taskQueue.notify();
}
}
/**
* shutdown all the thread if all tasks are done,if not,
* wait till all done
*/
public void shutdown(){
//if not done ,sleep for a while??
while (!taskQueue.isEmpty()){
try {
System.out.println("Thread "+Thread.cuurentThread().getName()+"want to shudown ThreadPool");
Thread.sleep(10);
}catch (InterruptedException e){
e.printStackTrace();
}
}
for (int i=0;i<worker_num;i++){
workThreads[i].stopWorker();
workThreads[i] = null;
}
threadPool = null ;
taskQueue.clear();//clear taskQueue
}
public int getWorkThreadNumber(){
return worker_num;
}
/**
* tasks pop out of TaskQueue, it may haven't
* done
* @return
*/
public int getFinishedTaskNumber(){
return finished_task;
}
/**
* tasks left in TaskQueue
* @return
*/
public int getWaitTaskNumber(){
return taskQueue.size();
}
@Override
public String toString(){
return "WorkThreadNumber:"+getWorkThreadNumber()+",FinishedTaskNumber"+
getFinishedTaskNumber()+",WaitTaskNumber:"+getWaitTaskNumber();
}
private class WorkThread extends Thread{
//use to stop thread
private boolean isRunning = true ;
/**
* key in run()
* if TaskQueue is not null, get task and run.
* if TaskQueue is null, wait
*/
@Override
public void run(){
Runnable r = null;
//inner class has a reference of outclass.this
//so it can read outclass.this.taskQueue
while(isRunning){
synchronized (taskQueue){
while(isRunning&& taskQueue.isEmpty()){
try {
taskQueue.wait(20);
}catch (InterruptedException e){
e.printStackTrace();
}
}
if (!taskQueue.isEmpty()){
r = taskQueue.remove(0);//get out task
}
}
if (r!=null){
r.run();//run task in this thread
}
finished_task++;
r = null ;
}
}
public void stopWorker(){
isRunning = false;
}
}
}
值得注意的地方有:
(1)对象数组的初始化

对于基本数据类型的数组,在new时,同时也对元素进行了初始化。
对于数组对象,使用new只是对数组本身分配空间,但是数组元素并没有初始化,也就是数组元素都为空。

还要对数组的每个元素进行初始化。
(2)单例模式的使用
将构造函数私有化,再通过公共的静态getThreadPool对私有的构造函数进行调用。
(3)execute方法
execute方法只是添加了Runnable任务,但是任务的调度则是由ThreadPool进行的。在添加任务的时候,要对TaskQueue进行加锁,添加完成后要notify()。
notify(),notifyAll(),wait()为Object方法,只能在synchronized块中使用,而sleep()为Thread()方法,可以在非同步块中使用。
(4)shutdown方法
里面的意思是,当外部线程(一般是主线程)想关闭ThreadPool,如果任务队列中还有任务没有执行,则主线程sleep(10),线程池则接着工作,过10后再看任务队列是否为空,如此循环。
wait():causes the current thread to wait until either another thread invokes notify() or notifyAll(), or the specified time has passed. The current thread must own this object's monitor.
(5)工作线程
核心。