zoukankan      html  css  js  c++  java
  • Java中的线程池

    线程池

    java.util.concurrent.ExecutorService 表述了异步执行的机制,并且可以让任务在后台执行。ExecutorService是一个接口,继承了Executor接口。Executor接口只包含了一个方法:void execute(Runnable command);,该方法接收一个 Runable 实例,它用来执行一个任务,任务即一个实现了 Runnable 接口的类。jdk默认提供了四种线程池供我们使用。

    大体的继承结构如下:

    一、线程池的状态

    ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量

    image-20201018204822162

    状态名 高三位 接受新任务 处理阻塞队列任务 说明
    RUNNING 111 线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
    SHUTDOWN 000 不会接收新任务,但会处理阻塞队列剩余任务(调用shutdown()方法)
    STOP 001 会中断正在执行的任务,并抛弃阻塞队列任务(调用shutdownNow()方法)
    TIDYING 010 - - 任务全执行完毕,活动线程为 0 即将进入终结
    TERMINATED 011 - - 终结状态

    从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING,因为最高位表示符号位,这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值。

    状态转换图如下:

    二、ThreadPoolExecutor构造方法

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) 
    
    • corePoolSize: 核心线程数目 (最多保留的线程数),线程池中会维护一个最小的线程数量,即使这些线程处理空闲状态,他们也不会 被销毁,除非设置了allowCoreThreadTimeOut。这里的最小线程数量即是corePoolSize。

    • maximumPoolSize :最大线程数目,一个任务被提交到线程池以后,首先会找有没有空闲存活线程,如果有则直接执行,如果没有则会缓存到工作队列中,如果工作队列满了,才会创建一个新线程(这就是救急线程),然后从工作队列的头部取出一个任务交由新线程来处理,而将刚提交的任务放入工作队列尾部。线程池不会无限制的去创建救急线程,它会有一个最大线程数量的限制,这个数量即由maximunPoolSize的数量减去corePoolSize的数量来确定,最多能达到maximunPoolSize即最大线程池线程数量。

    • keepAliveTime :生存时间 - 针对救急线程

    • unit: 时间单位 - 针对救急线程

    • workQueue :新任务被提交后,如果没有空闲的核心线程就会先进入到此工作队列中,任务调度时再从队列中取出任务。jdk中常见的任务队列:

      ①ArrayBlockingQueue

      基于数组的有界阻塞队列,按FIFO排序。有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。

      ②LinkedBlockingQuene

      基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的。

      ③SynchronousQuene

      一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。

      ④PriorityBlockingQueue

      具有优先级的无界阻塞队列,优先级通过参数Comparator实现。

    • threadFactory :线程工厂 - 可以为线程创建时起个好名字

    • handler: 拒绝策略

      ​ 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现:

      image-20201018212537896

      • AbortPolicy策略:抛出 RejectedExecutionException 异常,这是默认策略

        image-20201018212901799

      • CallerRunsPolicy策略: 在调用者线程中直接执行被拒绝任务的run方法,除非线程池已经shutdown,则直接抛弃任务。

        image-20201018213014259

      • DiscardPolicy策略: 放弃本次任务

        image-20201018213333323

      • DiscardOldestPolicy策略: 放弃队列中最早的任务,本任务取而代之

        image-20201018213434194

    三、四种线程池

    Executors提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。

    1. public static ExecutorService newFixedThreadPool(int nThreads) : 创建固定数目线程的线程池。

       public static ExecutorService newFixedThreadPool(int nThreads) {
              return new ThreadPoolExecutor(nThreads, nThreads,
                                            0L, TimeUnit.MILLISECONDS,
                                            new LinkedBlockingQueue<Runnable>());
          }
      

      特点:

      • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间。
      • 阻塞队列是无界的,可以放任意数量的任务
      • 适用于任务量已知,相对耗时的任务
    2. public static ExecutorService newCachedThreadPool():创建一个可缓存的线程池,调用execute将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线 程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。

       public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
              return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                            60L, TimeUnit.SECONDS,
                                            new SynchronousQueue<Runnable>(),
                                            threadFactory);
          }
      

      特点:

      • 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着全部都是救急线程(60s 后可以回收)

      • 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)

      • 救急线程可以无限创建。

      • 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线

        程。 适合任务数比较密集,但每个任务执行时间较短的情况

    3. public static ExecutorService newSingleThreadExecutor(): 创建一个单线程化的Executor。

        public static ExecutorService newSingleThreadExecutor() {
              return new FinalizableDelegatedExecutorService
                  (new ThreadPoolExecutor(1, 1,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>()));
          }
      

      特点:

      • 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作

      • 相较于其它三种线程池,newSingleThreadExecutor()创建的是FinalizableDelegatedExecutorService对象,其应用的是装饰者模式 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法。FinalizableDelegatedExecutorService继承自DelegatedExecutorService中的方法都是调用ExecutorService 接口的方法

        image-20201018215704221

      • 适用于希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。

    4. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize): 创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

       public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
              return new ScheduledThreadPoolExecutor(corePoolSize);
          }
      
       public ScheduledThreadPoolExecutor(int corePoolSize) {
              super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                    new DelayedWorkQueue());
          }
      

      ScheduledExecutorService接口的方法

      /**
      *指定延迟时间后执行
      */
      public ScheduledFuture<?> schedule(Runnable command,
                                             long delay, TimeUnit unit);
      
      public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                                 long delay, TimeUnit unit);
      
      /**
      *scheduleAtFixedRate ,是以上一个任务开始的时间计时,period时间过去后,检测上一个任务是否执行完毕,如果上一个任务执行完*
      毕,则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行。
      */
      public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                        long initialDelay,
                                                        long period,
                                                        TimeUnit unit);
      /**
      *scheduleWithFixedDelay,是以上一个任务结束时开始计时,period时间过去后,立即执行。
      */
      public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                           long initialDelay,
                                                           long delay,
                                                           TimeUnit unit);
      

    四、ExecutorService的常用方法

    // 执行任务
    void execute(Runnable command);
    
    // 提交任务 task,用返回值 Future 获得任务执行结果
    <T> Future<T> submit(Callable<T> task);
    
    // 提交 tasks 中所有任务,会阻塞,必须等待所有的任务执行完成后统一返回结果
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
    
    // 提交 tasks 中所有任务,带超时时间,这里的超时时间是针对的所有tasks,而不是单个task的超时时间。同样会阻塞,会堵塞,必须等待所有的任务执行完成后统一返回。但是如果超时,会取消没有执行完的所有任务,并抛出超时异常
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
    
    // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,会阻塞,直到返回结果
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
    
    // 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间,直到返回结果,取消没有执行完的所有任务,并抛出超时异常
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
     throws InterruptedException, ExecutionException, TimeoutException;
    
    
    /*
    线程池状态变为 SHUTDOWN
    - 不会接收新任务
    - 但已提交任务会执行完
    - 不会中断当前运行的线程
    - 此方法不会阻塞调用线程的执行
    */
    void shutdown();
    
    
    /*
    线程池状态变为 STOP
    - 不会接收新任务
    - 会将队列中的任务返回
    - 并用 interrupt 的方式中断正在执行的任务
    */
    List<Runnable> shutdownNow()
        
    // 不在 RUNNING 状态的线程池,此方法就返回 true
    boolean isShutdown();
    
    
    // 线程池状态是否是 TERMINATED
    boolean isTerminated();
    
    
    // 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事
    //情,可以利用此方法等待
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    
    
  • 相关阅读:
    混合开发的坑(3) ---关于es6
    混合开发的坑(2) ---pdf展示
    混合开发的坑(1) ---ajax请求
    vue.js
    vue中 import引入组件
    vue中 静态文件引用注意事项
    Oracle 数据库链接
    Oracle中的NVL,NVL2,NULLIF以及COALESCE函数使用
    Merge into 使用
    C# —— IList, ArrayList与List的区别详解
  • 原文地址:https://www.cnblogs.com/myblogstart/p/13848456.html
Copyright © 2011-2022 走看看