线程池中的几个重要概念:
1.核心线程(corePool):线程池最终执行任务的角色肯定还是线程,同时我们也会限制线程的数量,所以我们可以这样理解核心线程,有新任务提交时,首先检查核心线程数,如果核心线程都在工作,而且数量也已经达到最大核心线程数,则不会继续新建核心线程,而会将任务放入等待队列。
2.等待队列 (workQueue):等待队列用于存储当核心线程都在忙时,继续新增的任务,核心线程在执行完当前任务后,也会去等待队列拉取任务继续执行,这个队列一般是一个线程安全的阻塞队列,它的容量也可以由开发者根据业务来定制。
3.非核心线程:当等待队列满了,如果当前线程数没有超过最大线程数,则会新建线程执行任务,那么核心线程和非核心线程到底有什么区别呢?说出来你可能不信,本质上它们没有什么区别,创建出来的线程也根本没有标识去区分它们是核心还是非核心的,线程池只会去判断已有的线程数(包括核心和非核心)去跟核心线程数和最大线程数比较,来决定下一步的策略。
4.线程活动保持时间 (keepAliveTime):线程空闲下来之后,保持存货的持续时间,超过这个时间还没有任务执行,该工作线程结束。
5.拒绝策略 (RejectedExecutionHandler):当等待队列已满,线程数也达到最大线程数时,线程池会根据饱和策略来执行后续操作,默认的策略是抛弃要加入的任务。
关于线程池的状态,有5种,
RUNNING, 运行状态,值也是最小的,刚创建的线程池就是此状态。
SHUTDOWN,停工状态,不再接收新任务,已经接收的会继续执行
STOP,停止状态,不再接收新任务,已经接收正在执行的,也会中断
清空状态,所有任务都停止了,工作的线程也全部结束了
TERMINATED,终止状态,线程池已销毁
它们的流转关系如下:
线程池中的线程是如何复用的:
线程池中的线程在循环中尝试取任务执行,这一步会被阻塞,
如果设置了allowCoreThreadTimeOut为true,则线程池中的所有线程都会在keepAliveTime时间超时后还未取到任务而退出。
或者线程池已经STOP,那么所有线程都会被中断,然后退出。
源码如下:
1 //Worker的run方法调用的是ThreadPoolExecutor的runWorker方法 2 public void run() { 3 runWorker(this); 4 } 5 6 7 final void runWorker(Worker w) { 8 Thread wt = Thread.currentThread(); 9 //取出需要执行的任务, 10 Runnable task = w.firstTask; 11 w.firstTask = null; 12 w.unlock(); // allow interrupts 13 boolean completedAbruptly = true; 14 try { 15 //如果task不是null,或者去队列中取任务,注意这里会阻塞,后面会分析getTask方法 16 while (task != null || (task = getTask()) != null) { 17 //这个lock在这里是为了如果线程被中断,那么会抛出InterruptedException,而退出循环,结束线程 18 w.lock(); 19 //判断线程是否需要中断 20 if ((runStateAtLeast(ctl.get(), STOP) || 21 (Thread.interrupted() && 22 runStateAtLeast(ctl.get(), STOP))) && 23 !wt.isInterrupted()) 24 wt.interrupt(); 25 try { 26 //任务开始执行前的hook方法 27 beforeExecute(wt, task); 28 Throwable thrown = null; 29 try { 30 task.run(); 31 } catch (RuntimeException x) { 32 thrown = x; throw x; 33 } catch (Error x) { 34 thrown = x; throw x; 35 } catch (Throwable x) { 36 thrown = x; throw new Error(x); 37 } finally { 38 ////任务开始执行后的hook方法 39 afterExecute(task, thrown); 40 } 41 } finally { 42 task = null; 43 w.completedTasks++; 44 w.unlock(); 45 } 46 } 47 completedAbruptly = false; 48 } finally { 49 //Worker退出 50 processWorkerExit(w, completedAbruptly); 51 } 52 } 53 54 private Runnable getTask() { 55 boolean timedOut = false; // Did the last poll() time out? 56 57 for (;;) { 58 int c = ctl.get(); 59 int rs = runStateOf(c); 60 61 // Check if queue empty only if necessary. 62 //检查线程池的状态,如果已经是STOP及以上的状态,或者已经SHUTDOWN,队列也是空的时候,直接return null,并将Worker数量-1 63 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 64 decrementWorkerCount(); 65 return null; 66 } 67 68 int wc = workerCountOf(c); 69 70 // 注意这里的allowCoreThreadTimeOut参数,字面意思是否允许核心线程超时,即如果我们设置为false,那么只有当线程数wc大于corePoolSize的时候才会超时 71 //更直接的意思就是,如果设置allowCoreThreadTimeOut为false,那么线程池在达到corePoolSize个工作线程之前,不会让闲置的工作线程退出 72 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 73 //确认超时,将Worker数-1,然后返回 74 if ((wc > maximumPoolSize || (timed && timedOut)) 75 && (wc > 1 || workQueue.isEmpty())) { 76 if (compareAndDecrementWorkerCount(c)) 77 return null; 78 continue; 79 } 80 81 try { 82 //从队列中取任务,根据timed选择是有时间期限的等待还是无时间期限的等待 83 Runnable r = timed ? 84 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 85 workQueue.take(); 86 if (r != null) 87 return r; 88 timedOut = true; 89 } catch (InterruptedException retry) { 90 timedOut = false; 91 } 92 } 93 }