zoukankan      html  css  js  c++  java
  • Java 并发之 Executor 框架

    1 前言

    在学习 JUC 的过程中我发现,JUC 这个包下面的文档写的十分的好,清楚又易于理解,这篇博客便是参考 JUC 中和 Executor 框架相关的一些类文档汇总出来的。

    当然了,Executor 框架涉及到的类还是不少的,全部汇总的话时间成本太高,有点亏,所以这里主要就集中在了 Executor 接口及其子接口和具体实现上。

    2 Executor 框架概览

    Executor 框架的起点自然就是 Executor 接口,可以说整个 Executor 框架便是建立在 Executor 接口和其子接口上的,大致结构为:

    其中,接口 Executor 表示定制的 thread-like 子系统,包括线程池、异步 I/O 和轻量级的任务框架。同时,依赖于具体的 Executor 实现,新的任务可能在新线程、已存在的任务执行线程或调用 Executor.execute 方法的线程中执行,不同任务的执行可能是依序执行的或并行执行的。

    而 Executor 的子接口 ExecutorService 提供了更完整的异步任务执行支持,每个 ExecutorService 会对任务的调度进行管理,并且允许受控的 取消 操作。它的子接口 ScheduledExecutorService 还添加了对延迟和定期任务执行的支持。

    由于 Runnable 接口定义的 run 方法是没有返回值的,因此,ExecutorService 还支持通过 Callable 来定义一些异步任务,这些异步任务可以有一个执行结果,我们可以通过相应的 Future 来获取这个结果。

    提交到 ExecutorService 的异步任务通常都会返回一个对应的 Future 对象,我们可以通过这个对象来获取异步任务的执行状态和执行结果,或者取消该任务的执行。

    对于上述的接口来说,Executor 框架提供了一些默认的实现,很多时候,这些实现已经足够我们的使用:

    • Executor 框架提供了两个灵活的 可配置 的线程池实现 ThreadPoolExecutor(ExecutorService) 和 ScheduledThreadPoolExecutor(ScheduledExecutorService)
    • 可以通过 Executors 的工厂方法来创建指定配置的线程池,同时通过一些其他实用的方法来使用它们

    另外,类 ForkJoinPool 也提供了一个 Executor 来处理 ForkJoinTask 及其子类的实例。

    参考:

    3 Executor

    一般来说,我们可以手动创建 Thread 对象来执行 Runnable 任务,但是,在有了 Executor 框架后,更好的选择是将这些异步任务转交给 Executor 的具体实现来执行。

    比如说将 Thread(new(RunnableTask())).start() 替换为:

    Executor executor = anExecutor;
    executor.execute(new RunnableTask1());
    executor.execute(new RunnableTask2());
    ...
    

    当然,我们需要明白的是,不同的 Executor 实现是不一样的,我们提交的异步任务不一定就在别的线程执行,比如下面这样的实现:

    class DirectExecutor implements Executor {
      public void execute(Runnable r) {
        r.run();
      }
    }
    

    但是,Executor 这个接口定义的功能很有限,同时也只支持 Runnale 形式的异步任务:

    void execute(Runnable command);
    

    参考:

    4 ExecutorService

    ExecutorService 为异步任务的执行提供了更多的支持,包括用于 终止 的方法以及可以产生用于跟踪一个或多个异步任务进度的 Future的方法。

    首先,和 Executor 不一样的是,ExecutorService 是可以终止的,当 ExecutorService 终止后,便不会接受新提交的任务。可以通过两个方法来终止 ExecutorService:

    • 通过 shutdown 方法来终止 ExecutorService,允许在执行完先前提交的任务后在终止 ExecutorService
    • 通过 shutdownNow 方法来终止 ExecutorService,会阻止等待的任务启动并尝试停止当前正在执行的任务

    ExecutorService 终止后,就表示 ExecutorService 不存在正在或等待执行的任务,同时,会拒绝新任务的提交,通常应该关闭未使用的 ExecutorService 以便回收资源。

    然后,和 Executor.execute 方法不一样,在 ExecutorService 中可以通过 ExecutorService.submit 方法来提交任务,这个方法会返回与提交的任务相关联的 Future 对象,我们可以通过这个 Future 对象来等待/取消任务的执行,并获取执行结果。还可以通过 invokeAny 和 invokeAll 来提交一组任务,等待其中一个或所有任务的执行。

    同时,相较于只支持 Runnable 的 Executor,ExecutorService 还支持 Callable 形式的异步任务:

    submit(Callable<T> task);
    submit(Runnable task);
    submit(Runnable task, T result);
    

    参考:

    5 ScheduledExecutorService

    接口 ScheduledExecutorService 相较于 ExecutorService 来说添加了对延迟和定期任务执行的支持,还是比较好理解的:

    // 单次延迟任务
    schedule(Callable<V> callable, long delay, TimeUnit unit)
    schedule(Runnable command, long delay, TimeUnit unit)
    // 循环延迟任务
    scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
    scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
    

    参考:

    6 ThreadPoolExecutor

    ThreadPoolExecutor 是 ExecutorService 的一种具体实现,一般情况下我们可以通过 Executors 来创建新的线程池,但是,了解 ThreadPoolExecutor 提供的各配置项还是很有用的,而 ThreadPoolExecutor 文档中对这些配置项给出了很详细的描述。

    Core and maximum pool sizes - 线程池核心线程数和最大线程数,线程池根据这两个参数来自动调整线程池的大小:

    1. 在提交新任务且运行的线程数少于 corePoolSize 时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理新任务
    2. 在运行的线程数大于 corePoolSize 但小于 maximumPoolSize 时,就仅在 队列已满 时才创建新线程
    3. 这两个参数可以在创建线程池时设置,也可以在创建后动态修改

    On-demand construction - ThreadPoolExecutor 默认情况下是在新任务提交后在创建启动线程,但是可以通过覆盖 prestartCoreThread() 或 prestartAllCoreThreads() 改变这一行为,这在初始队列不为空时会很有用。

    Creating new threads - ThreadPoolExecutor 通过 ThreadFactory 来创建新的线程,默认情况下会使用 Executors.defaultThreadFactory(), 这个 ThreadFactory 创建的所有线程拥有相同的 ThreadGroup 和 NORM_PRIORITY 级别的优先级,同时也是非守护线程。

    Keep-alive times - 当当前线程池中的线程数超过 corePoolSize 时,多余的线程将在闲置时间超过 keepAliveTime 时终止。默认情况下参数 keepAliveTime 仅在线程数超过 corePoolSize 时起作用,但是也可以通过 allowCoreThreadTimeOut(boolean) 方法让核心线程在闲置一段时间后也被终止。

    Queuing - 任意的 BlockingQueue 都可以用于传输和保留提交的任务,队列的使用和当前线程池的大小相关:

    • 在当前线程池中的线程数小于 corePoolSize 时,优先创建新的线程
    • 在当前线程池中的线程数超过 corePoolSize 时,优先选择将任务放入队列
    • 在新的任务 无法放入队列 且线程数小于 maximumPoolSize 时,会创建新的线程,否则会拒绝新的任务

    线程池中一般可以选择下面三种队列使用策略:

    • 直接交付,比如说使用 SynchronousQueue 队列,直接将任务传递给工作线程,如果没有合适的工作线程来处理任务,那么就会选择创建新的线程获拒绝任务,这时一般会将 maximumPoolSize 设的大一点
    • 无界队列,比如说使用 LinkedBlockingQueue 队列,这种情况下因为新的任务必然可以放入队列,因此,参数 maximumPoolSize便失去了意义,此时最多只会有 corePoolSize 个线程在运行
    • 有界队列,比如说使用 ArrayBlockingQueue 队列,这时我们可以通过灵活调整 corePoolSizemaximumPoolSize 和队列大小来更加充分的利用线程池

    Rejected tasks - 当 ExecutorService 被关闭或者任务无法放入队列且线程数量超过 maximumPoolSize 时,新任务的提交会被拒绝,这时便会调用 RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor) 来处理被拒绝的任务,可选的处理策略有:

    1. ThreadPoolExecutor.AbortPolicy(default) - 抛出运行时异常 RejectedExecutionException
    2. ThreadPoolExecutor.CallerRunsPolicy - 在调用 executor 的 线程执行该任务
    3. ThreadPoolExecutor.DiscardPolicy - 直接删除忽略新的任务
    4. ThreadPoolExecutor.DiscardOldestPolicy - 如果 ExecutorService 没有被关闭,那么就丢弃队列头的任务重新提交这个任务

    Hook methods - 方法 beforeExecute(Thread, Runnable) 和 afterExecute(Runnable, Throwable) 会在每个任务执行的前后调用,也可以覆盖 terminated() 方法在 Executor 终止  执行一些额外的操作。

    Queue maintenance - 可以通过 getQueue() 方法访问工作队列,以进行监视和调试。强烈建议不要将此方法用于任何其他目的,可以通过 remove(Runnable) 和 purge() 清理队列中的任务。

    Finalization - 在线程池不在被引用  没有剩余工作线程时,线程池将会被关闭。可以考虑将 corePoolSize 设小并通过 allowCoreThreadTimeOut(boolean) 保证核心线程闲置久了也会被回收,那么,忘记调用 shutdown 也不要担心资源的浪费了。

    这么多的配置项,如此强大的功能,我只想说,Doug Lea NB(破音)!!!

    附,文档上的一个例子:

    // 可以暂停的线程池
    class PausableThreadPoolExecutor extends ThreadPoolExecutor {
      private boolean isPaused;
      private ReentrantLock pauseLock = new ReentrantLock();
      private Condition unpaused = pauseLock.newCondition();
    
      public PausableThreadPoolExecutor(...) { super(...); }
    
      protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        pauseLock.lock();
        try {
          while (isPaused) unpaused.await();
        } catch (InterruptedException ie) {
          t.interrupt();
        } finally {
          pauseLock.unlock();
        }
      }
    
      public void pause() {
        pauseLock.lock();
        try {
          isPaused = true;
        } finally {
          pauseLock.unlock();
        }
      }
    
      public void resume() {
        pauseLock.lock();
        try {
          isPaused = false;
          unpaused.signalAll();
        } finally {
          pauseLock.unlock();
        }
      }
    }
    

    参考:

    7 ScheduledThreadPoolExecutor

    ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor 并实现了 ScheduledExecutorService 接口,是比 Timer 更好的选择。它使用指定大小的 corePoolSize 和无界队列,因此,参数 maximumPoolSize 没有意义。

    ScheduledThreadPoolExecutor 能够保证任务的执行时间 不早于 指定的时间,但是不能保证一定在那个时间执行。对于指定在同一时间执行的任务,将会按照 FIFO 的规则执行。

    另外,假如在开始执行前任务就已经被取消了,那么 ScheduledThreadPoolExecutor 就不会在执行那个任务,但是默认不会从队列中取出该任务。但是可以通过 setRemoveOnCancelPolicy(boolean) 要求 ScheduledThreadPoolExecutor 在任务取消时就立即从队列中取出该任务。

    参考:

    8 Executors

    Executors 提供了一些针对 ExecutorExecutorServiceScheduledExecutorService 和 ThreadFactory 的实用方法,文档上的话就没有什么好说的了,其实就是一个熟悉接口的问题。

    参考:

    9 结语

    通过文档的阅读对 Executors 有了大致的了解,但是,如果你打开源码就会发现,除了类上面的 Javadoc 以外,在类内部,通常还会有很长一段注释告诉你这个东西它是怎么实现的!!!

    这对于阅读源码的人来说简直太友好了,多的不说,至少思路告诉你了,在看源码就容易多了 ‍(`・ω・´)

  • 相关阅读:
    Linux rm命令详解
    标准时间格式("%Y-%m-%dT%H:%M:%S")转化(基于python 3.6)
    通过load json文件读取json指定数据(基于python 3.6)
    遍历win10文件夹并解析json文件,按照json格式存入mongo数据库(基于python 3.6)
    mongo的备份数据库导入现有数据库
    python 获取网页内容新增网页分类+删除指定后缀数组元素功能(基于python 3.6)
    sqlite3的安装和使用(基于python3.5)
    python 获取提交表单网址内容(即需要密码网址)以财务网站为例
    python 分析PDF文件 (基于使用pdf2htmlEX.exe python3.6)
    python 复制多个文件到指定目录(基于python 3.X)
  • 原文地址:https://www.cnblogs.com/java1024/p/12350692.html
Copyright © 2011-2022 走看看