前言
并发(Concurrency)一直谈论java绕不开的一个话题,从移动开发工程师到后端工程师,几乎所有的面试都要涉及到并发/多线程的一些问题。虽然多数时候我们使用线程池,都是已经实现好的框架——jdk7中就有现成的ThreadPoolExecutor供我们使用,不过,自己实现一个简化的线程池,对于帮助我们理解其内部原理还是有一些帮助的。
设计
核心思想如下:
- 线程池实例为单例
- 线程池实例中保存着一个线程数组,用来分发任务
- 线程池中通过一个BlockingQueue实例,来实现FIFO的任务队列,这个实例同时被线程数组中的每一个线程拥有
- 线程通过while循环,不断从队列中取出任务执行(Runnable
实现
首先是线程池示例SimpleThreadPool.java,使用单例模式。
public class SimpleThreadPool { private static SimpleThreadPool ourInstance; private static final int QUEUE_SIZE = 100; // todo 调整 最优的capacity private PoolThread[] threadArray; private BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(QUEUE_SIZE); private boolean isStopped; public static SimpleThreadPool getInstance() { if (ourInstance == null) { ourInstance = new SimpleThreadPool(); } return ourInstance; } public SimpleThreadPool initPoolSize(int size) { if (size < 1 || size > 10) { throw new RuntimeException("size must be 1~10!"); } threadArray = new PoolThread[size]; for (int i = 0; i < size; i++) { threadArray[i] = new PoolThread(blockingQueue); threadArray[i].start(); } return this; } public synchronized SimpleThreadPool execute(Runnable runnable) { if (isStopped) { throw new IllegalStateException("Thread Pool is stopped!"); } blockingQueue.offer(runnable); return this; } public synchronized void doStop() { for (PoolThread pt : threadArray) { pt.doStop(); } isStopped = true; } }
然后是自定义的线程池线程,PoolThread.java
是否需要在run、doStop两个方法前声明synchronized,存疑。
public class PoolThread extends Thread { private BlockingQueue<Runnable> blockingQueue; private boolean isStopped; public PoolThread(BlockingQueue<Runnable> blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { while (!isStopped) { try { Runnable runnable = blockingQueue.take(); System.out.println(getName() + " is running..."); runnable.run(); } catch (InterruptedException e) { // todo: log it } catch (Exception e) { throw new RuntimeException(e); } } } public void doStop() { interrupt(); isStopped = true; } }
写个Main函数测试一下。
public class Main { public static void main(String[] args) { SimpleThreadPool.getInstance().initPoolSize(10).execute(new Runnable() { @Override public void run() { System.out.println("1+1=" + (1 + 1)); } }).execute(new Runnable() { @Override public void run() { System.out.println("2+2=" + (2 + 2)); } }).execute(new Runnable() { @Override public void run() { System.out.println("3+3=" + (3 + 3)); } }).execute(new Runnable() { @Override public void run() { System.out.println("4+4=" + (4 + 4)); } }).execute(new Runnable() { @Override public void run() { System.out.println("5+5=" + (5 + 5)); } }) ; SimpleThreadPool.getInstance().doStop(); } }
示例输出:
Thread-9 is running... 1+1=2 Thread-9 is running... 2+2=4 Thread-9 is running... 3+3=6 Thread-3 is running... 4+4=8 Thread-4 is running... 5+5=10