zoukankan      html  css  js  c++  java
  • ThreadPoolExecutor 线程池

    线程池的好处

    实现了任务提交和任务执行的解耦, 用户只需提供Runnable对象, 而任务的执行和调度都由线程池负责

    • 降低资源消耗
    • 提升系统响应速度
    • 提高线程的可管理性

    工作流程

    执行过程图

    1. 首先判断线程池是否已满, 未满->创建新的核心线程执行任务
    2. 已满->判断阻塞队列是否已满, 未满->加入阻塞队列
    3. 已满->线程池是否已满(自旋?), 未满->创建新线程执行任务
    4. 已满->按照饱和策略 handler处理

    线程的创建 参数讲解

    ThreadPoolExecutor 的构造方法

    ThreadPoolExecutor(
        int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory,
        RejectedExecutionHandler handler){}
    
    • corePoolSize:表示核心线程池的大小
      如果当前线程池中线程个数小于它, 新任务到来时就会创建新的线程(即使当前核心线程池有空闲的线程)

    • maximumPoolSize:表示线程池能创建线程的最大个数
      如果当阻塞队列已满时,假如线程池线程个数没有超过 maximumPoolSize 的话,就会创建新的线程来执行任务

    • keepAliveTime:空闲线程存活时间

      超过corePoolSize的空闲线程, 空闲时间如果超过该值就会被销毁

    • unit: 时间单位

    • workQueue:阻塞队列, 用于保存任务的阻塞队列
      可以使用ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, PriorityBlockingQueue

    • threadFactory:创建线程的工厂类

    • handler: 饱和策略, 如果线程和线程池阻塞队列都已饱和, 就需要一种策略来处理新来的线程

      • AbortPolicy: 直接拒绝所提交的任务,并抛出RejectedExecution-Exception异常;
      • CallerRunsPolicy:直接用调用者所在的线程来执行任务;
        也就是说, 让处理请求的线程来执行任务, 这会降低新任务的提交速度,影响整体性能
        对于可伸缩的应用, 建议使用该策略, 因为最大池填满之后, 这个策略会提供一个可伸缩队列, 保存任务
      • DiscardPolicy:不处理直接丢弃掉任务;
      • DiscardOldestPolicy:丢弃掉阻塞队列中存放时间最久的任务,添加当前任务

    线程池参数 - 如何设计线程池

    主要从任务的性质角度考虑: CPU 密集型任务,IO 密集型任务和混合型任务

    • 任务执行的时间: 长、中、短
    • 任务的优先级:
    • 任务的依赖性:

    不同类型的任务

    • CPU 密集型任务配置尽可能少的线程数量, 可以配置NCPU+1个线程

    • IO 密集型任务则由于需要等待 IO 操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2xNCPU

    • 混合型的任务,可以将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务

      只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率
      如果这两个任务执行时间相差太大,则没必要进行分解

    • 优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可以让优先级高的任务先得到执行
      如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行, 饿死

    • 执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行

    • 依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果,等待的时间越长 CPU 空闲时间就越长
      所以数据连接池, 线程数应该设置尽量大,才能更好的利用 CPU

    • 阻塞队列最好是使用有界队列,如果采用无界队列的话,一旦任务积压在阻塞队列中的话就会占用过多的内存资源,甚至会使得系统崩溃

    ThreadPoolExecutor 的数据结构

    使用整型状态数 ctl - 线程池的状态

    通过位切分同时表示线程池状态和活动线程数量, 这样也更容易保证线程安全, 状态位在高3位,活动线程数在低29位, 正常的增减操作就可以控制线程数量

    • COUNT_BITS 为 29
      int RUNNING    = -1 << COUNT_BITS;
      int SHUTDOWN   =  0 << COUNT_BITS;
      int STOP       =  1 << COUNT_BITS;
      int TIDYING    =  2 << COUNT_BITS;
      int TERMINATED =  3 << COUNT_BITS;
      
    • running - 111
      能够接受新任务
    • shutdown - shutdown() - 000
      不接受新任务, 可以处理已加入的任务
    • stop - shutdownNow() - 001
      不接受新任务, 不处理已添加的任务, 中断正在执行的任务
      如何中断? 在安全点中断
    • tidying - 010
      所有的任务都已终止, ctl记录为0
      可以定义钩子函数 terminated()
    • terminated - 011
      线程池彻底终止后转为的彻底终止状态

    worker类 - 可以说是一把封装了线程和任务的锁

    继承了AQS, mainLock&termination 非公平锁, 成员 thread 通过 getThreadFactory().newThread(this) 实例化

    • 成员 firstTask 在实例化时由线程池提供
    • 成员 completedTasks 表明完成任务的数量
    • HashSet:workers 是线程池中所拥有的线程集合

    如何实现线程复用

    • 这个worker有点特殊, 通过工厂产生的线程并不是任务, 而是自身this

    • 那这个worker将如何工作呢? 它将以线程的形式工作, 也就是 run() 方法, run方法会调用外部的方法 runWorker(),

    • 这才是任务被执行的核心, 在runWorker()中, while (task != null || (task = getTask()) != null) 就调用 task.run() 方法来执行任务.

    那么它为什么是个锁? 为什么需要加锁?

    • worker 继承了 AbstractQueuedSynchronizer, 根据其重写的 tryAcquire 和 tryRelease, 可以看出这是一把非公平、独占的非重入锁.

    • worker内置锁在初始化的时候 state为(-1), 只有在worker.start()之后, 才执行worker.unlock()将state++, 将锁置于正常状态, 因为获取锁时都是执行 CAS(0,1) 操作

    • 如果发现state>=0, 说明该worker已经被start(), 已经开始执行任务

    • 其次worker有一个 interruptIfStarted(), state为-1时, 是无法进行中断的, 这应该是出于保护firsttask的目的
      if (getState() >= 0 && (t = thread) != null && !t.isInterrupted())

    • 线程位于sleep中,无论何时都会响应 InterruptedException, 那么对于处于sleep的Worker, 执行shtdownNow()会直接抛出异常, 结束执行

    execute 方法 - 不能保证顺序执行

    1. 比较线程池中线程的数量和core pool size的大小关系

    假如小于核心线程池大小,则调用 addWorker(),创建一个线程执行任务

    • addWorker 会实例化一个 worker 对象加入到 workers 中
    • 还要执行一些检查和cas同步操作, 修改workingCount等
    • 如何实现线程复用? 初次执行是来自于实例化时, 之后则来自于阻塞队列,
      默认是无限定时间的, 如果自己是核心线程, 就会一直从阻塞队列中take任务, 一直阻塞一直存活一直工作
      有限定时间的话, 任务线程从任务队列取任务(出队)时poll()是有最大等待时间的, 如果超时, 就对外返回null, 让当前线程结束
    • Excutors类提供了DefaultThreadFactory
    • ThreadFactory中包含ThreadGroup:group、
    • Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);

    2. 无法直接创建就要进入阻塞队列

    入队成功,我们也要二次检查能否创建一个线程,因为这个过程中可能有线程死亡,还要再检查一次线程池是否要关闭

    如果线程池将要关闭,就 remove(command)&reject(command),如果所有的线程都执行任务完毕(workingCount==0),还要创建新线程

    3. 两种尝试都失败

    如果入队也失败,尝试MAX_POOL_SIZE模式创建线程也失败,就就会 reject(command);

    线程池的关闭

    关闭线程池,可以通过shutdown和shutdownNow这两个方法

    shutdownNow

    首先将线程池的状态设置为STOP,然后尝试停止所有的正在执行和未执行任务的线程,并返回等待执行任务的列表;

    • 将运行状态设置为STOP - 修改ctl

    • 执行interruptWorkers() 方法
      for (Worker w : workers) w.interruptIfStarted();
      就像名字一样, 如果worker已经start了, 调用每一个worker线程的 interrupt()

      • 如果线程池状态为STOP,要保证当前线程是中断状态;
        如果不是的话,则要保证当前线程不是中断状态;
        如果是STOP, 需要保证worker在本次任务完成后自行退出
        如果不为STOP, 那么需要保证外界的interrupt不影响内部线程的执行
      • 触发了STOP的interrupted-worker, 会在getTask的时候抛出异常进而执行processWorkerExit()
        为了表明和线程池为空的区别
    • 然后执行tryTerminated()方法
      主要是interruptIdleWorkers()
      和ctl的更新

      • interruptIdleWorkers() 目标是为了终止未在执行任务的worker, 这些worker会阻塞在workingQueue处, 能够成功获取锁, 然后interrupt
      • 更新ctl, 过渡至tidying状态

    shutdown

    shutdown只是将线程池的状态设置为SHUTDOWN状态,然后中断所有没有正在执行任务的线程, 会等待线程执行完

    • 区别在于先将状态改为SHUTDOWN
    • 如果workingQueue不为空就不再采取措施, 等待线程池将任务执行完毕之后自行终止
    • 当所有的线程都关闭成功,才表示线程池成功关闭,这时调用isTerminated方法才会返回 true

    线程的死亡

    1. 外部扰动, 调用了 interruptIfStarted() 方法, 在执行阻塞队列的take()方法时异常, 离开循环
    2. 外部扰动, 将运行状态改为 STOP, 变为interrupted, 转情况1
    3. 外部扰动, 线程阻塞在workingQueue的take()方法处, 外部通过interruptIdleWorkers()将线程变为interrupted, 转情况1
    4. 内部消化 (wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty()), 任务为null, 自行终止
    5. task.run() 方法执行过程中出现运行时错误, 异常在循环外被处理, 离开循环
  • 相关阅读:
    0 Explore TreeView
    按钮颜色选择器
    颜色组合框
    Get Files from Directory
    05.0 图片
    WINAPI 变量(2861个)
    为字符串增加50个空格
    让DataGridView显示行号
    相对路径
    SpecialFolder
  • 原文地址:https://www.cnblogs.com/imzhizi/p/thread-pool-executor-source-code-analysis.html
Copyright © 2011-2022 走看看