zoukankan      html  css  js  c++  java
  • 《Java架构师的第一性原理》24Java基础之并发第5篇Java并发编程的艺术

    第1章 并发编程的挑战

    1.1 上下文切换

    即使是单核处理器也支持多线程执行代码,CPU通过给每个线程分配CPU时间片来实现 这个机制。时间片是CPU分配给各个线程的时间,因为时间片非常短,所以CPU通过不停地切 换线程执行,让我们感觉多个线程是同时执行的,时间片一般是几十毫秒(ms)。

    CPU通过时间片分配算法来循环执行任务,当前任务执行一个时间片后会切换到下一个 任务。但是,在切换前会保存上一个任务的状态,以便下次切换回这个任务时,可以再加载这 个任务的状态。所以任务从保存到再加载的过程就是一次上下文切换。

    1)多线程一定快吗?

    2)测试上下文切换次数和时长

    3)如何减少上下文切换

    减少上下文切换的方法有无锁并发编程、CAS算法、使用最少线程和使用协程。

    • 无锁并发编程。多线程竞争锁时,会引起上下文切换,所以多线程处理数据时,可以用一些办法来避免使用锁,如将数据的ID按照Hash算法取模分段,不同的线程处理不同段的数据。
    • CAS算法。Java的Atomic包使用CAS算法来更新数据,而不需要加锁。
    • 使用最少线程。避免创建不需要的线程,比如任务很少,但是创建了很多线程来处理,这样会造成大量线程都处于等待状态。
    • 协程:在单线程里实现多任务的调度,并在单线程里维持多个任务间的切换。

    4)减少上下文切换实战

    1.2 死锁

    锁是个非常有用的工具,运用场景非常多,因为它使用起来非常简单,而且易于理解。但同时它也会带来一些困扰,那就是可能会引起死锁,一旦产生死锁,就会造成系统功能不可用。

    避免死锁的几个常见方法:

    • 避免一个线程同时获取多个锁。
    • 避免一个线程在锁内同时占用多个资源,尽量保证每个锁只占用一个资源。
    • 尝试使用定时锁,使用lock.tryLock(timeout)来替代使用内部锁机制。
    • 对于数据库锁,加锁和解锁必须在一个数据库连接里,否则会出现解锁失败的情况。

    1.3 资源限制的挑战

    1)什么是资源限制

    资源限制是指在进行并发编程时,程序的执行速度受限于计算机硬件资源或软件资源。

    硬件资源限制有带宽的上传/下载速度、硬盘读写速度和CPU的处理速度。软件资源限制有数据库的连接数和socket连接数等。

    2)资源限制引发的问题

    在并发编程中,将代码执行速度加快的原则是将代码中串行执行的部分变成并发执行, 但是如果将某段串行的代码并发执行,因为受限于资源,仍然在串行执行,这时候程序不仅不 会加快执行,反而会更慢,因为增加了上下文切换和资源调度的时间。

    3)如何解决资源限制的问题

    对于硬件资源限制,可以考虑使用集群并行执行程序。

    对于软件资源限制,可以考虑使用资源池将资源复用。比如使用连接池将数据库和Socket 连接复用,或者在调用对方webservice接口获取数据时,只建立一个连接。

    4)在资源限制情况下进行并发编程

    如何在资源限制的情况下,让程序执行得更快呢?方法就是,根据不同的资源限制调整程序的并发度。

    第2章 Java并发机制的底层实现原理 

    Java代码在编译后会变成Java字节码,字节码被类加载器加载到JVM里,JVM执行字节 码,最终需要转化为汇编指令在CPU上执行,Java中所使用的并发机制依赖于JVM的实现和 CPU的指令。 

    2.1 volatile的应用

    volatile在多处理器开发中保证了共享变量的“可见性”。可见性的意思是当一个线程修改一个共享变量时,另外一个线程能读到这个修改的值。如果volatile变量修饰符使用恰当的话,它比synchronized的使用和执行成本更低,因为它不会引起线程上下文的切换和调度。

    2.2 synchronized的实现原理与应用

    1)Java对象头

    2)锁的升级与对比

    Java SE 1.6中,锁一共有4种状态,级别从低到高依次是:无锁状态、偏向锁状态、轻量级锁状 态和重量级锁状态,这几个状态会随着竞争情况逐渐升级。 

    2.3 原子操作的实现原理

    原子(atomic)本意是“不能被进一步分割的最小粒子”,而原子操作(atomic operation)意 为“不可被中断的一个或一系列操作”。在多处理器上实现原子操作就变得有点复杂。让我们 一起来聊一聊在Intel处理器和Java里是如何实现原子操作的。 

    第3章 Java内存模型 

    本章大致分4部分:

    • Java内存模型的基础,主要介绍内存模型相关的基本概念;
    • Java内存模型中的顺序一致性,主要介绍重排序与顺序一致性内存模型;
    • 同步原语,主要介绍3个同步原语(synchronized、volatile和final)的内存语义及重排序规则在处理器中的实现;
    • Java内存模型的设计,主要介绍Java内存模型的设计原理,及其与处理器内存模型 和顺序一致性内存模型的关系。 

    3.1 Java内存模型的基础

    3.1.1 并发编程模型的两个关键问题 

    在并发编程中,需要处理两个关键问题: 线程之间如何通信及线程之间如何同步(这里的 线程是指并发执行的活动实体)。 

    通信

    通信是指线程之间以何种机制来交换信息。在命令式编程 中,线程之间的通信机制有两种:共享内存和消息传递。 

    • 在共享内存的并发模型里,线程之间共享程序的公共状态,通过写-读内存中的公共状态进行隐式通信。
    • 在消息传递的并发模型里,线程之间没有公共状态,线程之间必须通过发送消息来显式进行通信。 

    同步

    同步是指程序中用于控制不同线程间操作发生相对顺序的机制。

    • 在共享内存并发模型里,同步是显式进行的。程序员必须显式指定某个方法或某段代码需要在线程之间互斥执行。
    • 在消息传递的并发模型里,由于消息的发送必须在消息的接收之前,因此同步是隐式进行的。 

    Java的并发采用的是共享内存模型,Java线程之间的通信总是隐式进行,整个通信过程对程序员完全透明。如果编写多线程程序的Java程序员不理解隐式进行的线程之间通信的工作机制,很可能会遇到各种奇怪的内存可见性问

    题。 

    3.1.2 Java内存模型的抽象结构 

    Java线程之间的通信由Java内存模型(本文简称为JMM)控制,JMM决定一个线程对共享变量的写入何时对另一个线程可见。 

    从抽象的角度来看,JMM定义了线程和主内存之间的抽 象关系:线程之间的共享变量存储在主内存(Main Memory)中,每个线程都有一个私有的本地内存(Local Memory),本地内存中存储了该线程以读/写共享变量的副本。本地内存是JMM的 一个抽象概念,并不真实

    存在。它涵盖了缓存、写缓冲区、寄存器以及其他的硬件和编译器优 化。Java内存模型的抽象示意如图3-1所示。 

     

    3.1.3 从源代码到指令序列的重排序 

    在执行程序时,为了提高性能,编译器和处理器常常会对指令做重排序。 

    重排序分3种类型。

    • 1)编译器优化的重排序。编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序。
    • 2)指令级并行的重排序。现代处理器采用了指令级并行技术(Instruction-Level Parallelism,ILP)来将多条指令重叠执行。如果不存在数据依赖性,处理器可以改变语句对应 机器指令的执行顺序。
    • 3)内存系统的重排序。由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是在乱序执行。 

    从Java源代码到最终实际执行的指令序列,会分别经历下面3种重排序,如图3-3所示。 

    上述的1属于编译器重排序,2和3属于处理器重排序。这些重排序可能会导致多线程程序 出现内存可见性问题。对于编译器,JMM的编译器重排序规则会禁止特定类型的编译器重排 序(不是所有的编译器重排序都要禁止)。对于处理器重排序,JMM的处理器重排序规则会要 求Java编译器在生成指令序列时,插入特定类型的内存屏障(Memory Barriers,Intel称之为 Memory Fence)指令,通过内存屏障指令来禁止特定类型的处理器重排序。 

    JMM属于语言级的内存模型,它确保在不同的编译器和不同的处理器平台之上,通过禁止特定类型的编译器重排序和处理器重排序,为程序员提供一致的内存可见性保证。 

    3.1.4 并发编程模型的分类

    现代的处理器使用写缓冲区临时保存向内存写入的数据。 

    在执行程序时,为了提高性能,编译器和处理器常常会对指令做重排序。 

    为了保证内存可见性,Java编译器在生成指令序列的适当位置会插入内存屏障指令来禁 止特定类型的处理器重排序。JMM把内存屏障指令分为4类,如表3-3所示。 

    StoreLoad Barriers是一个“全能型”的屏障,它同时具有其他3个屏障的效果。现代的多处 理器大多支持该屏障(其他类型的屏障不一定被所有处理器支持)。执行该屏障开销会很昂 贵,因为当前处理器通常要把写缓冲区中的数据全部刷新到内存中(Buffer Fully Flush)。 

    3.1.5 happens-before简介 

    从JDK 5开始,Java使用新的JSR-133内存模型(除非特别说明,本文针对的都是JSR-133内 存模型)。JSR-133使用happens-before的概念来阐述操作之间的内存可见性。

    与程序员密切相关的happens-before规则如下。 

    • 程序顺序规则:一个线程中的每个操作,happens-before于该线程中的任意后续操作。
    • 监视器锁规则:对一个锁的解锁,happens-before于随后对这个锁的加锁。
    • volatile变量规则:对一个volatile域的写,happens-before于任意后续对这个volatile域的 读。
    • 传递性:如果A happens-before B,且B happens-before C,那么A happens-before C。 

    如图3-5所示,一个happens-before规则对应于一个或多个编译器和处理器重排序规则。对 于Java程序员来说,happens-before规则简单易懂,它避免Java程序员为了理解JMM提供的内存 可见性保证而去学习复杂的重排序规则以及这些规则的具体实现方法。 

    3.2 重排序

    重排序是指编译器和处理器为了优化程序性能而对指令序列进行重新排序的一种手段。 

    3.3 顺序一致性

    3.4 volatile的内存语义

    当声明共享变量为volatile后,对这个变量的读/写将会很特别。为了揭开volatile的神秘面 纱,下面将介绍volatile的内存语义及volatile内存语义的实现。 

    3.5 锁的内存语义

    3.6 final域的内存语义

    对于final域,编译器和处理器要遵守两个重排序规则。

    1)在构造函数内对一个final域的写入,与随后把这个被构造对象的引用赋值给一个引用 变量,这两个操作之间不能重排序。

    2)初次读一个包含final域的对象的引用,与随后初次读这个final域,这两个操作之间不能 重排序。 

    3.7 happens-before

    3.8 双重检查锁定与延迟初始化

    3.9 Java内存模型综述

    3.10 本章小结

    第4章 Java并发编程基础

    4.1 线程简介

    1)什么是线程

    现代操作系统在运行一个程序时,会为其创建一个进程。例如,启动一个Java程序,操作系统就会创建一个Java进程。

    线程都拥有各自的计数器、堆栈和局 部变量等属性,并且能够访问共享的内存变量。处理器在这些线程上高速切换,让使用者感觉到这些线程在同时执行。

    一个Java程序从main()方法开始执行,然后按照既定的代码逻辑执行,看似没有其他线程参与,但实际上Java程序天生就是多线程程序,因为执行main()方法的是一个名称为main的线程。

    2)为什么要使用多线程

    使用多线程的原因主要有以下几点:

    • 更多的处理器核心
    • 更快的响应时间
    • 更好的编程模型

    3)线程优先级

    现代操作系统基本采用时分的形式调度运行的线程,操作系统会分出一个个时间片,线程会分配到若干时间片,当线程的时间片用完了就会发生线程调度,并等待着下次分配。

    在Java线程中,通过一个整型成员变量priority来控制优先级,优先级的范围从1~10,在线 程构建的时候可以通过setPriority(int)方法来修改优先级,默认优先级是5,优先级高的线程分配时间片的数量要多于优先级低的线程。

    设置线程优先级时,针对频繁阻塞(休眠或者I/O操 作)的线程需要设置较高优先级,而偏重计算(需要较多CPU时间或者偏运算)的线程则设置较低的优先级,确保处理器不会被独占。

    在不同的JVM以及操作系统上,线程规划会存在差异,有些操作系统甚至会忽略对线程优先级的设定。

    4)线程的状态

    5)Daemon线程

    Daemon线程是一种支持型线程,因为它主要被用作程序中后台调度以及支持性工作。

    4.2 启动和终止线程

    1)构造线程

    在运行线程之前首先要构造一个线程对象,线程对象在构造的时候需要提供线程所需要 的属性,如线程所属的线程组、线程优先级、是否是Daemon线程等信息。

        private void init(ThreadGroup g, Runnable target, String name,
                          long stackSize, AccessControlContext acc) {
            if (name == null) {
                throw new NullPointerException("name cannot be null");
            }
    
            this.name = name.toCharArray();
    
            Thread parent = currentThread();
            SecurityManager security = System.getSecurityManager();
            if (g == null) {
                /* Determine if it's an applet or not */
    
                /* If there is a security manager, ask the security manager
                   what to do. */
                if (security != null) {
                    g = security.getThreadGroup();
                }
    
                /* If the security doesn't have a strong opinion of the matter
                   use the parent thread group. */
                if (g == null) {
                    g = parent.getThreadGroup();
                }
            }
    
            /* checkAccess regardless of whether or not threadgroup is
               explicitly passed in. */
            g.checkAccess();
    
            /*
             * Do we have the required permissions?
             */
            if (security != null) {
                if (isCCLOverridden(getClass())) {
                    security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION);
                }
            }
    
            g.addUnstarted();
    
            this.group = g;
            this.daemon = parent.isDaemon();
            this.priority = parent.getPriority();
            if (security == null || isCCLOverridden(parent.getClass()))
                this.contextClassLoader = parent.getContextClassLoader();
            else
                this.contextClassLoader = parent.contextClassLoader;
            this.inheritedAccessControlContext =
                    acc != null ? acc : AccessController.getContext();
            this.target = target;
            setPriority(priority);
            if (parent.inheritableThreadLocals != null)
                this.inheritableThreadLocals =
                    ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
            /* Stash the specified stack size in case the VM cares */
            this.stackSize = stackSize;
    
            /* Set thread ID */
            tid = nextThreadID();
        }

    在上述过程中,一个新构造的线程对象是由其parent线程来进行空间分配的,而child线程 继承了parent是否为Daemon、优先级和加载资源的contextClassLoader以及可继承的

    ThreadLocal,同时还会分配一个唯一的ID来标识这个child线程。至此,一个能够运行的线程对 象就初始化好了,在堆内存中等待着运行。

    2)启动线程

    线程对象在初始化完成之后,调用start()方法就可以启动这个线程。线程start()方法的含义 是:当前线程(即parent线程)同步告知Java虚拟机,只要线程规划器空闲,应立即启动调用 start()方

    法的线程。

    3)理解中断

    中断可以理解为线程的一个标识位属性,它表示一个运行中的线程是否被其他线程进行 了中断操作。中断好比其他线程对该线程打了个招呼,其他线程通过调用该线程的interrupt() 方法对其

    进行中断操作。

    线程通过检查自身是否被中断来进行响应,线程通过方法isInterrupted()来进行判断是否 被中断,也可以调用静态方法Thread.interrupted()对当前线程的中断标识位进行复位。如果该线程已

    经处于终结状态,即使该线程被中断过,在调用该线程对象的isInterrupted()时依旧会返 回false。

    从Java的API中可以看到,许多声明抛出InterruptedException的方法(例如Thread.sleep(long millis)方法)这些方法在抛出InterruptedException之前,Java虚拟机会先将该线程的中断标识位清

    除,然后抛出InterruptedException,此时调用isInterrupted()方法将会返回false。

    4)过期的suspend()、resume()和stop()

    通过示例的输出可以看到,suspend()、resume()和stop()方法完成了线程的暂停、恢复和终

    止工作,而且非常“人性化”。但是这些API是过期的,也就是不建议使用的。

    不建议使用的原因主要有:以suspend()方法为例,在调用后,线程不会释放已经占有的资 源(比如锁),而是占有着资源进入睡眠状态,这样容易引发死锁问题。同样,stop()方法在终结 一个线程时不会保证线程的资源正常释放,通常是没有给予线程完成资源释放工作的机会, 因此会导致程序可能工作在不确定状态下。

    注意 正因为suspend()、resume()和stop()方法带来的副作用,这些方法才被标注为不建 议使用的过期方法,而暂停和恢复操作可以用后面提到的等待/通知机制来替代。

    5)安全地终止线程

    在4.2.3节中提到的中断状态是线程的一个标识位,而中断操作是一种简便的线程间交互 方式,而这种交互方式最适合用来取消或停止任务。除了中断以外,还可以利用一个boolean变量来控

    制是否需要停止任务并终止该线程。

    示例在执行过程中,main线程通过中断操作和cancel()方法均可使CountThread得以终止。 这种通过标识位或者中断操作的方式能够使线程在终止时有机会去清理资源,而不是武断地 将线程停止,因此这种终止线程的做法显得更加安全和优雅。

    4.3 线程间通信

    线程开始运行,拥有自己的栈空间,就如同一个脚本一样,按照既定的代码一步一步地执 行,直到终止。但是,每个运行中的线程,如果仅仅是孤立地运行,那么没有一点儿价值,或者说价

    值很少,如果多个线程能够相互配合完成工作,这将会带来巨大的价值。

    4.3.1 volatile和synchronized关键字

    1)volatile

    Java支持多个线程同时访问一个对象或者对象的成员变量,由于每个线程可以拥有这个 变量的拷贝(虽然对象以及成员变量分配的内存是在共享内存中的,但是每个执行的线程还是 可以拥有一份拷贝,这样做的目的是加速程序的执行,这是现代多核处理器的一个显著特 性),所以程序在执行过程中,一个线程看到的变量并不一定是最新的。

    关键字volatile可以用来修饰字段(成员变量),就是告知程序任何对该变量的访问均需要 从共享内存中获取,而对它的改变必须同步刷新回共享内存,它能保证所有线程对变量访问 的可见性。

    但是,过多地使用volatile是不必要的,因为 它会降低程序执行的效率。

    2)synchronized

    关键字synchronized可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程 在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性 和排他性。

    使用了同步块和同步方法,通过使用javap工具查看生成 的class文件信息来分析synchronized关键字的实现细节。

    上面class信息中,对于同步块的实现使用了monitorenter和monitorexit指令,而同步方法则是依靠方法修饰符上的ACC_SYNCHRONIZED来完成的。

    无论采用哪种方式,其本质是对一个对象的监视器(monitor)进行获取,而这个获取过程是排他的,也就是同一时刻只能有一个 线程获取到由synchronized所保护对象的监视器。

    任意一个对象都拥有自己的监视器,当这个对象由同步块或者这个对象的同步方法调用 时,执行方法的线程必须先获取到该对象的监视器才能进入同步块或者同步方法,而没有获取到监视器(执行该方法)的线程将会被阻塞在同步块和同步方法的入口处,进入

    BLOCKED 状态。

    4.3.2 等待通知机制

    一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个 过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者,这种模 式隔离了“做什么”(what)和“怎么做”(How),在功能层面上实现了解耦,体系结构上具备了良 好的伸缩性,但是在Java语言中如何实现类似的功能呢?

    简单的办法是让消费者线程不断地循环检查变量是否符合预期。

    但是却存在如下问题。

    • 难以确保及时性。在睡眠时,基本不消耗处理器资源,但是如果睡得过久,就不能及时 发现条件已经变化,也就是及时性难以保证。
    • 难以降低开销。如果降低睡眠的时间,比如休眠1毫秒,这样消费者能更加迅速地发现 条件变化,但是却可能消耗更多的处理器资源,造成了无端的浪费。

    以上两个问题,看似矛盾难以调和,但是Java通过内置的等待/通知机制能够很好地解决 这个矛盾并实现所需的功能。

    等待/通知的相关方法是任意Java对象都具备的,因为这些方法被定义在所有对象的超类 java.lang.Object上,方法和描述如表4-2所示。

     

    等待/通知机制,是指一个线程A调用了对象O的wait()方法进入等待状态,而另一个线程B 调用了对象O的notify()或者notifyAll()方法,线程A收到通知后从对象O的wait()方法返回,进而 执行后续操作。上述两个线程通过对象O来完成交互,而对象上的wait()和notify/notifyAll()的 关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。

    调用wait()、notify()以 及notifyAll()时需要注意的细节,如下。

    • 1) 使用wait()、notify()和notifyAll()时需要先对调用对象加锁。
    • 2) 调用wait()方法后,线程状态由RUNNING变为WAITING,并将当前线程放置到对象的等待队列。
    • 3) notify()或notifyAll()方法调用后,等待线程依旧不会从wait()返回,需要调用notify()或 notifAll()的线程释放锁之后,等待线程才有机会从wait()返回。
    • 4) notify()方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而notifyAll() 方法则是将等待队列中所有的线程全部移到同步队列,被移动的线程状态由WAITING变为 BLOCKED。
    • 5) 从wait()方法返回的前提是获得了调用对象的锁。

    4.3.3 等待/通知经典范式 

    4.3.4 管道输入/输出流

    管道输入/输出流和普通的文件输入/输出流或者网络输入/输出流不同之处在于,它主要用于线程之间的数据传输,而传输的媒介为内存。

    4.3.5 Thread.join()的使用

    如果一个线程A执行了thread.join()语句,其含义是: 当前线程A等待thread线程终止之后才从thread.join()返回。

        public final synchronized void join(long millis)
        throws InterruptedException {
            long base = System.currentTimeMillis();
            long now = 0;
    
            if (millis < 0) {
                throw new IllegalArgumentException("timeout value is negative");
            }
    
            if (millis == 0) {
                while (isAlive()) {
                    wait(0);
                }
            } else {
                while (isAlive()) {
                    long delay = millis - now;
                    if (delay <= 0) {
                        break;
                    }
                    wait(delay);
                    now = System.currentTimeMillis() - base;
                }
            }
        }

    当线程终止时,会调用线程自身的notifyAll()方法,会通知所有等待在该线程对象上的线程。

    4.3.6 ThreadLocal的使用

    ThreadLocal,即线程变量,是一个以ThreadLocal对象为键、任意对象为值的存储结构。这个结构被附带在线程上,也就是说一个线程可以根据一个ThreadLocal对象查询到绑定在这个线程上的一个值。

    4.4 线程应用实例

    4.4.1 等待超时模式

    开发人员经常会遇到这样的方法调用场景:调用一个方法时等待一段时间(一般来说是给 定一个时间段),如果该方法能够在给定的时间段之内得到结果,那么将结果立刻返回,反之, 超时返回默认结果。

    4.4.2 一个简单的数据库连接池示例

    数据库连接池的设计也可以复用到其他的资源获取的场景,针对昂贵资源(比如数据库连 接)的获取都应该加以超时限制。

    4.4.3 线程池技术及其示例

    线程池技术能够很好地解决这个问题,它预先创建了若干数量的线程,并且不能由用户 直接对线程的创建进行控制,在这个前提下重复使用固定或较为固定数目的线程来完成任务 的执行。这样

    做的好处是,一方面,消除了频繁创建和消亡线程的系统资源开销,另一方面, 面对过量任务的提交能够平缓的劣化。

    4.4.4 一个基于线程池技术的简单Web服务器

    目前的浏览器都支持多线程访问,比如说在请求一个HTML页面的时候,页面中包含的图 片资源、样式资源会被浏览器发起并发的获取,这样用户就不会遇到一直等到一个图片完全 下载完成才能继续查看文字内容的尴尬情况。

    如果Web服务器是单线程的,多线程的浏览器也没有用武之地,因为服务端还是一个请求 一个请求的顺序处理。因此,大部分Web服务器都是支持并发访问的。常用的Java Web服务器, 如Tomcat、Jetty,在其处理请求的过程中都使用到了线程池技术。

    下面通过使用前一节中的线程池来构造一个简单的Web服务器。

      

    4.5 本章小结

    第5章 Java中的锁

    使用,通过示例演示这些组件的使用方法以及详细介绍与锁相关的API;

    实现,通过分析源码来剖析实现细节,因为理解实现的细节方能更加得心应手且正 确地使用这些组件。

    5.1 Lock接口

    这里先简单介绍一下Lock接口的API,随后的章节会详细介绍同步器 AbstractQueuedSynchronizer以及常用Lock接口的实现ReentrantLock。Lock接口的实现基本都是 通过聚合了一个同步器的子类来完成线程访问控制的。 

    5.2 队列同步器AQS

    1)AQS的接口

    队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获 取线程

    的排队工作,并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。 

    • 同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。 
    • 锁是面向使用者的,它定义了使用者与锁交 互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;同步器面向的是锁的实现者, 它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。 

    重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态。

    • getState():获取当前同步状态。
    • setState(int newState):设置当前同步状态。
    • compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能够保证状态 设置的原子性。 

    同步器可重写的方法与描述如表5-3所示。 

    实现自定义同步组件时,将会调用同步器提供的模板方法,这些(部分)模板方法与描述如表5-4所示。 

    同步器提供的模板方法基本上分为3类:

    • 独占式获取与释放同步状态
    • 共享式获取与释放同步状态
    • 查询同步队列中的等待线程情况

    自定义同步组件将使用同步器提供的模板方法 来实现自己的同步语义。 

    2)同步队列的实现原理

    同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取 同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其 加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再 次尝试获取同步状态。 

    同步队列中的节点(Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和 后继节点,节点的属性类型与名称以及描述如表5-5所示。 

     

    节点是构成同步队列(等待队列,在5.6节中将会介绍)的基础,同步器拥有首节点(head) 和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部,同步队列的基本结构如图5-1所示。 

    在图5-1中,同步器包含了两个节点类型的引用,一个指向头节点,而另一个指向尾节点。 试想一下,当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转 而被构造成为节点并加入到同步队列中,而这个加入队列的过程必须要保证线程安全,因此

    同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Node update),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式 与之前的尾节点建立关联。 

    同步器将节点加入到同步队列的过程如图5-2所示。 

    同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态 时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点,该过程 如图5-3所示。 

    在图5-3中,设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用CAS来保证,它只需要将首节 点设置成为原首节点的后继节点并断开原首节点的next引用即可。 

    3)独占式同步状态获取与释放 

    通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同 步队列中移出,该方法代码如代码清单5-3所示。 

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

    上述代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作,其主要逻辑是: 

    首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法 保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式 Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态) 

    并通过addWaiter(Node node) 方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使得该 节点以“死循环”的方式获取同步状态。 

    如果获取不到则阻塞节点中的线程,而被阻塞线程的 唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。 

    addWaiter方法源码

        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }

    acquireQueued方法源码

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

    release()方法源码

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }

    4)共享式同步状态获取与释放 

    acquireShared(int arg) 源码

        public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
        private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

    releaseShared()源码

        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
        private void doReleaseShared() {
            /*
             * Ensure that a release propagates, even if there are other
             * in-progress acquires/releases.  This proceeds in the usual
             * way of trying to unparkSuccessor of head if it needs
             * signal. But if it does not, status is set to PROPAGATE to
             * ensure that upon release, propagation continues.
             * Additionally, we must loop in case a new node is added
             * while we are doing this. Also, unlike other uses of
             * unparkSuccessor, we need to know if CAS to reset status
             * fails, if so rechecking.
             */
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }

    5.3 重入锁

    5.4 读写锁

    5.5 LockSupport工具

    5.6 Condition接口

    5.7 本章小结

    第6章 Java并发容器和框架

    Java程序员进行并发编程时,相比于其他语言的程序员而言要倍感幸福,因为并发编程大 师Doug Lea不遗余力地为Java开发者提供了非常多的并发容器和框架。

    6.1 ConcurrentHashMap的实现原理与使用

    6.2 ConcurrentLinkedQueue

    在并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列有两 种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。

    • 使用阻塞算法的队列可以用一个锁 (入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。
    • 非阻塞的实现方 式则可以使用循环CAS的方式来实现。

    本节让我们一起来研究一下Doug Lea是如何使用非阻 塞的方式来实现线程安全队列ConcurrentLinkedQueue的,相信从大师身上我们能学到不少并 发编程的技巧。

    6.3 Java中的阻塞队列

    6.3.1 什么是阻塞队列

    阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。

    • 1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
    • 2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。

    在阻塞队列不可用时,这两个附加操作提供了4种处理方式,如表6-1所示。

    • 抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException("Queue full")异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常。
    • 返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移 除方法,则是从队列里取出一个元素,如果没有则返回null。
    • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者 线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队 列会阻塞住消费者线程,直到队列不为空。
    • 超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程 一段时间,如果超过了指定的时间,生产者线程就会退出。

    这两个附加操作的4种处理方式不方便记忆,所以我找了一下这几个方法的规律。put和 take分别尾首含有字母t,offer和poll都含有字母o。

    注意,如果是无界阻塞队列,队列不可能会出现满的情况,所以使用put或offer方法永 远不会被阻塞,而且使用offer方法时,该方法永远返回true。 

    6.3.2 Java里的阻塞队列

    JDK 7提供了7个阻塞队列,如下。

    • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
    • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
    • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
    • DelayQueue:一个支持延时获取元素、使用优先级队列实现的无界阻塞队列。
    • SynchronousQueue:一个不存储元素的阻塞队列。
    • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
    • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。 

    6.3.3 阻塞队列的实现原理

    如果队列是空的,消费者会一直等待,当生产者添加元素时,消费者是如何知道当前队列 有元素的呢?如果让你来设计阻塞队列你会如何设计,如何让生产者和消费者进行高效率的 通信呢? 

    使用通知模式实现。所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生 产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。通过查看JDK源码 发现ArrayBlockingQueue使用了Condition来实现。

    6.4 Fork/Join框架

    本节将会介绍Fork/Join框架的基本原理、算法、设计方式、应用与实现等。

    6.4.1 什么是Fork/Join框架

    Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

    6.4.2 工作窃取算法

    工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。

    工作窃取算法的优点:充分利用线程进行并行计算,减少了线程间的竞争。

    工作窃取算法的缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并 且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列。

    6.4.3 Fork/Join框架的设计

    如果让我们来设计一个Fork/Join框架,该如何设计?

    • 步骤1 分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停地分割,直到分割出的子任务足够小。
    • 步骤2 执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分 别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程 从队列里拿数据,然后合并这些数据。

    Fork/Join使用两个类来完成以上两件事情。

    (1)ForkJoinTask: 我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。通常情况下,我们不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类。

    • RecursiveAction:用于没有返回结果的任务。
    • RecursiveTask:用于有返回结果的任务。

    (2)ForkJoinPool: ForkJoinTask需要通过ForkJoinPool来执行。

    任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当 一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任 务。

    6.4.4 使用Fork/Join框架

    6.4.5 Fork/Join框架的异常处理

    ForkJoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常, 所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被 取消了,并且可以通过ForkJoinTask的getException方法获取异常。

    6.4.6 Fork/Join框架的实现原理

    ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。

    (1)ForkJoinTask的fork方法实现原理

    当我们调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法 异步地执行这个任务,然后立即返回结果。

        public final ForkJoinTask<V> fork() {
            Thread t;
            if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
                ((ForkJoinWorkerThread)t).workQueue.push(this);
            else
                ForkJoinPool.common.externalPush(this);
            return this;
        }
            final void push(ForkJoinTask<?> task) {
                ForkJoinTask<?>[] a; ForkJoinPool p;
                int b = base, s = top, n;
                if ((a = array) != null) {    // ignore if queue removed
                    int m = a.length - 1;     // fenced write for task visibility
                    U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
                    U.putOrderedInt(this, QTOP, s + 1);
                    if ((n = s - b) <= 1) {
                        if ((p = pool) != null)
                            p.signalWork(p.workQueues, this);
                    }
                    else if (n >= m)
                        growArray();
                }
            }

    (2)ForkJoinTask的join方法实现原理

    Join方法的主要作用是阻塞当前线程并等待获取结果。让我们一起看看ForkJoinTask的join 方法的实现。

        public final V join() {
            int s;
            if ((s = doJoin() & DONE_MASK) != NORMAL)
                reportException(s);
            return getRawResult();
        }

    第7章 Java中的13个原子操作类

    7.1 原子更新基本类型类

    • AtomicBoolean:原子更新布尔类型。
    • AtomicInteger:原子更新整型。
    • AtomicLong:原子更新长整型。

    7.2 原子更新数组

    • AtomicIntegerArray:原子更新整型数组里的元素。
    • AtomicLongArray:原子更新长整型数组里的元素。
    • AtomicReferenceArray:原子更新引用类型数组里的元素。

    7.3 原子更新引用类型

    • AtomicReference:原子更新引用类型。
    • AtomicReferenceFieldUpdater:原子更新引用类型里的字段。
    • AtomicMarkableReference:原子更新带有标记位的引用类型。

    7.4 原子更新字段类

    • AtomicIntegerFieldUpdater:原子更新整型的字段的更新器。
    • AtomicLongFieldUpdater:原子更新长整型字段的更新器。
    • AtomicStampedReference:原子更新带有版本号的引用类型。

    7.5 本章小结

    第8章 Java中的并发工具类

    在JDK的并发包里提供了几个非常有用的并发工具类。CountDownLatch、CyclicBarrier和 Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类则提供了在线程间交换数 据的一种手段。

    8.1 等待多线程完成的CountDownLatch

    8.2 同步屏障CyclicBarrier

    8.3 控制并发线程数的Semaphore

    8.4 线程间交换数据的Exchanger

    第9章 Java中的线程池

    在开发过程中,合理地使用线程池能够带来3个好处。

    • 第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
    • 第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
    • 第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源, 还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。但是,要做到合理利用 线程池,必须对其实现原理了如指掌。 

    9.1 线程池的实现原理

    源码分析:上面的流程分析让我们很直观地了解了线程池的工作原理,让我们再通过源代 码来看看是如何实现的,线程池执行任务的方法如下。 

    public void execute(Runnable command) { if (command == null)
    throw new NullPointerException(); 

    // 如果线程数小于基本线程数,则创建线程并执行当前任务 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {

    // 如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中。 if (runState == RUNNING && workQueue.offer(command)) {   if (runState != RUNNING || poolSize == 0)
        ensureQueuedTaskHandled(command); }
    // 如果线程池不处于运行中或任务无法放入队列,并且当前线程数量小于最大允许的线程数量,则创建一个线程执行任务。 else if (!addIfUnderMaximumPoolSize(command)) // 抛出RejectedExecutionException异常 reject(command); // is shutdown or saturated
    }
    }

    工作线程: 线程池创建线程时,会将线程封装成工作线程Worker,Worker在执行完任务后,还会循环获取工作队列里的任务来执行。我们可以从Worker类的run()方法里看到这点。 

    9.2 线程池的使用

    9.2.1 线程池的创建

    我们可以通过ThreadPoolExecutor来创建一个线程池。

    new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);

    创建一个线程池时需要输入几个参数,如下。

    1)corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线 程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任 务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法, 线程池会提前创建并启动所有基本线程。

    2)runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几 个阻塞队列。

    • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原 则对元素进行排序。
    • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通 常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用 移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工 厂方法Executors.newCachedThreadPool使用了这个队列。
    • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

    3)maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并 且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如 果使用了无界的任务队列这个参数就没什么效果。

    4)ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设 置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线 程设置有意义的名字,代码如下。

    new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();

    5)RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状 态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法 处理新任务时抛出异常。在JDK

    1.5中Java线程池框架提供了以下4种策略。

    • AbortPolicy:直接抛出异常。
    • CallerRunsPolicy:只用调用者所在线程来运行任务。
    • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
    • DiscardPolicy:不处理,丢弃掉。

    当然,也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录 日志或持久化存储不能处理的任务。

    6)keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以, 如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。

    7)TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟 (MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒

    (NANOSECONDS,千分之一微秒)。 

    9.2.2 向线程池提交任务

    可以使用两个方法向线程池提交任务,分别为execute()和submit()方法。 

    9.2.3 关闭线程池

    可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务 可能永远无法终止。 

    9.2.4 合理地配置线程池

    要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。

    • 任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
    • 任务的优先级:高、中和低。
    • 任务的执行时间:长、中和短。
    • 任务的依赖性:是否依赖其他系统资源,如数据库连接。 

    性质不同的任务可以用不同规模的线程池分开处理。 

    • CPU密集型任务应配置尽可能小的 线程,如配置Ncpu+1个线程的线程池。 
    • 由于IO密集型任务线程并不是一直在执行任务,则应配 置尽可能多的线程,如2*Ncpu。混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务 和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量 将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。 
    • 可以通过 Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。 
    • 优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高 的任务先执行。 
    • 执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让 执行时间短的任务先执行。
    • 依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越 长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。 

    建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点 儿,比如几千。 

    9.2.5 线程池的监控

    要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。

    如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,可以根 据线程池的使用状况快速定位问题。可以通过线程池提供的参数进行监控,在监控线程池的 时候可以使用以下属性。 

    • taskCount:线程池需要执行的任务数量。
    • completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount。
    • largestPoolSize:线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是 否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
    • getPoolSize:线程池的线程数量。如果线程池不销毁的话,线程池里的线程不会自动销 毁,所以这个大小只增不减。
    • getActiveCount:获取活动的线程数。

    通过扩展线程池进行监控。可以通过继承线程池来自定义线程池,重写线程池的 beforeExecute、afterExecute和terminated方法,也可以在任务执行前、执行后和线程池关闭前执 行一些代码来进行监控。例如,监控任务的平均执行时间、最大执行时间和最小执行时间等。 这几个方法在线程池里是空方法。 

    protected void beforeExecute(Thread t, Runnable r) { }

    第10章 Executor框架

    Java的线程既是工作单元,也是执行机制。从JDK 5开始,把工作单元与执行机制分离开 来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供。

    10.1 Executor框架简介

    1)Executor框架的两级调度模型 

     

    2)Executor框架的结构与成员 

    Executor框架主要由3大部分组成如下。 

    • 任务。包括被执行任务需要实现的接口:Runnable接口或Callable接口。
    • 任务的执行。包括任务执行机制的核心接口Executor,以及继承自Executor的 ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口 (ThreadPoolExecutor和ScheduledThreadPoolExecutor)。
    • 异步计算的结果。包括接口Future和实现Future接口的FutureTask类。 

    主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。 

    10.2 ThreadPoolExecutor详解

    1)FixedThreadPool

    FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为 Integer.MAX_VALUE)。 

    2)SingleThreadExecutor

    SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1。其他参数与 FixedThreadPool相同。 

    3)CachedThreadPool详解

    CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为 Integer.MAX_VALUE,即maximumPool是无界的。这里把keepAliveTime设置为60L,意味着 CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。 

    CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但 CachedThreadPool的maximumPool是无界的。 

    10.3 ScheduledThreadPoolExecutor详解 

    1)ScheduledThreadPoolExecutor的运行机制 

    DelayQueue是一个无界队列,所以ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中没有什么意义(设置maximumPoolSize的大小没有什么效果)。 

    ScheduledThreadPoolExecutor的执行主要分为两大部分。

    • 1)当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWith- FixedDelay()方法时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了 RunnableScheduledFuture接口的ScheduledFutureTask。
    • 2)线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务。 

    2)ScheduledThreadPoolExecutor的实现

    ScheduledFutureTask主要包含3个成员变量,如下。

    • long型成员变量time,表示这个任务将要被执行的具体时间。
    • long型成员变量sequenceNumber,表示这个任务被添加到ScheduledThreadPoolExecutor中的序号。
    • long型成员变量period,表示任务执行的间隔周期。 

    DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对队列中的Scheduled- FutureTask进行排序。排序时,time小的排在前面(时间早的任务将被先执行)。如果两个 ScheduledFutureTask的time相同,就比较sequenceNumber,sequenceNumber小的排在前面(也就 是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)。 

    10.4 FutureTask详解 

    Future接口和实现Future接口的FutureTask类,代表异步计算的结果。 

    1)FutureTask简介

    根据FutureTask.run()方法被执行 的时机,FutureTask可以处于下面3种状态。

    • 1)未启动。FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态。当创建一 个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。
    • 2)已启动。FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。
    • 3)已完成。FutureTask.run()方法执行完后正常结束,或被取消(FutureTask.cancel(...)),或执行FutureTask.run()方法时抛出异常而异常结束,FutureTask处于已完成状态。 

    get方法和cancel方法的执行示意图。 

     

    2)FutureTask的使用

    可以把FutureTask交给Executor执行;也可以通过ExecutorService.submit(...)方法返回一个 FutureTask,然后执行FutureTask.get()方法或FutureTask.cancel(...)方法。除此以外,还可以单独 使用FutureTask。 

    3)FutureTask的实现(JDK1.7以前)

    http://hg.openjdk.java.net/jdk6/jdk6/jdk/file/5f4bfda58ef8/src/share/classes/java/util/concurrent/FutureTask.java

    FutureTask的实现基于AbstractQueuedSynchronizer(以下简称为AQS)。

    java.util.concurrent中 的很多可阻塞类(比如ReentrantLock)都是基于AQS来实现的。AQS是一个同步框架,它提供通 用机制来原子性管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列。JDK 6中AQS 被广泛使用,基于AQS实现的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、 CountDownLatch和FutureTask。 

    每一个基于AQS实现的同步器都会包含两种类型的操作,如下。

    • 至少一个acquire操作。这个操作阻塞调用线程,除非/直到AQS的状态允许这个线程继续执行。FutureTask的acquire操作为get()/get(long timeout,TimeUnit unit)方法调用。
    • 至少一个release操作。这个操作改变AQS的状态,改变后的状态可允许一个或多个阻塞线程被解除阻塞。FutureTask的release操作包括run()方法和cancel(...)方法。 

    基于“复合优先于继承”的原则,FutureTask声明了一个内部私有的继承于AQS的子类Sync,对FutureTask所有公有方法的调用都会委托给这个内部子类。 

    AQS被作为“模板方法模式”的基础类提供给FutureTask的内部子类Sync,这个内部子类只 需要实现状态检查和状态更新的方法即可,这些方法将控制FutureTask的获取和释放操作。具 体来说,Sync实现了AQS的tryAcquireShared(int)方法和tryReleaseShared(int)方法,Sync通过这 两个方法来检查和更新同步状态。 

    FutureTask的设计示意图如图10-15所示。 

    FutureTask.get()方法会调用AQS.acquireSharedInterruptibly(int arg)方法,这个方法的执行过程如下。

    • 1)调用AQS.acquireSharedInterruptibly(int arg)方法,这个方法首先会回调在子类Sync中实 现的tryAcquireShared()方法来判断acquire操作是否可以成功。acquire操作可以成功的条件为: state为执行完成状态RAN或已取消状态CANCELLED,且runner不为null。
    • 2)如果成功则get()方法立即返回。如果失败则到线程等待队列中去等待其他线程执行 release操作。
    • 3)当其他线程执行release操作(比如FutureTask.run()或FutureTask.cancel(...))唤醒当前线 程后,当前线程再次执行tryAcquireShared()将返回正值1,当前线程将离开线程等待队列并唤 醒它的后继线程(这里会产生级联唤醒的效果,后面会介绍)。
    • 4)最后返回计算的结果或抛出异常。

    FutureTask.run()的执行过程如下。

    • 1)执行在构造函数中指定的任务(Callable.call())。
    • 2)以原子方式来更新同步状态(调用AQS.compareAndSetState(int expect,int update),设置 state为执行完成状态RAN)。如果这个原子操作成功,就设置代表计算结果的变量result的值为 Callable.call()的返回值,然后调用AQS.releaseShared(int arg)。
    • 3)AQS.releaseShared(int arg)首先会回调在子类Sync中实现的tryReleaseShared(arg)来执 行release操作(设置运行任务的线程runner为null,然会返回true);AQS.releaseShared(int arg), 然后唤醒线程等待队列中的第一个线程。
    • 4)调用FutureTask.done()。

    当执行FutureTask.get()方法时,如果FutureTask不是处于执行完成状态RAN或已取消状态 CANCELLED,当前执行线程将到AQS的线程等待队列中等待(见下图的线程A、B、C和D)。当 某个线程执行FutureTask.run()方法或FutureTask.cancel(...)方法时,会唤醒线程等待队列的第一 个线程(见图10-16所示的线程E唤醒线程A)。 

     

    假设开始时FutureTask处于未启动状态或已启动状态,等待队列中已经有3个线程(A、B和

    C)在等待。此时,线程D执行get()方法将导致线程D也到等待队列中去等待。

    当线程E执行run()方法时,会唤醒队列中的第一个线程A。线程A被唤醒后,首先把自己从 队列中删除,然后唤醒它的后继线程B,最后线程A从get()方法返回。线程B、C和D重复A线程 的处理流程。最终,在队列中等待的所有线程都被级联唤醒并从get()方法返回。 

    4)FutureTask的实现(JDK1.7及以后)

    private volatile int state;

    从Java1.7开始Doug Lea对FutureTask进行了重写,重写的原因:

    即为了在需要竞争时保留中断状态,他对之前基于AQS的方式进行了重构,如果题主对AQS比较熟悉,可以看下1.6的FutureTask的实现,其实非常简单。

    而为了维护状态,新版本的FutureTask采用了Treiber Stack - Wikipedia 方式,即通过CAS来维护内部的竞争状态,当然也包括了waiters(队头)的状态。

    理解Future及FutureTask的实现

    第11章 Java并发编程实践

    这个内容移动到计算机思维模型里,取名,解耦思维模型。

    这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些 设计模式的过程中,先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。

    11.1 生产者消费者模式实战

    11.2 线上问题定位

    借助常用的工具定位问题:

    1)使用TOP命令查看每个进程的情况

    2)再使用top的交互命令数字1查看每个CPU的性能数据

    3)使用top的交互命令H查看每个线程的性能信息

    在这里可能会出现3种情况。

    • 第一种情况,某个线程CPU利用率一直100%,则说明是这个线程有可能有死循环,那么请记住这个PID。
    • 第二种情况,某个线程一直在TOP 10的位置,这说明这个线程可能有性能问题。
    • 第三种情况,CPU利用率高的几个线程在不停变化,说明并不是由某一个线程导致CPU 偏高。

    如果是第一种情况,也有可能是GC造成,可以用jstat命令看一下GC情况,看看是不是因 为持久代或年老代满了,产生Full GC,导致CPU利用率持续飙高,命令和回显如下。

    sudo /opt/java/bin/jstat -gcutil 31177 1000 5 
    S0 S1 E O P YGC YGCT FGC FGCT GCT

    还可以把线程dump下来,看看究竟是哪个线程、执行什么代码造成的CPU利用率高。执行 以下命令,把线程dump到文件dump17里。执行如下命令。

    sudo -u admin /opt/taobao/java/bin/jstack 31177 > /home/tengfei.fangtf/dump17

    11.3 性能测试

    1)什么叫性能测试

    因为要支持某个业务,有同事向我们提出需求,希望系统的某个接口能够支持2万的 QPS,因为我们的应用部署在多台机器上,要支持两万的QPS,我们必须先要知道该接口在单 机上能支持多少QPS,如果单机能支持1千QPS,我们需要20台机器才能支持2万的QPS。需要 注意的是,要支持的2万的QPS必须是峰值,而不能是平均值,比如一天当中有23个小时QPS不 足1万,只有一个小时的QPS达到了2万,我们的系统也要支持2万的QPS。

    2)性能测试排查工具

    测试开始后,首先登录到服务器里查看当前有多少台机器在压测服务器,因为程序的端口是12200,所以使用netstat命令查询有多少台机器连接到这个端口上。命令如下。

    netstat -nat | grep 12200 –c

    通过这个命令可以知道已经有10台机器在压测服务器。QPS达到了1400,程序开始报错获 取不到数据库连接,因为我们的数据库端口是3306,用netstat命令查看已经使用了多少个数据 库连接。命令如下。

    netstat -nat | grep 3306 –c

    增加数据库连接到20,QPS没上去,但是响应时长从平均1000毫秒下降到700毫秒,使用 TOP命令观察CPU利用率,发现已经90%多了,于是升级CPU,将2核升级成4核,和线上的机器

    保持一致。再进行压测,CPU利用率下去了达到了75%,QPS上升到了1800。执行一段时间后响 应时长稳定在200毫秒。

    增加应用服务器里线程池的核心线程数和最大线程数到1024,通过ps命令查看下线程数 是否增长了,执行的命令如下。

    ps -eLf | grep java -c

    再次压测,QPS并没有明显的增长,单机QPS稳定在1800左右,响应时长稳定在200毫秒。

    我在性能测试之前先优化了程序的SQL语句。使用了如下命令统计执行最慢的SQL,左边 的是执行时长,单位是毫秒,右边的是执行的语句,可以看到系统执行最慢的SQL是 queryNews和queryNewIds,优化到几十毫秒。

    $ grep Y /home/admin/logs/xxx/monitor/dal-rw-monitor.log |awk -F',' '{print $7$5}' | sort -nr|head -20
    
    1811 queryNews
    1764 queryNews
    1740 queryNews
    1697 queryNews
    679 queryNewIds

    3)性能测试中使用的其他命令

    (1)查看网络流量。

    cat /proc/net/dev

    (2)查看系统平均负载。

    cat /proc/loadavg

    (3)查看系统内存情况。

    cat /proc/meminfo

    (4)查看CPU的利用率。

    cat /proc/stat

    11.4 异步任务池

    11.5 本章小结 

    99 直接读这些牛人的原文

    线程入门

    管程(Moniter): 并发编程的基本心法

    全网讲线程状态最好的文章,没有之一:(低并发编程)Java 线程的状态及转换

    为什么JAVA线程中没有Running状态? 

    sowhat1412:JAVA并发都没搞明白,怎么进大厂?

    10 张图打开 CPU 缓存一致性的大门

    全网最细 | 21张图带你领略集合的线程不安全

    volatile

    低并发编程:volatile 三部曲之可见性

    打工人,从 JMM 透析 volatile 与 synchronized 原理

    26张图带你彻底搞懂volatile关键字

    【死磕Java并发】-----深入分析volatile的实现原理

    synchronized

    关于 Synchronized 的一个点,网上99%的文章都错了

    synchronized 王的后宫总管,线程是王妃

    5000字 | 24张图带你彻底理解Java中的21种锁

    Java Synchronized 重量级锁原理深入剖析上(互斥篇)

    Java Synchronized 重量级锁原理深入剖析下(同步篇)

    工具类

    Java并发必知必会第三弹:用积木讲解ABA原理 | 老婆居然又听懂了!

    程序员深夜惨遭老婆鄙视,原因竟是CAS原理太简单?| 每一张图都力求精美

    LockSupport:一个很灵活的线程工具类

    高性能解决线程饥饿的利器 StampedLock

    AQS

    Java并发之AQS全面详解

    码海:1.5w字,30图带你彻底掌握 AQS!

    程序员cxuan:我画了35张图,就是为了让你深入理解 AQS

    线程池

    全网讲线程池最好的文章,没有之一:(低并发编程)图解 | 你管这破玩意叫线程池?

    Java线程池实现原理及其在美团业务中的实践

    Java架构不脱发 Java并发编程(十二): 线程池

    今天我们不聊原理,能拿来即用的线程池最佳实践

    线程池的7种创建方式,强烈推荐你用它...

    1000个并发线程,10台机器,每台机器4核,设计线程池大小

    两个高频设计类面试题:如何设计HashMap和线程池

    几种常见的线程池及使用场景(讲解参数)

    线程池的种类和使用场景(讲解场景)

    Java多线程与并发系列从0到1全部合集,强烈建议收藏 

    JavaGuide:新手也能看懂的线程池学习总结

     


    作者:沙漏哟
    出处:计算机的未来在于连接
    本文版权归作者和博客园共有,欢迎转载,请留下原文链接
    微信随缘扩列,聊创业聊产品,偶尔搞搞技术
  • 相关阅读:
    CTF -攻防世界-crypto新手区(5~11)
    CTF密码学常见加密解密总结
    CTF -攻防世界-crypto新手区(1~4)
    跨域问题解决
    npm使用记录
    dva控制元素动态消失隐藏
    maven安装记录
    postgreSQL记录
    关于System.__ComObject一些问题
    论文中表格跨页处理
  • 原文地址:https://www.cnblogs.com/yeahwell/p/14564379.html
Copyright © 2011-2022 走看看