参考资料
- 《Java并发编程的艺术》
- 《深入理解Java虚拟机》
- GitHub:https://github.com/libinkai/eagle
目标
自己实现一个线程池用于执行多线程任务,并且可以平滑关闭(当准备关闭线程池时,不接受新的任务,等待已提交的任务执行完毕之后再关闭线程池)
线程池V1.0
一个有问题的线程池:关闭线程池之后,程序并没有按预期退出。原因是空闲的工作线程一直阻塞在 jobList.wait() 上、无人唤醒(严格来说这是线程无限期等待,而不是线程之间互相持锁的死锁),借此记录简单的线程阻塞/死锁排查方法
线程池接口
/**
 * Minimal thread-pool contract: jobs are submitted with {@link #execute},
 * the number of worker threads can be adjusted at runtime, and the pool can
 * be shut down.
 *
 * @param <Job> the type of task accepted by the pool
 */
public interface EagleThreadPool<Job extends Runnable> {

    /**
     * Submits a job for execution by the pool.
     *
     * @param job the job to execute
     */
    void execute(Job job);

    /**
     * Returns the number of jobs currently waiting to be executed.
     *
     * @return the number of pending jobs
     */
    int getJobSize();

    /**
     * Adds worker threads to the pool.
     *
     * @param num the number of workers requested
     * @return the number of workers reported as added by the implementation
     */
    int addWorkers(int num);

    /**
     * Removes worker threads from the pool.
     *
     * @param num the number of workers requested to be removed
     * @return the number of workers reported as removed by the implementation
     */
    int removeWorkers(int num);

    /**
     * Shuts the pool down.
     */
    void shutdown();
}
线程池接口实现
/**
 * First (intentionally flawed) thread-pool implementation.
 *
 * Known issue kept on purpose: idle workers block in {@code jobList.wait()}
 * with no timeout, and {@link Worker#close()} only flips a flag without
 * notifying the monitor, so after {@link #shutdown()} idle workers are never
 * woken. The worker threads are not marked as daemon threads anywhere in this
 * class, so they keep the JVM from exiting.
 */
@Slf4j
public class DefaultThreadPoolLocked<Job extends Runnable> implements EagleThreadPool<Job> {
    // Worker-count limits of the pool (maximum, default, minimum)
    private static final int maxWorkerNumber = 16;
    private static final int defaultWorkerNumber = 4;
    private static final int minWorkerNumber = 1;
    // Worker id counter (used to build the thread name)
    private AtomicInteger workerId = new AtomicInteger();
    // List of workers
    private final List<Worker> workerList = new LinkedList<>();
    // Queue of pending jobs; also used as the monitor for wait/notify
    private final List<Job> jobList = new LinkedList<>();
    // Whether new jobs are still accepted
    private volatile boolean isWorking = true;

    // Worker inner class: loops taking jobs from jobList and running them
    class Worker implements Runnable {
        // Cleared by close() to ask the worker loop to stop
        private volatile boolean isRunning = true;
        // True only while a job's run() is executing
        private volatile boolean isHandling = false;

