在很多的应用场景,需要根据任务去创建线程去异步处理问题,不过不停的创建线程和销毁线程本身是一个非常耗时和消耗系统资源的事情,所以通常这种情况使用线程池来实现,常用的场景比如web容器对于web请求的处理,就是使用的线程池。线程池有几个关键性的元素:
(1) 工作者线程,比如我们称为workers,负责需要处理的任务
(2) 任务,比如我们称为Job,也就是需要处理的工作,通过线程池加入BlockingQueue中,然后由线程池的Job分配算法来分配给某一个worker(工作线程)
(3) 线程池本身,包括线程池的初始化,以及Job的分配工作,复杂一些的包括动态的worker增减算法,在我们的简单示例里面只实现最基本的线程工作。
实现的Java代码如下:
public class ThreadPool{ private static ThreadPool instance = new ThreadPool(); private BlockingQueue<Runnable> jobQueue; private BlockingQueue<Worker> workers; private int workerNum; private ReentrantLock lock = new ReentrantLock(); final class Worker implements Runnable{ Runnable job = null; public Worker() { } @Override public void run() { // TODO Auto-generated method stub try { while ((job = jobQueue.take()) != null) { job.run(); } } catch (InterruptedException e) { e.printStackTrace(); } } } public static ThreadPool getInstance() { return instance; } private ThreadPool() { workers = new LinkedBlockingQueue<Worker>(); } public void init(int workerNum, BlockingQueue<Runnable> _jobQueue) { this.workerNum = workerNum; this.jobQueue = _jobQueue; } public void execute(Runnable job) { lock.lock(); try { if (workers.size() < workerNum) { Worker worker = new Worker(); Thread t = new Thread(worker); t.start(); workers.add(worker); } jobQueue.add(job); } catch (Exception e) { // TODO: handle exception } finally { lock.unlock(); } } }
从代码中可以看出ThreadPool代表线程池对象本身,用一个单例实现。其内部类Worker代表工作线程,用来实际处理Job,至于Job,在上面代码里面没有体现,不过jobQueue是一个容纳Runnable接口的队列,我们的Job只需要是一个实现Runnable接口的实例就可以了。BlockingQueue本身是线程安全的,所以Worker中是无需加锁的。不过对worker数量的判断和创建新的线程,需要加锁,防止创建了比设定更多的线程。
测试代码:
package test; import java.util.concurrent.LinkedBlockingQueue; import pool.thread.*; class TestJob implements Runnable { int index = 0; public TestJob(int _index) { this.index = _index; } @Override public void run() { // TODO Auto-generated method stub System.out.println("job start " + index + " thread id=" + Thread.currentThread().getId()); } } public class TestThreadPool { public static void main(String[] args) { // TODO Auto-generated method stub // TODO Auto-generated method stub ThreadPool.getInstance().init(10, new LinkedBlockingQueue<Runnable>()); for (int i=0; i<1000; ++i) { TestJob job = new TestJob(i+1); ThreadPool.getInstance().execute(job); } while (true) { try { Thread.sleep(1); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
从测试代码可以看出,TestJob这个Job只是简单的实现了Runnable接口。
不过这个线程池只是为了实现线程池的核心思想,很多需要的功能没有实现,包括针对worker的动态删减,还有针对线程中所有线程的控制,比如中断线程执行等。不过核心的线程池思想还是可以通过上面的代码了解到的。