/** * * @author hc * @version 1.0 * * @param <Job> */ public interface ThreadPool<Job extends Runnable>{ //执行一个job void execute(Job job); //关闭线程 void shutdown(); //增加工作者线程 void addWorkers(int num); //减少工作者线程 void removeWorkers(int num); //正在等待执行的线程数量 int getJobSize(); }
import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; /** * * @author hc * @version 1.0 * @param <Job> */ public class ThreadPoolImpl<Job extends Runnable> implements ThreadPool<Job> { //最大的线程池数量 private static final int Max_Worker_Numbers=10; //最少工作线程数量 private static final int Min_Worker_Numbers=1; //默认的工作线程数量 private static final int Default_Worker_Numbers=5; //工作列表 private final LinkedList<Job> jobs=new LinkedList<Job>(); //工作的执行者列表 private final List<ThreadPoolImpl<Job>.Worker>workers=Collections.synchronizedList(new ArrayList<ThreadPoolImpl<Job>.Worker>()); //工作线程数量 private int workerNum=Default_Worker_Numbers; //线程编号生成 private AtomicLong threadNum=new AtomicLong(); public ThreadPoolImpl(){ initializeWorkers(Default_Worker_Numbers); } /** * * @param num 初始化的工作线程数量 */ public ThreadPoolImpl(int num){ if(num>Max_Worker_Numbers){ initializeWorkers(Max_Worker_Numbers); workerNum=Max_Worker_Numbers; }else if(num<Min_Worker_Numbers){ initializeWorkers(Min_Worker_Numbers); workerNum=Min_Worker_Numbers; }else{ initializeWorkers(num); workerNum=num; } } //添加需要执行的任务 @Override public void execute(Job job) { // TODO Auto-generated method stub if(job!=null){ synchronized (jobs) { jobs.addLast(job); jobs.notify(); } } } /** * 停止执行 */ @Override public void shutdown() { // TODO Auto-generated method stub for(ThreadPoolImpl.Worker worker: workers){ worker.shutdown(); } } /** * 增加工作线程 */ @Override public void addWorkers(int num) { // TODO Auto-generated method stub synchronized (jobs) { if(workers.size()+num>Max_Worker_Numbers){ num=Max_Worker_Numbers-workers.size(); } initializeWorkers(num); workerNum+=num; } } /** * 减少工作线程数量 */ @Override public void removeWorkers(int num) { // TODO Auto-generated method stub synchronized (jobs) { if(num>this.workerNum){ throw new IllegalArgumentException("超过实际工作线程"); } int count=0; while(count<num){ Worker worker= workers.get(count); if(workers.remove(worker)){ worker.shutdown(); count++; } workerNum--; } } } @Override public int getJobSize() { // TODO Auto-generated method stub return jobs.size(); } //工作者初始化 private void initializeWorkers(int num){ for(int i=0;i<num;i++){ Worker worker=new Worker(); workers.add(worker); Thread thread=new Thread(worker,"Thread-Worker-"+threadNum.incrementAndGet()); thread.start(); } } //工作的执行者,内部类 class Worker implements Runnable{ private volatile boolean isRunning=true; @Override public void run() { while(isRunning){ Job job=null; synchronized (jobs) { while(jobs.isEmpty()){ try { jobs.wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block Thread.currentThread().interrupt(); return; } } //获取到需要执行的工作 job=jobs.removeFirst(); } if(job!=null){ try { job.run(); } catch (Exception e) { // TODO: handle exception } } } } public void shutdown(){ isRunning=false; } } }