        @Override
        public void run() {
            while (isRunning) {
                Job job = null;
                synchronized (jobList) {
                    while (isRunning && jobList.isEmpty()) {
                        try {
                            // The current thread waits on jobList with no timeout: root cause of the problem,
                            // because nothing notifies jobList when the worker is closed
                            jobList.wait();
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag and end this worker
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    // Take one job
                    // NOTE(review): if the wait loop exits because isRunning became false while the
                    // list is still empty, remove(0) on the empty list throws - confirm
                    job = jobList.remove(0);
                }
                if (job != null) {
                    try {
                        isHandling = true;
                        job.run();
                    } catch (Exception e) {
                        // Exceptions thrown by the job are silently discarded here
                    } finally {
                        isHandling = false;
                    }
                }
            }
        }

        // Requests the worker to stop; only sets the flag, does not wake a waiting worker
        public void close() {
            this.isRunning = false;
        }
    }

    // Creates a pool with defaultWorkerNumber workers
    public DefaultThreadPoolLocked() {
        initWorkers(defaultWorkerNumber);
    }

    // Creates a pool with the requested number of workers (subject to the limits in initWorkers)
    public DefaultThreadPoolLocked(int initialWorkerNumber) {
        initWorkers(initialWorkerNumber);
    }

    // Initializes worker threads: raises num to minWorkerNumber, then caps it at the remaining
    // capacity, starts that many threads named "Worker-<id>" and returns the number started
    public int initWorkers(int num) {
        if (num < minWorkerNumber) {
            num = minWorkerNumber;
        }
        int freeCapacity = maxWorkerNumber - workerList.size();
        if (num >= freeCapacity) {
            num = freeCapacity;
        }
        for (int i = 0; i < num; i++) {
            Worker worker = new Worker();
            workerList.add(worker);
            Thread thread = new Thread(worker, "Worker-" + workerId.incrementAndGet());
            thread.start();
        }
        return num;
    }

    // Queues the job and notifies one waiting worker; when the pool no longer accepts jobs
    // or the job is null, only a debug message is logged
    @Override
    public void execute(Job job) {
        // isWorking is checked outside the jobList lock
        if (isWorking && job != null) {
            synchronized (jobList) {
                jobList.add(job);
                jobList.notify();
            }
        } else {
            log.debug("thread pool is waiting to close or job is null");
        }
    }

    // Returns the current size of the job queue (read without holding the jobList lock)
    @Override
    public int getJobSize() {
        return jobList.size();
    }

    // Adds workers while holding the jobList lock; returns the value of initWorkers
    @Override
    public int addWorkers(int num) {
        synchronized (jobList) {
            return initWorkers(num);
        }
    }

    // Closes those of the first num workers in workerList that are not currently handling a job
    // and returns how many were closed; closed workers stay in workerList
    @Override
    public int removeWorkers(int num) {
        int count = 0;
        synchronized (jobList) {
            for (int i = 0; i < num; i++) {
                // NOTE(review): no bounds check - get(i) throws when num exceeds workerList.size()
                Worker worker = workerList.get(i);
                if (!worker.isHandling) {
                    worker.close();
                    count++;
                }
            }
        }
        return count;
    }

    // Stops accepting new jobs, spins until the job queue is empty, then closes every worker
    @Override
    public void shutdown() {
        isWorking = false;
        // Busy-wait (logging on every iteration) until the queue has been drained
        while (!jobList.isEmpty()) {
            log.debug("sorry, jobList is not null, jobList size :{}, waiting to close", jobList.size());
        }
        // close() only sets the flag; workers blocked in jobList.wait() are not woken
        for (Worker worker : workerList) {
            worker.close();
        }
    }

    // Test driver: submits 1000 logging jobs and calls shutdown() once the counter reaches 500
    public static void main(String[] args) {
        DefaultThreadPoolLocked defaultThreadPool = new DefaultThreadPoolLocked();
        int count = 1000;
        while (count > 0) {
            int finalCount = count;
            defaultThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    int flag = 10;
                    while (flag > 0) {
                        log.debug("job{} say {}", finalCount, flag);
                        flag--;
                    }
                    log.debug("job{} done", finalCount);
                }
            });
            if (count == 500) {
                defaultThreadPool.shutdown();
            }
            count--;
        }
    }
}
发现&解决问题
程序关闭线程池、已提交的任务执行完毕之后,JVM并没有退出。JVM只有在不存在存活的非守护线程时才会退出,故很可能仍有工作线程没有结束(例如发生了死锁,或线程一直处于等待状态)
执行jps,查看所有的HotSpot进程,注意是“进程”
执行jstack 4936 (4936为jps结果中DefaultThreadPoolLocked的进程号),查看进程堆栈信息
发现存在工作者线程一直在任务队列上等待:close() 只修改了 isRunning 标志,并没有唤醒正在 jobList.wait() 的线程,故使用超时等待方案或许可以解决问题
线程池接口实现改进
package com.equator.eagle.threadpool;

