实现了一个简化版的线程池。
实现线程池的关键有两个:一是阻塞队列,用于任务的存取;二是内部的线程对象如何持续性地执行任务,并在空闲时被回收。
线程池代码:
1 package learnConcurrent; 2 3 import java.util.ArrayList; 4 import java.util.Collection; 5 import java.util.LinkedList; 6 import java.util.List; 7 import java.util.concurrent.ArrayBlockingQueue; 8 import java.util.concurrent.BlockingQueue; 9 import java.util.concurrent.Callable; 10 import java.util.concurrent.ExecutionException; 11 import java.util.concurrent.ExecutorService; 12 import java.util.concurrent.Future; 13 import java.util.concurrent.TimeUnit; 14 import java.util.concurrent.TimeoutException; 15 import java.util.concurrent.atomic.AtomicBoolean; 16 import java.util.concurrent.atomic.AtomicInteger; 17 import java.util.concurrent.locks.ReentrantLock; 18 19 public class MyThreadPool implements ExecutorService{ 20 //线程队列 21 private List<Worker> workers; 22 //任务队列 23 private BlockingQueue<Runnable> rQueue; 24 //线程池核心大小 25 private int corePoolSize; 26 //线程池最大大小 27 private int maxPoolSize; 28 //空闲线程最长存活时间 29 private int keepAliveTime = 60; 30 31 private static final int ALIVE = 0; 32 33 private static final int SHUTDOMN = 1; 34 35 private int state = ALIVE; 36 37 private ReentrantLock lock = new ReentrantLock(); 38 39 public MyThreadPool(int corePoolSize, int maxPoolSize){ 40 this.corePoolSize = corePoolSize; 41 this.maxPoolSize = maxPoolSize; 42 43 this.workers = new LinkedList<Worker>(); 44 //阻塞队列,最大容量为maxPoolSize 45 this.rQueue = new ArrayBlockingQueue<Runnable>(maxPoolSize, true); 46 } 47 48 @Override 49 public void execute(Runnable command) { 50 if(isShutdown()) 51 return; 52 //FIXME size在获取时和判断时 可能发生改变 53 lock.lock(); 54 int size = workers.size(); 55 if(size < corePoolSize){//当线程池线程数小于核心数量时,增加线程 56 addWorker(); 57 }else if(size < maxPoolSize && !rQueue.isEmpty()){//当线程大于核心数量且任务队列中任务排队时,增加线程 58 addWorker(); 59 } 60 lock.unlock(); 61 62 rQueue.offer(command); 63 } 64 65 @Override 66 public void shutdown() { 67 //关闭线程池的简单实现,设置状态让任务队列不在接受任务,线程也会因为超时被回收 68 //缺点时空闲的线程资源得不到立即释放 69 lock.lock(); 70 state = SHUTDOMN; 71 lock.unlock(); 72 } 73 74 /** 75 * 
立即停止线程池,试图停止正在活动的线程,返回还在等待的任务列表 76 */ 77 @Override 78 public List<Runnable> shutdownNow() { 79 if(isShutdown()) 80 return null; 81 lock.lock(); 82 state = SHUTDOMN; 83 List<Runnable> restRunnable = new ArrayList<Runnable>(); 84 while(!rQueue.isEmpty()){ 85 restRunnable.add(rQueue.poll()); 86 } 87 for(Worker w : workers){ 88 w.interrupt(); 89 } 90 lock.unlock(); 91 return restRunnable; 92 } 93 94 @Override 95 public boolean isShutdown() { 96 lock.lock(); 97 boolean res = state != ALIVE; 98 lock.unlock(); 99 return res; 100 } 101 102 @Override 103 public boolean isTerminated() { 104 return isShutdown() && rQueue.isEmpty(); 105 } 106 107 @Override 108 public boolean awaitTermination(long timeout, TimeUnit unit) 109 throws InterruptedException { 110 // TODO Auto-generated method stub 111 return false; 112 } 113 114 @Override 115 public <T> Future<T> submit(Callable<T> task) { 116 // TODO Auto-generated method stub 117 return null; 118 } 119 120 @Override 121 public <T> Future<T> submit(Runnable task, T result) { 122 // TODO Auto-generated method stub 123 return null; 124 } 125 126 @Override 127 public Future<?> submit(Runnable task) { 128 return null; 129 } 130 131 @Override 132 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 133 throws InterruptedException { 134 return null; 135 } 136 137 @Override 138 public <T> List<Future<T>> invokeAll( 139 Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 140 throws InterruptedException { 141 return null; 142 } 143 144 @Override 145 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 146 throws InterruptedException, ExecutionException { 147 return null; 148 } 149 150 @Override 151 public <T> T invokeAny(Collection<? 
extends Callable<T>> tasks, 152 long timeout, TimeUnit unit) throws InterruptedException, 153 ExecutionException, TimeoutException { 154 return null; 155 } 156 157 private Runnable getTask(){ 158 Runnable r = null; 159 try { 160 r = rQueue.poll(keepAliveTime, TimeUnit.SECONDS); 161 } catch (InterruptedException e) { 162 // TODO Auto-generated catch block 163 e.printStackTrace(); 164 } 165 return r; 166 } 167 168 private void addWorker(){ 169 Worker w = new Worker(); 170 w.start(); 171 lock.lock(); 172 workers.add(w); 173 lock.unlock(); 174 } 175 176 private void removeWorker(Worker w){ 177 lock.lock(); 178 workers.remove(w); 179 lock.unlock(); 180 } 181 182 class Worker extends Thread{ 183 184 private AtomicBoolean isAlive = new AtomicBoolean(true); 185 186 private Runnable task; 187 188 189 @Override 190 public void run() { 191 while(isAlive.get()){ 192 //阻塞一定时间,超时则回收该线程 193 task = getTask(); 194 if(task != null){ 195 task.run(); 196 }else{ 197 isAlive.set(false); 198 199 } 200 task = null; 201 } 202 System.out.println("remove worker"); 203 removeWorker(this); 204 } 205 206 } 207 208 209 }
测试代码:
1 package learnConcurrent; 2 3 4 public class ThreadPoolTest { 5 static int taskNo = 0; 6 public static void main(String[] args) throws InterruptedException { 7 MyThreadPool pool = new MyThreadPool(2, 5); 8 9 for(int i=0; i< 50; i++){ 10 Task task = new Task(taskNo++); 11 pool.execute(task); 12 Thread.sleep((int)(Math.random() * 1000)); 13 } 14 15 } 16 17 } 18 19 class Task implements Runnable{ 20 String str; 21 public Task(int taskNo){ 22 str = "TaskNo:" + taskNo; 23 } 24 @Override 25 public void run() { 26 System.out.println(str + " start work "); 27 //DO SOMETHING 28 try { 29 Thread.sleep((int)(Math.random() * 1000)); 30 } catch (InterruptedException e) { 31 // TODO Auto-generated catch block 32 e.printStackTrace(); 33 } 34 35 System.out.println(str + " done "); 36 } 37 38 }
虽然实现了ExecutorService接口,但是只实现了其中几个方法,设计上也可能有未考虑到的问题。
测试代码也很简陋,仅供参考。