/**
 * Contract for a pool of worker threads that run submitted jobs.
 *
 * @author hc
 * @version 1.0
 *
 * @param <Job> the type of job the pool accepts; must be a {@link Runnable}
 */
public interface ThreadPool<Job extends Runnable> {

    /**
     * Submits a job for execution.
     *
     * @param job the job to execute
     */
    void execute(Job job);

    /**
     * Returns the number of jobs that are waiting to be executed.
     *
     * @return the count of pending jobs
     */
    int getJobSize();

    /**
     * Adds worker threads to the pool.
     *
     * @param num how many workers to add
     */
    void addWorkers(int num);

    /**
     * Removes worker threads from the pool.
     *
     * @param num how many workers to remove
     */
    void removeWorkers(int num);

    /**
     * Shuts the pool's threads down.
     */
    void shutdown();
}
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
 * A simple worker-thread pool backed by a FIFO job queue.
 *
 * <p>Jobs passed to {@code execute} are appended to an internal queue and taken from its
 * head by worker threads. The number of workers can be changed at runtime but never
 * exceeds {@code MAX_WORKER_COUNT}.
 *
 * <p>Thread-safety: the job queue and the worker list are only accessed while holding the
 * monitor of {@code jobs}, which is also the monitor idle workers wait on.
 *
 * @author hc
 * @version 1.0
 * @param <Job> the type of job accepted by this pool
 */
public class ThreadPoolImpl<Job extends Runnable> implements ThreadPool<Job> {
    /** Upper bound on the number of worker threads. */
    private static final int MAX_WORKER_COUNT = 10;
    /** Lower bound applied to the worker count requested at construction. */
    private static final int MIN_WORKER_COUNT = 1;
    /** Worker count used by the no-argument constructor. */
    private static final int DEFAULT_WORKER_COUNT = 5;

    /** Pending jobs in FIFO order; its monitor is the single lock of the pool. */
    private final LinkedList<Job> jobs = new LinkedList<Job>();
    /** Workers currently owned by the pool; only touched while holding the {@code jobs} monitor. */
    private final List<Worker> workers = Collections.synchronizedList(new ArrayList<Worker>());
    /** Source of the numeric suffix used in worker thread names. */
    private final AtomicLong threadNum = new AtomicLong();

    /**
     * Creates a pool with the default number of workers.
     */
    public ThreadPoolImpl() {
        initializeWorkers(DEFAULT_WORKER_COUNT);
    }

    /**
     * Creates a pool with the requested number of workers, clamped to the allowed range.
     *
     * @param num requested number of worker threads; values above the maximum or below the
     *            minimum are replaced by the respective bound
     */
    public ThreadPoolImpl(int num) {
        initializeWorkers(Math.max(MIN_WORKER_COUNT, Math.min(MAX_WORKER_COUNT, num)));
    }

    /**
     * Queues a job and wakes one idle worker to run it.
     *
     * @param job the job to run; {@code null} is silently ignored
     */
    @Override
    public void execute(Job job) {
        if (job == null) {
            return;
        }
        synchronized (jobs) {
            jobs.addLast(job);
            jobs.notify();
        }
    }

    /**
     * Stops every worker and empties the worker list.
     *
     * <p>An idle worker exits as soon as it is woken; a worker that is running a job exits
     * once that job returns. Jobs still in the queue are left there and are only run if
     * workers are added afterwards.
     */
    @Override
    public void shutdown() {
        synchronized (jobs) {
            // Iterating a synchronizedList is only safe under an external lock; every
            // mutation of the list happens under the jobs monitor, so holding it suffices.
            for (Worker worker : workers) {
                worker.shutdown();
            }
            // Stopped workers must not count toward the maximum in a later addWorkers call.
            workers.clear();
        }
    }

    /**
     * Starts additional workers without exceeding the maximum pool size.
     *
     * @param num number of workers to add; the surplus beyond the remaining capacity is
     *            dropped, and a non-positive value adds nothing
     */
    @Override
    public void addWorkers(int num) {
        synchronized (jobs) {
            int remainingCapacity = MAX_WORKER_COUNT - workers.size();
            int toAdd = Math.min(num, remainingCapacity);
            if (toAdd > 0) {
                initializeWorkers(toAdd);
            }
        }
    }

    /**
     * Stops and removes workers from the pool.
     *
     * @param num number of workers to remove; a non-positive value removes nothing
     * @throws IllegalArgumentException if {@code num} exceeds the current number of workers
     */
    @Override
    public void removeWorkers(int num) {
        synchronized (jobs) {
            if (num > workers.size()) {
                throw new IllegalArgumentException("超过实际工作线程");
            }
            for (int removed = 0; removed < num; removed++) {
                // Always take the last element: indexing by the loop counter would skip
                // entries and run off the end as the list shrinks.
                Worker worker = workers.remove(workers.size() - 1);
                worker.shutdown();
            }
        }
    }

    /**
     * Returns the number of jobs that are queued and not yet picked up by a worker.
     *
     * @return the current length of the job queue
     */
    @Override
    public int getJobSize() {
        synchronized (jobs) {
            return jobs.size();
        }
    }

    /**
     * Creates {@code num} workers, registers them and starts one thread per worker.
     *
     * @param num number of workers to start; a non-positive value starts nothing
     */
    private void initializeWorkers(int num) {
        synchronized (jobs) {
            for (int i = 0; i < num; i++) {
                Worker worker = new Worker();
                workers.add(worker);
                Thread thread = new Thread(worker, "Thread-Worker-" + threadNum.incrementAndGet());
                thread.start();
            }
        }
    }

    /**
     * A worker that repeatedly takes the job at the head of the queue and runs it until it
     * is told to stop or its thread is interrupted while waiting.
     */
    class Worker implements Runnable {
        /** Cleared by {@link #shutdown()}; volatile because it is also read outside the lock. */
        private volatile boolean isRunning = true;

        @Override
        public void run() {
            while (isRunning) {
                Job job;
                synchronized (jobs) {
                    // Wait for work, but also leave the wait when this worker is stopped.
                    while (isRunning && jobs.isEmpty()) {
                        try {
                            jobs.wait();
                        } catch (InterruptedException e) {
                            // Restore the interrupt status and let the thread end.
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    if (!isRunning) {
                        return;
                    }
                    job = jobs.removeFirst();
                }
                try {
                    job.run();
                } catch (Exception e) {
                    // A failing job must not kill the worker, but the failure should not
                    // vanish either: report it through the thread's uncaught-exception handler.
                    Thread current = Thread.currentThread();
                    current.getUncaughtExceptionHandler().uncaughtException(current, e);
                }
            }
        }

        /**
         * Asks this worker to stop and wakes it if it is idle, so that its thread can end
         * instead of waiting forever for a job.
         */
        public void shutdown() {
            synchronized (jobs) {
                isRunning = false;
                // notifyAll rather than notify: the waiter chosen by notify might be a
                // different worker, leaving this one parked.
                jobs.notifyAll();
            }
        }
    }
}