import lombok.extern.slf4j.Slf4j;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Simple thread pool with a graceful shutdown: once {@link #shutdown()} is
 * called no new jobs are accepted, the jobs already queued are drained, and
 * then the workers are stopped.
 *
 * <p>All access to {@code jobList} and {@code workerList} is guarded by the
 * {@code jobList} monitor. Idle workers use a timed wait and are additionally
 * notified when they are closed, so they terminate promptly.
 *
 * @Author: Equator
 * @Date: 2019/12/4 20:39
 **/
@Slf4j
public class DefaultThreadPool<Job extends Runnable> implements EagleThreadPool<Job> {
    // Worker-count limits of the pool
    private static final int MAX_WORKER_NUMBER = 16;
    private static final int DEFAULT_WORKER_NUMBER = 4;
    private static final int MIN_WORKER_NUMBER = 1;
    // Upper bound (ms) for a single wait on the job queue; a safety net against missed notifications
    private static final long IDLE_WAIT_MILLIS = 1000L;

    // Worker id counter (used to build the thread name)
    private final AtomicInteger workerId = new AtomicInteger();
    // Live workers; guarded by the jobList monitor
    private final List<Worker> workerList = new LinkedList<>();
    // Pending jobs; also the monitor used for wait/notify
    private final List<Job> jobList = new LinkedList<>();
    // Whether new jobs are still accepted; written only while holding the jobList monitor
    private volatile boolean isWorking = true;

    /** Worker loop: repeatedly takes a job from the queue and runs it until closed. */
    class Worker implements Runnable {
        private volatile boolean isRunning = true;
        private volatile boolean isHandling = false;

        @Override
        public void run() {
            while (isRunning) {
                Job job = null;
                synchronized (jobList) {
                    while (isRunning && jobList.isEmpty()) {
                        try {
                            // Timed wait: the worker re-checks isRunning even if a notification is missed
                            jobList.wait(IDLE_WAIT_MILLIS);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                    // The loop may exit because the worker was closed while the queue is still empty;
                    // remove(0) on an empty list would throw IndexOutOfBoundsException
                    if (!jobList.isEmpty()) {
                        job = jobList.remove(0);
                        // Set under the lock so removeWorkers never closes a worker that already owns a job
                        isHandling = true;
                        if (jobList.isEmpty()) {
                            // Wake a shutdown() caller waiting for the queue to drain
                            jobList.notifyAll();
                        }
                    }
                }
                if (job != null) {
                    try {
                        job.run();
                    } catch (Exception e) {
                        // A failing job must not kill the worker, but must not vanish silently either
                        log.error("job failed in {}", Thread.currentThread().getName(), e);
                    } finally {
                        isHandling = false;
                    }
                }
            }
        }

        /** Asks the worker to stop after its current job; callers notify {@code jobList} to wake it. */
        public void close() {
            this.isRunning = false;
        }
    }

    /** Creates a pool with the default number of workers. */
    public DefaultThreadPool() {
        startWorkers(DEFAULT_WORKER_NUMBER);
    }

    /**
     * Creates a pool with the requested number of workers.
     *
     * @param initialWorkerNumber requested worker count; clamped to the pool limits
     */
    public DefaultThreadPool(int initialWorkerNumber) {
        startWorkers(initialWorkerNumber);
    }

    /**
     * Starts additional worker threads.
     *
     * @param num requested number of workers; values below the minimum are raised to it
     * @return the number of workers actually started (0 when the pool is full or shut down)
     */
    public int initWorkers(int num) {
        return startWorkers(num);
    }

    // Starts up to num workers without ever exceeding MAX_WORKER_NUMBER; private so that the
    // constructors do not call an overridable method
    private int startWorkers(int num) {
        synchronized (jobList) {
            if (!isWorking) {
                // Workers started after shutdown would never be closed
                return 0;
            }
            int freeCapacity = MAX_WORKER_NUMBER - workerList.size();
            // Apply the lower bound first, then the capacity cap, so a full pool yields 0
            int toStart = Math.min(Math.max(num, MIN_WORKER_NUMBER), freeCapacity);
            if (toStart <= 0) {
                return 0;
            }
            for (int i = 0; i < toStart; i++) {
                Worker worker = new Worker();
                workerList.add(worker);
                Thread thread = new Thread(worker, "Worker-" + workerId.incrementAndGet());
                thread.start();
            }
            return toStart;
        }
    }

    /**
     * Queues a job for execution. The job is rejected (and a debug message logged)
     * when it is {@code null} or the pool has been shut down.
     *
     * @param job the job to run
     */
    @Override
    public void execute(Job job) {
        if (job != null) {
            synchronized (jobList) {
                // Checked under the lock so a job cannot slip in after shutdown() saw an empty queue
                if (isWorking) {
                    jobList.add(job);
                    // Only workers can be waiting here: shutdown() clears isWorking before it waits
                    jobList.notify();
                    return;
                }
            }
        }
        log.debug("thread pool is waiting to close or job is null");
    }

    /**
     * @return the number of jobs currently waiting in the queue
     */
    @Override
    public int getJobSize() {
        synchronized (jobList) {
            return jobList.size();
        }
    }

    /**
     * Adds workers to the pool.
     *
     * @param num requested number of workers
     * @return the number of workers actually started
     */
    @Override
    public int addWorkers(int num) {
        return startWorkers(num);
    }

    /**
     * Closes up to {@code num} workers that are not currently running a job and
     * removes them from the pool.
     *
     * @param num maximum number of workers to remove; values &lt;= 0 remove nothing
     * @return the number of workers actually closed
     */
    @Override
    public int removeWorkers(int num) {
        int count = 0;
        synchronized (jobList) {
            Iterator<Worker> it = workerList.iterator();
            while (count < num && it.hasNext()) {
                Worker worker = it.next();
                if (!worker.isHandling) {
                    worker.close();
                    // Drop the closed worker so it is neither counted nor closed again
                    it.remove();
                    count++;
                }
            }
            if (count > 0) {
                // Wake the closed workers so they exit without waiting for the timeout
                jobList.notifyAll();
            }
        }
        return count;
    }

    /**
     * Stops accepting new jobs, waits until the queued jobs have been taken by
     * workers, then closes all workers. If the calling thread is interrupted
     * while waiting, the wait is abandoned, the workers are closed anyway and
     * the interrupt flag is restored; jobs still queued at that point may not run.
     */
    @Override
    public void shutdown() {
        boolean interrupted = false;
        synchronized (jobList) {
            isWorking = false;
            // Without any worker left the queue can never drain, so do not wait in that case
            while (!jobList.isEmpty() && !workerList.isEmpty()) {
                log.debug("sorry, jobList is not null, jobList size :{}, waiting to close", jobList.size());
                try {
                    // Releases the lock so workers can keep draining the queue
                    jobList.wait(IDLE_WAIT_MILLIS);
                } catch (InterruptedException e) {
                    interrupted = true;
                    break;
                }
            }
            for (Worker worker : workerList) {
                worker.close();
            }
            workerList.clear();
            // Wake idle workers so they observe isRunning == false immediately
            jobList.notifyAll();
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** Demo: submits 1000 logging jobs and shuts the pool down half-way through. */
    public static void main(String[] args) {
        DefaultThreadPool<Runnable> defaultThreadPool = new DefaultThreadPool<>();
        int count = 1000;
        while (count > 0) {
            int finalCount = count;
            defaultThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    int flag = 10;
                    while (flag > 0) {
                        log.debug("job{} say {}", finalCount, flag);
                        flag--;
                    }
                    log.debug("job{} done", finalCount);
                }
            });
            if (count == 500) {
                defaultThreadPool.shutdown();
            }
            count--;
        }
    }
}
至此,线程池就可以正常工作啦!