import java.util.concurrent.*; /** * 线程池工具类 */ public class ThreadPoolUtils { private volatile static ThreadPoolExecutor threadPool; public static final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() + 1; public static final int MAX_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2; public static final int KEEP_ALIVE_TIME = 1000; public static final int BLOCK_QUEUE_SIZE = 1000; public static void executor(Runnable runnable) { getThreadPoolExecutor().execute(runnable); } public static <T> Future<T> submit(Callable<T> callable) { return getThreadPoolExecutor().submit(callable); } /** * 获取线程池对象 * * @return */ public static ThreadPoolExecutor getThreadPoolExecutor() { if (threadPool != null) { return threadPool; } else { synchronized (ThreadPoolUtils.class) { if (threadPool == null) { threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(BLOCK_QUEUE_SIZE), new ThreadPoolExecutor.CallerRunsPolicy()); } } return threadPool; } } }
线程池参数:
public ThreadPoolExecutor(int corePoolSize, 核心线程数量
int maximumPoolSize, 最大线程数量/业余线程数量+核心线程数量
long keepAliveTime, 业余线程保持时长
TimeUnit unit, 时间单位
BlockingQueue<Runnable> workQueue, 任务队列
ThreadFactory threadFactory, 线程工厂
RejectedExecutionHandler handler) 拒绝策略
线程池执行逻辑
1. 线程池开始执行 execute 方法 如上: 有四种情况
1)当前线程数量少于核心线程数 执行 addWork方法 执行new Worker .start 继而执行 runWorker方法 显性调用 task.run(); 随后通过 processWorkerExit 回归 addWorker 方法
2)如果不小于,那么就在workQueue中存取任务 (workQueue中任务通过getTack()方法拿出)
3)添加队列失败,再次执行addWork(xx,false) 判断条件就是线程数量小于最大线程数量
4)再次失败的话,就执行拒绝策略
线程池拒绝策略
new ThreadPoolExecutor.AbortPolicy()
public class Test { public static void main(String[] args) throws Exception { ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory() ,new ThreadPoolExecutor.AbortPolicy()
); for (int i = 0; i < Integer.MAX_VALUE;i++) { int finalI = i; executorService.execute(() -> { System.out.println("测试数据-execute = " + finalI); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } }); } executorService.shutdown(); } }
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.example.threadPool.Test$$Lambda$1/245565335@ee7d9f1 rejected from java.util.concurrent.ThreadPoolExecutor@15615099[Running, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.example.threadPool.Test.main(Test.java:15)
测试数据-execute = 0---pool-1-thread-1
测试数据-execute = 1---pool-1-thread-2
测试数据-execute = 2---pool-1-thread-3
测试数据-execute = 3---pool-1-thread-4
测试数据-execute = 4---pool-1-thread-5
线程池执行任务,遇到多余任务直接拒绝 抛出异常
new ThreadPoolExecutor.CallerRunsPolicy()
public static void main(String[] args) throws Exception { ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory() , new ThreadPoolExecutor.CallerRunsPolicy() ); for (int i = 0; i < 500; i++) { int finalI = i; executorService.execute(() -> { System.out.println("测试数据-execute = " + finalI + "---"+Thread.currentThread().getName()); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } }); } executorService.shutdown(); }
测试数据-execute = 5---main
测试数据-execute = 6---main
测试数据-execute = 7---main
测试数据-execute = 8---main
测试数据-execute = 0---pool-1-thread-1
测试数据-execute = 9---main
测试数据-execute = 1---pool-1-thread-2
测试数据-execute = 2---pool-1-thread-3
测试数据-execute = 3---pool-1-thread-4
测试数据-execute = 4---pool-1-thread-5
测试数据-execute = 10---main
测试数据-execute = 12---pool-1-thread-5
测试数据-execute = 15---pool-1-thread-2
测试数据-execute = 14---pool-1-thread-1
........
说明主线程也执行任务,不进行异常抛出
new ThreadPoolExecutor.DiscardOldestPolicy()
public static void main(String[] args) throws Exception { ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory() , new ThreadPoolExecutor.DiscardOldestPolicy() ); for (int i = 0; i < 500; i++) { int finalI = i; executorService.execute(() -> { System.out.println("测试数据-execute = " + finalI + "---"+Thread.currentThread().getName()); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } }); } executorService.shutdown(); }
测试数据-execute = 0---pool-1-thread-1
测试数据-execute = 1---pool-1-thread-2
测试数据-execute = 2---pool-1-thread-3
测试数据-execute = 3---pool-1-thread-4
测试数据-execute = 4---pool-1-thread-5
Exception in thread "main" java.lang.StackOverflowError
添加线程,添加队列,不断重试
new ThreadPoolExecutor.DiscardPolicy()
public static void main(String[] args) throws Exception { ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory() , new ThreadPoolExecutor.DiscardPolicy() ); for (int i = 0; i < 500; i++) { int finalI = i; executorService.execute(() -> { System.out.println("测试数据-execute = " + finalI + "---"+Thread.currentThread().getName()); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } }); } executorService.shutdown(); }
测试数据-execute = 0---pool-1-thread-1
测试数据-execute = 1---pool-1-thread-2
测试数据-execute = 2---pool-1-thread-3
测试数据-execute = 3---pool-1-thread-4
测试数据-execute = 4---pool-1-thread-5
Process finished with exit code 0
添加失败,放弃任务,不会抛出异常