zoukankan      html  css  js  c++  java
  • 读《Java并发编程的艺术》学习笔记(六)

    第6章  Java并发容器和框架

    6.1   ConcurrentHashMap的实现原理与使用
            ConcurrentHashMap是线程安全且高效的HashMap。

        6.1.1  为什么要使用ConcurrentHashMap
            在并发编程中使用HashMap可能导致程序死循环,而使用线程安全的HashTable效率又非常低,基于以上2个原因,便有了ConcurrentHashMap的出现。

        (1)线程不安全的HashMap
            再多线程环境下,使用HashMap进行put操作时会引起死循环,导致CPU利用率接近100%,原因是多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。

        (2)效率低下的HashTable
            HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同步方法时,会进入阻塞或轮询状态。

        (3)ConcurrentHashMap的锁分段技术可优先提升并发访问效率
            首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。

        6.1.2  ConcurrentHashMap的结构
            ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组。Segment的结构和HashMap类似,是一种数组和链表结构。一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得与它对应的Segment锁。

       6.1.3  ConcurrentHashMap的初始化
              ConcurrentHashMap初始化方法是通过initialCapacity、loadFactor、concurrencyLevel几个参数来初始化segments数组、段偏移量segmentShift,段掩码segmentMask和每个segment里的HashEntry数组 。

       1、初始化segments数组  

         

        由上面的代码可知segments数组的长度ssize通过concurrencyLevel计算得出。为了能通过按位与的哈希算法来定位segments数组的索引,必须保证segments数组的长度是2的N次方(power-of-two size),所以必须计算出一个是大于或等于concurrencyLevel的最小的2的N次方值来作为segments数组的长度。假如concurrencyLevel等于14,15或16,ssize都会等于16,即容器里锁的个数也是16。 

      2、初始化segmentShift和segmentMask: 
              这两个全局变量在定位segment时的哈希算法里需要使用,sshift等于ssize从1向左移位的次数,在默认情况下concurrencyLevel等于16,1需要向左移位移动4次,所以sshift等于4。segmentShift用于定位参与hash运算的位数,segmentShift等于32减sshift,所以等于28,这里之所以用32是因为ConcurrentHashMap里的hash()方法输出的最大数是32位的。segmentMask是哈希运算的掩码,等于ssize减1,即15,掩码的二进制各个位的值都是1。因为ssize的最大长度是65536,所以segmentShift最大值是16,segmentMask最大值是65535,对应的二进制是16位,每个位都是1。 

         3、初始化每个segment: 
              输入参数initialCapacity是ConcurrentHashMap的初始化容量,loadfactor是每个segment的负载因子,在构造方法里需要通过这两个参数来初始化数组中的每个segment。 

      

        上面代码中的变量cap就是segment里HashEntry数组的长度,它等于initialCapacity除以ssize的倍数c,如果c大于1,就会取大于等于c的2的N次方值,所以cap不是1,就是2的N次方。segment的容量threshold=(int)cap*loadFactor,默认情况下initialCapacity等于16,loadfactor等于0.75,通过运算cap等于1,threshold等于零。 

        【备注】:参数concurrencyLevel是用户估计的并发级别,就是说你觉得最多有多少线程共同修改这个map,根据这个来确定Segment数组的大小,默认为16。 

        6.1.4  定位Segment 
            既然ConcurrentHashMap使用分段锁Segment来保护不同段的数据,那么在插入和获取元素的时候,必须先通过哈希算法定位到Segment。可以看到ConcurrentHashMap会首先使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再哈希。

      

       之所以进行再哈希,其目的是为了减少哈希冲突,使元素能够均匀的分布在不同的Segment上,从而提高容器的存取效率。假如哈希的质量差到极点,那么所有的元素都在一个Segment中,不仅存取元素缓慢,分段锁也会失去意义。

        6.1.5  ConcurrentHashMap的操作 
            本节介绍ConcurrentHashMap的3种操作——get、put、size

          1、get操作
            Segment的get操作实现非常简单和高效。先经过一次再散列,然后使用这个散列值通过散列运算定位到Segment,再通过散列算法定位到元素。

      

      get操作的高效之处在于整个get过程不需要加锁,除非读到值为空才会加锁重读。因为用于统计当前Segment大小的count字段和用于存储值得HashEntry的value都被定义成volatile变量,而在get操作里只需要读不需要写共享变量count和value。在定位元素的代码里我们可以发现,定位HashEntry和定位Segment的散列算法虽然一样,都与数组的长度减去1再相“与”,但是相“与”的值不一样,定位Segment使用的是元素的hashcode通过再散列后得到的值得高位,而定位HashEntry直接使用的是再散列后的值。其目的是避免两次散列后的值一样,虽然元素在Segment里散列开了,但是却没有在HashEntry里散列开。 

        2、put操作
            由于put方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须加锁。put方法首先定位到Segment,然后再Segment里进行插入操作。插入操作需要经历两个步骤,第一步在插入元素之前判断Segment里的HashEntry数组是否需要扩容,如果HashEntry数组超过容量,则创建一个容量是原来容量两倍的数组,然后将原数组里的元素进行再散列后插入到新的数组里。为了高效,ConcurrentHashMap只针对某个Segment进行扩容而不是整个容器。第二步定位添加元素的位置,然后将其放在HashEntry数组里。 

        3、size操作 
            如果要统计整个ConcurrentHashMap里元素的大小,就必须统计所有Segment里元素的大小后求和。Segment里的全局变量count虽然被定义为volatile变量,但如果在累加前使用的count发生了变化,那么统计结果就不准了。最安全的做法是在统计size时,锁住所有的Segment的put,remove,clean方法,但显然很低效。 

            ConcurrentHashMap统计size的方法是,尝试2次不锁住Segment的方式来统计各个Segment大小,如果在统计过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment的大小。ConcurrentHashMap中modCount变量在调用put,remove和clean方法素前加1,从而来记录容器大小是否发生变化。

    6.2  ConcurrentLinkedQueue
            在并发编程中,如果要实现一个线程安全的队列,则有两种方式: 

            (1) 使用阻塞算法,入队和出队使用锁来控制。 

            (2) 非阻塞算法:即循环CAS方式来实现。 

            ConcurrentLinkedQueue就是使用非阻塞方式实现的基于链接节点的无界线程安全队列。它采用先进先出的规则对节点进行排序。新添加的元素会被添加到对尾,获取元素时,它会返回头部的元素。 

     6.2.1  ConcurrentLinkedQueue的结构
            ConcurrentLinkedQueue的类图如下: 

      

       ConcurrentLinkedQueue由head节点和tail节点组成,每个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成,由此组成一张链表结构的队列。默认情况下head节点存储的元素为空,tail节点等于head节点。 

        6.2.2 入队列 
          1、入队列的过程
            入队列就是将入队节点添加到队列的尾部。 

      

      入队主要做两件事情:

              1、入队节点设置为当前队列尾节点的下一个节点。 

              2、更新tail节点,如果tail节点的next节点不为空,则将入队节点设置为tail节点,如果tail节点的next为空,则将入队节点设置为tail节点的next节点(注意:此时并未更新tail节点为尾节点)。所以,tail节点并不总是尾节点。 

            如果在单线程中执行没有任何问题,但如果在多线程中可能出现插队的情况。如果有一个线程正在入队,那么首先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这是如果另一个线程插队,则队列尾节点发生变化,当前线程需要暂停入队操作,重新获取新的尾节点。所以使用CAS算法来将入队节点设置为尾节点的next节点: 

      

      2、定位尾节点
        tail节点并不总是尾结点,所以每次入队都必须先通过tail节点来找到尾结点。尾节点可能是tail节点或tail节点的next节点。

        

        如果队列尾节点p与p的next节点都为空,则表示这个队列刚初始化,正准备添加节点,所以需要返回head节点。 

        3、设置入队节点为尾节点 
            p.casNext(null,n)方法用于将入队节点设置为当前队列尾节点的next节点,如果p是null,表示p是当前队列的尾节点,如果不为null,表示有其他线程更新了尾节点,则需要重新获取当前队列的尾节点。 

        4、HOPS的设计意图
            为什么不保证tail节点总是尾节点 ?

              如果保证tail节点总是尾节点的话,那么入队操作直接通过tail节点定位到尾节点,然后把尾节点next节点更新为新的入队节点,随后更新tail节点为新的尾节点不就可以了吗?但这么做的有一个很明显的缺陷:每次都需要使用循环CAS更新tail节点为尾节点。一定程度上降低了入队的效率。所以在ConcurrentLinkedQueue入队时,并不是每次更新tail节点为尾节点,只有当tail节点和尾节点距离大于等于常量HOPS的值(默认为1)时才会更新tail节点。tail节点与尾节点距离越长,使用CAS更新tail节点次数越少,但每次入队时通过tail节点定位尾节点的时间就越长。但这样仍然可以提升入队效率,因为本质上来看通过增加volatile变量的读操作来减少volatile变量的写操作,而对volatile变量写操作的开销远远大于读操作。 

      

      【备注】:入队方法永远返回true,所以不要通过返回值判断入队是否成功。 

        6.2.3  出队列 
            出队列就是从队列里返回一个节点元素,并清空该节点对元素的引用。 

            以下为从队列获取4个元素的快照图:

      

        从上图可以看出,并不是每次出队列都需要更新head节点为首节点,当head节点为空时,更新head节点为首节点,如果head节点不为空,则直接弹出head节点里的元素,并不会更新head节点为新的首节点。之所以这样设计,同样是为了减少CAS更新head节点从而提高出队效率。

         首先获取首节点,然后判断首节点是否为空,如果为空,则证明另一个线程已经进行了一次出队操作,需要获取新首节点即原首节点的next节点。如果不为空则使用CAS方式将首节点引用设置为null,如果成功,则直接返回首节点的元素,如果不成功,则表示另外一个线程已经进行了一次出队操作并更新了head节点,导致元素发生变化,需要重新获取首节点。

    6.3  Java中的阻塞队列
        6.3.1  什么是阻塞队列? 
          阻塞队列是一个支持两个附加操作的队列。 这两个附加操作支持阻塞的插入和移除方法。

              1)支持阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,直到队列不满。 

              2)支持阻塞的移除方法:在队列为空时,获取元素的线程会等待队列变成非空。 

          阻塞队列常用于生产者和消费者的场景,生产者向队列里添加元素,消费者从队列里取元素。在阻塞队列不可用时(消费者取时,队列为空,生产者添加时,队列已满),这两个附加操作提供了4中处理方式: 

       

      - 抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException(“Queuefull”)异常。当队列空时,从队列获取元素会抛出NoSuchElmentException异常。 

         - 返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。当从队列取元素时,如果没有则返回null。 

         - 一直阻塞:当队列满时,如果生产者线程往队列put元素,则队列会一直阻塞生产者线程直到队列可用或响应中断。当队列空时,如果消费者线程从队列里take元素,队列会阻塞消费者线程直到队列不为空。 

         - 超时退出:当队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。当队列空时,如果消费者线程从队列取元素,队列会阻塞消费者线程一段时间,直到超过指定的时间,消费者线程退出。 

         【备注】:如果是无界阻塞队列,队列不可能出现满的情况,所以使用put或offer方法永远不会被阻塞,而且使用offer方法时,永远返回true。 

     6.3.2  Java里的阻塞队列
            JDK7 提供的7个阻塞队列 :

        (1)ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。 

            此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证线程公平的访问队列。但可通过以下代码创建一个公平的阻塞队列: 

       

       为了保证公平性,通常会降低吞吐量,其实现是依靠可重入锁: 

      

     (2)LinkedBlockQueue:一个由链表结构组成的有界阻塞队列。 

              此队列的默认和最大长度为Integer.MAX_VALUE。按照先进先出(FIFO)的原则对元素进行排序。但不保证线程公平的访问队列,也不提供创建公平访问队列的方法。 

        (3)PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。 

              默认情况下元素采用自然顺序升序排序。也可自定义类实现compareTo()方法来执行元素排序规则,或初始化PriorityBlockingQueue时,执行构造参数Comparator来对元素进行排序。但PriorityBlockingQueue不能保证同优先级元素的顺序。 

        (4)DelayQueue:一个使用优先级队列实现的无界阻塞队列。 

              DelayQueue是一个支持演示获取元素的无界阻塞队列。队列使用PriorityQueue实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。 

            DelayQueue可以用在以下场景: 

            1)缓存系统的设计:用来保存缓存元素的有效期。使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。 

            2)定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueued中获取到任务就开始执行,如TimerQueue就是使用DelayQueue实现的。

        (5)SynchronousQueue:一个不存储元素的阻塞队列。 

              此队列每一个put操作必须等待一个take操作,否则不能继续添加元素。默认情况下线程采用非公平性策略访问队列,但可通过以下方法设置以公平策略访问: 

              SynchronousQueue本身不存储任何元素,只是负责把生产者线程处理的数据直接传递给消费者线程。其吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。 

        (6)LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。 

              相比其他的阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。 

              transfer方法:如果当前有消费者正在等待接收元素,transfer方法可以把生产者传入的元素立刻transfer给消费者。如果没有消费者等待接受元素,transfer方法会将元素存放在队列的tail节点,直到该元素被消费者消费才返回。 

              tryTransfer方法:tryTransfer方法用来试探生产者传入的元素是否能直接传给消费者消费。如果没有消费者等待接收元素,返回false,反之,返回true。tryTransfer方法会立即返回,而不用等待元素被消费以后才返回。 

              对于带有时间限制的tryTransfer(E e,long timeout,TimeUnit unit)方法,试图把生产者传入的元素直接传给消费者,但如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没有消费元素,则返回false,如果在超时时间内消费了元素则返回true。 

        (7)LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。 

        LinkedBlockingDeque是一个可以从队列两端插入和移除元素的队列。因为与其他阻塞队列相比,多了一个操作队列的入口,所以在多线程同时入队时,也就减少一半的竞争。 

        6.3.3  阻塞队列的实现原理
              阻塞队列使用通知模式实现,即当生产者往满的队列里添加元素时会阻塞生产者线程,当消费者消费了队列中一个元素后,会通知生产者当前队列可用。 

              以下为ArrayBlockingQueue源码:

        

        当往队列插入一个元素,如果队列不可用,那么阻塞生产者主要通过LockSupport.park(this)来实现。

    6.4  Fork/Join框架 
        6.4.1  Fork/Join框架的定义 
            Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,它可以把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。 其运行流程如下: 

      

     6.4.2  工作窃取算法 
              工作窃取算法是指某个线程从其他队列里窃取任务来执行。当我们需要做一个大任务时,可以把这个任务分割成若干互不依赖的子任务,为了减少线程竞争,把子任务分别放入不同队列,并为每个队列创建一个单独的工作线程,假设,有其他线程提前把自己队列任务做完之后,还需要等待其他线程,这时,为了提高效率,已经做完任务的线程会去其他队列窃取任务执行,以帮助其他未完成的线程。此时他们访问同一个队列,为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列。被窃取任务线程永远从双端队列头部取任务执行,窃取任务线程永远从双端队列尾部拿任务执行。其运行流程图如下:

      

      工作窃取算法优点:就是充分利用线程进行并行计算,减少了线程间的竞争。 

            工作窃取算法缺点:当双端队列里只有一个任务时,还时会存在竞争。而且该算法创建多个线程和多个双端队列,会消耗更过的系统资源。 

        6.4.3  Fork/Join框架的设计 
          Fork/Join框架有以下两个步骤: 

              - 分割任务:首先我们需要有一个fork类来把大任务分割成子任务,如果子任务仍然很大,还需要不停分割,直到分割出的子任务足够小。 

              - 执行任务并合并结果:分割的子任务分别放在双端队列,然后启动线程分别从队列取任务执行,执行完的结果统一放在一个队列里,启动一个线程从队列里拿数据并合并数据。 

          Fork/Join使用两个类来完成上述工作: 

              - ForkJoinTask:要使用ForkJoin框架,必须先创建一个ForkJoin任务。它提供在任务中执行fork和join操作机制,在使用时,我们通常继承ForkJoinTask的子类:RecursiveAction或RecursiveTask,两者区别是RecursiveAction用于没有返回结果的任务,RecursiveTask用于有返回结果的任务。 

              - ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。被分割出来的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部窃取一个任务执行。 

        6.4.4  使用Fork/Join框架
        6.4.5  Fork/Join框架的异常处理 
              ForkJoinTask在执行时可能抛出异常,但我们无法再主线程里捕捉异常,但可以通过ForkJoinTask的isCompletedAbnnormally()方法来检查任务是否已经抛出异常或已经被取消,调用getException()可以获取异常,此方法返回Throwable对象,如果任务取消则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。 

        6.4.6  实现原理 
              ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。 

      (1)ForkJoinTask的fork方法实现原理 

           当我们调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法异步地执行这个任务,然后立即返回结果。 

        

         pushTask方法把当前任务存放在ForkJoinTask数组队列里,然后调用ForkJoinPool的signalWork()方法唤醒或创建一个工作线程来执行任务。 

        

      (2)ForkJoinTask的join方法实现原理

        Join方法的主要作用是阻塞当前线程并等待获取结果。

        

        首先,它调用doJoin()方法获取当前任务状态,任务有4中状态:已完成(NORMAL),被取消(CANCELLED),信号(SIGNAL)和出现异常(EXCEPTIONAL)。如果任务状态是已完成,则直接返回任务结果。如果任务被取消,则抛出CancellationException。如果任务抛出异常,则直接抛出对应异常。 

            doJoin()方法源代码: 

        

      在doJoin()方法中,会查看任务状态,如果任务已经执行完成,则直接返回任务状态,如果没有执行完成,则从任务数组里取出任务并执行。如果顺利执行完毕,则将任务状态设置为NORMAL,如果出现异常,则将任务状态设置为EXCEPTION。

  • 相关阅读:
    leetcode Convert Sorted List to Binary Search Tree
    leetcode Convert Sorted Array to Binary Search Tree
    leetcode Binary Tree Level Order Traversal II
    leetcode Construct Binary Tree from Preorder and Inorder Traversal
    leetcode[105] Construct Binary Tree from Inorder and Postorder Traversal
    证明中序遍历O(n)
    leetcode Maximum Depth of Binary Tree
    限制 button 在 3 秒内不可重复点击
    HTML 和 CSS 画三角形和画多边行基本原理及实践
    在线前端 JS 或 HTML 或 CSS 编写 Demo 处 JSbin 与 jsFiddle 比较
  • 原文地址:https://www.cnblogs.com/mYunYu/p/12939855.html
Copyright © 2011-2022 走看看