zoukankan      html  css  js  c++  java
  • 第6章 Java并发容器和框架

      6.1 ConcurrentHashMap

        略(请查看已有的其它笔记)

      6.2 ConcurrentLinkedQueue

        在并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法

        使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。(上一章的那个BoundedQueue)

        非阻塞的实现方式则可以使用循环CAS的方式来实现。

        本节让我们一起来研究一下Doug Lea是如何使用非阻塞的方式来实现线程安全队列ConcurrentLinkedQueue的,相信从大师身上我们能学到不少并发编程的技巧。

      ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法(即CAS算法)来实现,该算法在Michael&Scott算法上进行了一些修改

        略(请看其它笔记)

      6.3 Java中的阻塞队列

        本节将介绍什么是阻塞队列,以及Java中阻塞队列的4种处理方式,并介绍Java7中提供的7种阻塞队列,最后分析阻塞队列的一种实现方式。

        6.3.1 什么是阻塞队列

        阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。

        1)阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,知道队列不满

        2)阻塞的移除方法:当队列空时,队列会阻塞获取元素的线程,知道队列非空

        在阻塞队列不可用时,这两个附加操作提供了4种处理方式,如表6-1所示

          

         抛出异常:当队列满或空时如果再向队列插入元素或获取元素,会抛出异常

        返回特殊值:插入成功为true,没有返回则null

        一直阻塞:等待中

        超时退出:先阻塞后超时退出

        6.3.2 Java里的阻塞队列

        jdk7提供了7个阻塞队列,如下:

          ArrayBlockingQueue: 一个有数组组成的有界阻塞队列

          LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列

          PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列

          DelayQueue:一个使用优先级队列实现的无界阻塞队列

          SynchronousDueue:一个不存储元素的阻塞队列

          LinkedTransferQueue:一个由链表结构组成的无界阻塞队列

          LinkedBlockingDeque:一个由链表结构组成的无向阻塞队列

    6.4 Fork/join框架

      6.4.1 什么是Fork/join框架

        Fork/join 框架是Java7提供的一个用于并行执行任务的框架,是把一个大人物分割成若干个小任务,最终汇总每个小任务的结果后得到大任务结果的框架。

    我们再来通过Fork和Join这两个单词来理解一下For/join框架。Fork就是把一个大任务切分为若干子任务并行执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+...+10000,可以分割成10个子任务,每个任务分别对1000个数进行求和,最终汇总这10个子任务的执行结果。Fork/join的运行流程图如下6.6所示。

                  

      6.4.2工作窃取算法

        工作窃取算法(work-stealing)是指某个线程从其他任务队列里窃取任务来执行。那么,为什么需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,可以把这个任务分割为若干互不依赖的子任务,为了减少线程之间的竞争,把这些子任务分别放到不同的队列里,并未每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。比如线程A负责A队列里的任务。但是有的线程会先把自己队列里的任务做完,而其他线程对应的队列里还有任务等待处理。干完活的线程预期等待,不如帮助其他线程工作,于是它就会去其他线程的队列里窃取一个任务来执行。而在这时,他们会访问同一个队列,为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程用于从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。工作窃取的运行流程如图6-7所示。

                        

     6.4.3Fork/join框架的设计

      如何来设计一个Fork/join的框架呢?

      步骤一:分割任务:不停切分,直到分割出的子任务足够小。

      步骤二:执行任务并合并结果:分割的子任务放在双端队列,然后启动多个线程从队列里面拿数据执行任务,子任务执行完成的结果都统一放在一个队列里,启动一个线程从队列里面拿数据,然后合并这些数据。

      Fork/join使用两个类来完成上面两件事情

        1、ForkJoinTask:使用Fork/join框架,必须首先创建一个ForkJoin任务。他提供在任务中fork()和join()操作的机制。通常情况下,我们不选哟直接继承ForkJoinTask类只需要继承它的子类,它提供了以下两个子类。

          RecursiveAction:用于没有返回结果的任务

          RecursiveTask:用于有返回结果的任务

        2、ForkJoinPool:ForJoinTask需要通过ForkJoinPool来执行。

              

      6.4.4使用Fork/Join框架

        下面一个简单的需求,计算 1+2+3+4的结果。

    package com.example.demo.test;
    
    import java.util.TimerTask;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.Future;
    import java.util.concurrent.RecursiveTask;
    
    public class CountTask extends RecursiveTask<Integer> {
        private static final int THRESHOLD = 2;
        private int start;
        private int end;
        
        public CountTask(int start,int end) {
            this.start = start;
            this.end = end;
        }
    
        @Override
        protected Integer compute() {
            int sum = 0;
            boolean canCompute = (end - start) <= THRESHOLD;
            if(canCompute) {
                for(int i=start;i<=end;i++) {
                    sum +=i;
                }
            }else {
                int middle = (start+end)/2;
                CountTask leftTask = new CountTask(start, middle);
                CountTask rightTask = new CountTask(middle+1, end);
                leftTask.fork();
                rightTask.fork();
                int leftResult = leftTask.join();
                int rightResult= rightTask.join();
                sum = leftResult + rightResult;
            }
            return sum;
        }
        
        public static void main(String[] args) {
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            CountTask countTask = new CountTask(1, 1000000);
            Future<Integer> result = forkJoinPool.submit(countTask);
            if(countTask.isCompletedAbnormally()) {
                System.out.println(countTask.getException());
            }
            try {
                System.out.println(result.get());
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    
    }

      6.4.5 底层原理

      

    文字描述不太好理解,我们通过示意图来看下任务入队和“工作窃取”的整个过程:

    假设现在通过ForkJoinPool的submit方法提交了一个FuturetTask任务,参考使用示例

    初始

    初始状态下,线程池中的任务队列为空,workQueues == null,也没有工作线程:

    clipboard.png


    外部提交FutureTask任务

    此时会初始化任务队列数组WorkQueue[],大小为2的幂次,然后在某个槽位(偶数位)初始化一个任务队列(WorkQueue),并插入任务:

    clipboard.png

    注意,由于是非工作线程通过外部方法提交的任务,所以这个任务队列并没有绑定工作线程。

    之所以是2的幂次,是由于ForkJoinPool采用了一种随机算法(类似ConcurrentHashMap的随机算法),该算法通过线程池随机数(ThreadLocalRandom的probe值)和数组的大小计算出工作线程所映射的数组槽位,这种算法要求数组大小为2的幂次。

    创建工作线程

    首次提交任务后,由于没有工作线程,所以会创建一个工作线程,同时在某个奇数槽的位置创建一个与它绑定的任务队列,如下图:

    clipboard.png


    窃取任务

    ForkJoinWorkThread_1会随机扫描workQueues中的队列,直到找到一个可以窃取的队列——workQueues[2],然后从该队列的base端获取任务并执行,并将base加1:

    clipboard.png

    窃取到的任务是FutureTask,ForkJoinWorkThread_1最终会调用它的compute方法(子类继承ForkJoinTask,覆写compute,参考本文的使用示例),该方法中会新建两个子任务,并执行它们的fork方法:

    @Override
    protected Long compute() {
        long sum = 0;
     
        if (end - begin + 1 < THRESHOLD) {      // 小于阈值, 直接计算
            for (int i = begin; i <= end; i++) {
                sum += array[i];
            }
        } else {
            int middle = (end + begin) / 2;
            ArraySumTask subtask1 = new ArraySumTask(this.array, begin, middle);
            ArraySumTask subtask2 = new ArraySumTask(this.array, middle + 1, end);
     
            subtask1.fork();
            subtask2.fork();
     
            long sum1 = subtask1.join();
            long sum2 = subtask2.join();
     
            sum = sum1 + sum2;
        }
        return sum;
    }

    之前说过,由于是由工作线程ForkJoinWorkThread_1来调用FutureTask的fork方法,所以会将这两个子任务放入ForkJoinWorkThread_1自身队列中:

    clipboard.png

    然后,ForkJoinWorkThread_1会阻塞等待任务1和任务2的结果(先在subtask1.join等待):

      long sum1 = subtask1.join();
      long sum2 = subtask2.join();
    从这里也可以看出,任务放到哪个队列,其实是由调用线程来决定的(根据线程探针值probe计算队列索引)。如果调用线程是工作线程,则必然有自己的队列(task queue),则任务都会放到自己的队列中;如果调用线程是其它线程(如主线程),则创建没有工作线程绑定的任务队列(submissions queue),然后存入任务。

    新的工作线程

    ForkJoinWorkThread_1调用两个子任务1和2的fork方法,除了将它们放入自己的任务队列外,还会导致新增一个工作线程ForkJoinWorkThread_2:

    clipboard.png

    ForkJoinWorkThread_2运行后会像ForkJoinWorkThread_1那样从其它队列窃取任务,如下图,从ForkJoinWorkThread_1队列的base端窃取一个任务(直接执行,并不会放入自己队列):

    clipboard.png

    窃取完成后,ForkJoinWorkThread_2会直接执行任务1,又回到了FutureTask子类的compute方法,假设此时又fork出两个任务——任务3、任务4,则ForkJoinWorkThread_2最终会在任务3的join方法上等待:

    clipboard.png

    如果此时还有其它工作线程,则重复上述步骤:窃取、执行、入队、join阻塞、返回。ForkJoinTask的join方法内部逻辑非常复杂,上述ForkJoinWorkThread_1和ForkJoinWorkThread_2目前都在等待任务的完成,但事实上,ForkJoinTask存在一种互助机制,即工作线程之间可以互相帮助执行任务,这里不详细展开,只需要知道,ForkJoinWorkThread_1和ForkJoinWorkThread_2可能会被其它工作线程唤醒。

    我们这里假设ForkJoinWorkThread_2被其它某个工作线程唤醒,任务3和任务4的join方法依次返回了结果,那么任务1的结果也会返回,于是ForkJoinWorkThread_1也被唤醒(它在任务1的join上等待),然后ForkJoinWorkThread_1会继续执行任务2的join方法,如果任务2不再分解,则最终返回任务1和任务2的合并结果,计算结束。


    自身队列的任务执行

    ForkJoinWorkThread_1和ForkJoinWorkThread_2唤醒执行完窃取到的任务后,还没有结束,它们还会去看看自身队列中有无任务可以执行。

    /**
     * Executes the given task and any remaining local tasks.
     */
    final void runTask(ForkJoinTask<?> task) {
        if (task != null) {
            scanState &= ~SCANNING; // mark as busy
            (currentSteal = task).doExec();
            U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
            execLocalTasks();
            ForkJoinWorkerThread thread = owner;
            if (++nsteals < 0)      // collect on overflow
                transferStealCount(pool);
            scanState |= SCANNING;
            if (thread != null)
                thread.afterTopLevelExec();
        }
    }

    上述ForkJoinPool.WorkQueue.runTask方法中,doExec()就是执行窃取的任务,而execLocalTasks用于执行队列本身的任务。

    我们假设此时的线程池是下面这种状态:

    clipboard.png

    工作线程ForkJoinWorkThread_1调用execLocalTasks方法一次性执行自己队列中的所有任务,这时分成两种情况:

    1.异步模式(asyncMode==true)

    如果构造线程池时,asyncMode为true,表示以异步模式执行工作线程自身队列中的任务,此时会从 base -> top遍历并执行所有任务。

    2.同步模式(asyncMode==false)

    如果构造线程池时,asyncMode为false(默认情况),表示以同步模式执行工作线程自身队列中的任务,此时会从 top -> base 遍历并执行所有任务。

    任务的入队总是在top端,所以当以同步模式遍历时,其实相当于栈操作(从栈顶pop元素);
    如果是异步模式,相当于队列的出队操作(从base端poll元素)。
    异步模式比较适合于那些不需要返回结果的任务。其实如果将队列中的任务看成一棵树(无环连通图)的话,异步模式类似于图的广度优先遍历,同步模式类似于图的深度优先遍历

    假设此处以默认的同步模式遍历,ForkJoinWorkThread_1从栈顶开始执行并移除任务,先执行任务2并移除,再执行任务1并:

    clipboard.png

    clipboard.png

    六、总结

    本章简要概述了Fork/Join框架的思想、主要组件及基本使用,Fork/Join框架的核心包含四大组件:ForkJoinTask任务类、ForkJoinPool线程池、ForkJoinWorkerThread工作线程、WorkQueue任务队列。

    本章通过示例,描述了各个组件的关系以及ForkJoin线程池的整个调度流程,F/J框架的核心来自于它的工作窃取及调度策略,可以总结为以下几点:

      1. 每个Worker线程利用它自己的任务队列维护可执行任务;
      2. 任务队列是一种双端队列,支持LIFO的pushpop操作,也支持FIFO的take操作;
      3. 任务fork的子任务,只会push到它所在线程(调用fork方法的线程)的队列;
      4. 工作线程既可以使用LIFO通过pop处理自己队列中的任务,也可以FIFO通过poll处理自己队列中的任务,具体取决于构造线程池时的asyncMode参数;
      5. 当工作线程自己队列中没有待处理任务时,它尝试去随机读取(窃取)其它任务队列的base端的任务;
      6. 当线程进入join操作,它也会去处理其它工作线程的队列中的任务(自己的已经处理完了),直到目标任务完成(通过isDone方法);
      7. 当一个工作线程没有任务了,并且尝试从其它队列窃取也失败了,它让出资源(通过使用yields, sleeps或者其它优先级调整)并且随后会再次激活,直到所有工作线程都空闲了——此时,它们都阻塞在等待另一个顶层线程的调用。

    参考文章:https://segmentfault.com/a/1190000016781127

  • 相关阅读:
    hdu 1057 (simulation, use sentinel to avoid boudary testing, use swap trick to avoid extra copy.) 分类: hdoj 2015-06-19 11:58 25人阅读 评论(0) 收藏
    hdu 1053 (huffman coding, greedy algorithm, std::partition, std::priority_queue ) 分类: hdoj 2015-06-18 19:11 22人阅读 评论(0) 收藏
    hdu 1052 (greedy algorithm) 分类: hdoj 2015-06-18 16:49 35人阅读 评论(0) 收藏
    hdu 1051 (greedy algorithm, how a little modification turn 15ms to 0ms) 分类: hdoj 2015-06-18 12:54 29人阅读 评论(0) 收藏
    hdu 1050 (preinitilization or postcleansing, std::fill) 分类: hdoj 2015-06-18 11:33 34人阅读 评论(0) 收藏
    hdu 1047 (big integer sum, fgets or scanf, make you func return useful infos) 分类: hdoj 2015-06-18 08:21 39人阅读 评论(0) 收藏
    hdu 1041 (OO approach, private constructor to prevent instantiation, sprintf) 分类: hdoj 2015-06-17 15:57 25人阅读 评论(0) 收藏
    hdu 1039 (string process, fgets, scanf, neat utilization of switch clause) 分类: hdoj 2015-06-16 22:15 38人阅读 评论(0) 收藏
    hdu 1036 (I/O routines, fgets, sscanf, %02d, rounding, atoi, strtol) 分类: hdoj 2015-06-16 19:37 32人阅读 评论(0) 收藏
    查漏补缺(一)
  • 原文地址:https://www.cnblogs.com/helloworldmybokeyuan/p/11752683.html
Copyright © 2011-2022 走看看