zoukankan      html  css  js  c++  java
  • 读《Java并发编程的艺术》学习笔记(十)

    第10章  Executor框架
            Java线程既是工作单元,也是执行机制。从JDK 5开始,把工作单元和执行机制分离开来。工作单元包括Runnable和Callable,而执行机制有Executor框架提供

    10.1  Executor框架简介
        10.1.1  Executor框架的两级调度模型
            在HotSpot VM的线程模型中,Java线程被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程。当该Java线程终止时,这个操作系统线程也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。 

            在上层,Java多线程程序通常会把应用分解成若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。 其模型图如下: 

      1、Executor框架的结构 
              Executor框架主要又3大部分组成: 

                  1. 任务:包括被执行任务需要实现的接口:Runnable接口或Callable接口。 

                  2. 任务的执行:包括任务执行机制的核心接口Executor,及其子接口ExecutorService接口。 Executor框架有两个关节类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)    

                  3. 异步计算的结果:包括接口Future和其实现类FutureTask类。 

        Executor框架的类与接口关系示意图: 

     

      下面是这些类和接口的简介。

              Executor是一个接口,它是Executor框架的基础,它将任务的提交和任务的执行分离开来。

              ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。

              ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。SchduledThreadPoolExecutor比Timer更灵活,功能更强大。

              Future接口和实现Future接口的FutureTask类,代表异步计算的结果

              Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或SchduledThreadPoolExecutor执行

        2、Executor框架的成员
            Executor框架的主要成员:ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future接口、Runnable接口、Callable接口、Executors。

            (1)ThreadPoolExecutor

              ThreadPoolExecutor通常使用工厂类Executors来创建,Executors可以创建3种类型的ThreadPoolExecutor: SingleThreadExecutor、FixedThreadPool、CachedThreadPool。

                  1) FixedThreadPool:创建固定线程数的线程池,构造函数中可以指定线程数量,适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。

                    FixedThreadPool内部使用无界队列LinkedBlockingQueue作为任务队列,队列的容量为Integer.MAX_VALUE,由于是无界队列,所以不会拒绝任务,可能会造成任务无限堆积,从而导致系统资源耗尽的情况。

                  2) SingleThreadExecutor:创建单个线程的线程池,可以保证顺序执行任务。与FixedThreadPool类似,只是SingleThreadExecutor的线程数固定为1

                   3) CachedThreadPool:可以根据需要创建新的线程,CachedThreadPool是大小无界的线程池,适用于执行很多短期异步任务的小程序,或者是负载比较轻的服务器。CachedThreadPool的corePool为空,maximumPoolSize为Integer.MAX_VALUE,keepAliveTime为60L,这意味着线程空闲超过60秒则会进行回收。CachedThreadPool内部使用不存储元素的SynchronousQueue作为任务队列(一个put操作等待着一个take操作),这意味着如果任务的提交速度高于线程的处理速度,那么CachedThreadPool则会不断的创建新的线程,在极端的情况下,会耗尽CPU和内存资源。            

            (2) SchduledThreadPoolExecutor

              SchduledThreadPoolExecutor通常使用工厂类Executors来创建,Executors可以创建2中类型的SchduledThreadPoolExecutor,如下:

                      SchduledThreadPoolExecutor。包含若干个线程的SchduledThreadPoolExecutor。

                      SingleThreadSchduledExecutor。只包含一个线程的SchduledThreadPoolExecutor。

            (3)Future接口

              Future接口和实现Future接口的FutureTask类用来表示异步计算的结果,当我们把Runnable接口或者Callable接口的实现类提交(submit)给ThreadPoolExecutor或者SchduledThreadPoolExecutor时,ThreadPoolExecutor或者SchduledThreadPoolExecutor会向我们返回一个FutureTask对象。下面是对应的API

           (4) Runnable和Callable接口

              Runnable和Callable接口的实现类都可以被hreadPoolExecutor或者SchduledThreadPoolExecutor执行。它们之间的区别是Runnable不会返回结果,而Callable可以返回结果。

              除了可以自已创建实现Callable接口的对象外,还可以使用工厂类Executors来把一个Runnable包装成一个Callable。

              当我们把一个Callable对象提交给ThreadPoolExecutor或者SchduledThreadPoolExecutor执行时,summit()会向我们返回一个FutureTask对象。我们可以执行FutureTask.get()来等待任务执行完成。当任务完成后FutureTask.get()将会返回任务的结果。

    10.2  ThreadPoolExecutor详解
            ThreadPoolExecutor是Executor框架最核心的类。主要由corePool(核心线程池大小)、maximumPool(最大线程池大小)、BlockingQueue(工作队列)和RejecteExecutionHandler(饱和策略)构成。 

            ThreadPoolExecutor通常使用Executors来创建。Executors可以创建3中类型的ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool和CachedThreadPool。

        10.2.1  FixedThreadPool详解 
         FixedThreadPool 被称为可重用固定线程数的线程池。 

      FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads。当线程池数大于corePoolSize时,keepAliveTime为多余空闲线程等待新任务的最长时间,超过这个时间多余线程就会被终止。keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止。 其运行示意图如下: 

         1>如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务。 

                 2>在线程池完成预热之后(当前线程中有一个运行的线程),将任务加入LinkedBlockingQueue。 

                 3>线程执行完1中的任务后,会在一个无线循环中反复从LinkedBlockingQueue获取任务来执行。 

              FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。使用无界队列作为工作队列会对线程池带来如下影响: 

                  a>当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程不会超过corePoolSize。 

                  b>maximumPoolSize变为一个无效参数。 

                  c>keepAliveTime也变为一个无效参数。 

                  d>永远不会执行饱和策略。 

     10.2.2 SingleThreadExecutor详解
            SingleThreadExecutor是使用单个worker线程的Executor。 下面是其源代码: 

        SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1。并使用LinkedBlockingQueue无界队列作为工作队列。 其运行示意图如下: 

      

        1>如果当前运行的线程数少于corePoolSize(即线程池中无运行的线程),则创建一个新线程来执行任务。 

              2>在线程池完成预热之后(当前线程中有一个运行的线程),将任务加入LinkedBlockingQueue。 

              3>线程执行完1中的任务后,会在一个无线循环中反复从LinkedBlockingQueue获取任务来执行。 

     10.2.3 CachedThreadPool详解
            CacheThreadPool是一个会更加需要创建新线程的线程池。 下面是其源代码: 

       CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为Integer.MAX_VALUE,即maximumPool时无界的。keepAliveTime被设置为60L,意味着空闲线程超过60秒后被终止。 

            CachedThread使用没有容量的SynchronousQueue作为线程池的工作队列,但其maximumPool是无界的。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度是,CachedThreadPool会不断创建新的线程,极端情况下,会耗尽CPU和内存资源。 其执行示意图如下: 

          1>首先执行SynchronousQueue.offer(Runnable task)。如果当前有空闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECODES),则配对成功,将任务交给空闲线程执行。 

                 2>当没有空闲线程时,创建一个新线程执行任务。 

                 3>线程在执行任务完毕后,执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECODES),向队列请求任务,并等待60秒。如果60之后仍没有新任务,则被终止。如果有新任务则继续执行。 

    10.3  ScheduledThreadPoolExecutor 详解
            ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运行任务,或定期执行任务。 ScheduledThreadPoolExecutor通常使用工厂类Executors来创建。Executors可以创建2种类型的ScheduledThreadPoolExecutor:

                1>ScheduledThreadPoolExecutor适用于需要执行周期任务,同时为了满足资源管理的需求而限制后台线程的数量的应用场景。 

                2>SingleThreadScheduledExecutor:只包含一个线程的ScheduledThreadPoolExecutor,适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各任务的应用场景。 

      10.3.1  ScheduledThreadPoolExecutor的运行机制
              其执行示意图如下: 

       DelayQueue是一个无界队列,所以ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中没什么意义。ScheduledThreadPoolExecutor的执行分为两大部分:

                1>当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()或scheduleWithFixedDelay()时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduledFutur接口的ScheduledFutureTask。

                2>线程池中的线程从DelayQueue中获取ScheduledFutureTask并执行。 

     10.3.2  ScheduledThreadPoolExecutor的实现 
              ScheduledThreadPoolExecutor把待调度的任务放在一个DelayQueue中。ScheduledFutureTask主要包含3个成员变量: 

                      long型变量 time,表示这个任务将要被执行的具体时间。 

                      long型变量 sequenceNumbe,表示这个任务被添加到ScheduledThreadPoolExecutor中的序号。 

                      long型成员变量 period,表示任务执行的间隔周期。 

              DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对对列中的ScheduledFutureTask进行排序。time小的先执行,被排在前面。如果两个任务time相同则比较sequenceNumbe。 

              ScheduledThreadPoolExecutor执行任务的步骤如下: 

         1>线程1从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take())。到期任务是指ScheduledFutureTask的time大于等于当前时间。 

                    2>线程1执行这个ScheduledFutureTask。 

                    3>线程1修改ScheduledFutureTask的time变量为下次要被执行的时间。 

                    4>线程1把修改time之后的ScheduledFutureTask放入DelayQueue(DelayQueue.add())。 

            以下是DelayQueue.take()方法源码: 

      DelayQueue.add()方法源码: 

    10.4  FutureTask详解
            Future接口和实现Future接口的FutureTask类用来表示异步计算的结果。

         10.4.1  FutureTask 简介
              FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())。FutureTask有未启动、已启动、已完成3种状态。 

              FutureTask的状态迁移示意图如下: 

         1>未启动。FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态。当创建一个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。 

              2>已启动。FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。 

              3>已完成。FutureTask.run()方法执行完后正常结束,或被取消(FutureTask.cancel()),或FutureTask.run()时抛出异常而异常结束,FutureTask处于已完成状态。 

        10.4.2  FutureTask的使用
                下面是FutureTask在不同状态时调用FutureTask.get()及Future.cancel()方法的执行示意图: 

      10.4.3 FutureTask的实现
        FutureTask实现基于AQS.

        FutureTask的实现基于AbstractQueuedSynchronizer(以下简称为AQS)。java.util.concurrent中的很多可阻塞类(比如ReentrantLock)都是基于AQS来实现的。

        AQS是一个同步框架,它提供通用机制来原子性管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列。

        JDK 6中AQS被广泛使用,基于AQS实现的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch和FutureTask。
        每一个基于AQS实现的同步器都会包含两种类型的操作,如下。
          1)至少一个acquire操作。这个操作阻塞调用线程,除非/直到AQS的状态允许这个线程继续执行。FutureTask的acquire操作为get()/get(long timeout,TimeUnit unit)方法调用。
          2)至少一个release操作。这个操作改变AQS的状态,改变后的状态可允许一个或多个阻塞线程被解除阻塞。FutureTask的release操作包括run()方法和cancel(…)方法。
        基于“复合优先于继承”的原则,FutureTask声明了一个内部私有的继承于AQS的子类Sync,对FutureTask所有公有方法的调用都会委托给这个内部子类。

  • 相关阅读:
    Devops的衍生-腾讯优测
    如何评估软件测试的效率
    优测云服务平台如何破解兼容性测试操作难点
    测试工程师进阶面试题目大合集
    测试人员必看-做好自动化测试的7大技能
    史上最全软件开发|程序员必备的工具集
    腾讯优测优分享 | 高质量产品、高质量照片
    腾讯优测优分享 | 多媒体,多问题
    腾讯优测优分享 | 双卡双待-工程师难言的痛
    C#面向对象基础
  • 原文地址:https://www.cnblogs.com/mYunYu/p/13077111.html
Copyright © 2011-2022 走看看