6.1 ConcurrentHashMap
略(请查看已有的其它笔记)
6.2 ConcurrentLinkedQueue
在并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。
使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。(上一章的那个BoundedQueue)
非阻塞的实现方式则可以使用循环CAS的方式来实现。
本节让我们一起来研究一下Doug Lea是如何使用非阻塞的方式来实现线程安全队列ConcurrentLinkedQueue的,相信从大师身上我们能学到不少并发编程的技巧。
略(请看其它笔记)
6.3 Java中的阻塞队列
本节将介绍什么是阻塞队列,以及Java中阻塞队列的4种处理方式,并介绍Java7中提供的7种阻塞队列,最后分析阻塞队列的一种实现方式。
6.3.1 什么是阻塞队列
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。
1)阻塞的插入方法:当队列满时,队列会阻塞插入元素的线程,知道队列不满
2)阻塞的移除方法:当队列空时,队列会阻塞获取元素的线程,知道队列非空
在阻塞队列不可用时,这两个附加操作提供了4种处理方式,如表6-1所示
抛出异常:当队列满或空时如果再向队列插入元素或获取元素,会抛出异常
返回特殊值:插入成功为true,没有返回则null
一直阻塞:等待中
超时退出:先阻塞后超时退出
6.3.2 Java里的阻塞队列
jdk7提供了7个阻塞队列,如下:
ArrayBlockingQueue: 一个有数组组成的有界阻塞队列
LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列
PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列
DelayQueue:一个使用优先级队列实现的无界阻塞队列
SynchronousDueue:一个不存储元素的阻塞队列
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列
LinkedBlockingDeque:一个由链表结构组成的无向阻塞队列
6.4 Fork/join框架
6.4.1 什么是Fork/join框架
Fork/join 框架是Java7提供的一个用于并行执行任务的框架,是把一个大人物分割成若干个小任务,最终汇总每个小任务的结果后得到大任务结果的框架。
我们再来通过Fork和Join这两个单词来理解一下For/join框架。Fork就是把一个大任务切分为若干子任务并行执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+...+10000,可以分割成10个子任务,每个任务分别对1000个数进行求和,最终汇总这10个子任务的执行结果。Fork/join的运行流程图如下6.6所示。
6.4.2工作窃取算法
工作窃取算法(work-stealing)是指某个线程从其他任务队列里窃取任务来执行。那么,为什么需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,可以把这个任务分割为若干互不依赖的子任务,为了减少线程之间的竞争,把这些子任务分别放到不同的队列里,并未每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。比如线程A负责A队列里的任务。但是有的线程会先把自己队列里的任务做完,而其他线程对应的队列里还有任务等待处理。干完活的线程预期等待,不如帮助其他线程工作,于是它就会去其他线程的队列里窃取一个任务来执行。而在这时,他们会访问同一个队列,为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程用于从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。工作窃取的运行流程如图6-7所示。
6.4.3Fork/join框架的设计
如何来设计一个Fork/join的框架呢?
步骤一:分割任务:不停切分,直到分割出的子任务足够小。
步骤二:执行任务并合并结果:分割的子任务放在双端队列,然后启动多个线程从队列里面拿数据执行任务,子任务执行完成的结果都统一放在一个队列里,启动一个线程从队列里面拿数据,然后合并这些数据。
Fork/join使用两个类来完成上面两件事情
1、ForkJoinTask:使用Fork/join框架,必须首先创建一个ForkJoin任务。他提供在任务中fork()和join()操作的机制。通常情况下,我们不选哟直接继承ForkJoinTask类只需要继承它的子类,它提供了以下两个子类。
RecursiveAction:用于没有返回结果的任务
RecursiveTask:用于有返回结果的任务
2、ForkJoinPool:ForJoinTask需要通过ForkJoinPool来执行。
6.4.4使用Fork/Join框架
下面一个简单的需求,计算 1+2+3+4的结果。
package com.example.demo.test; import java.util.TimerTask; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; public class CountTask extends RecursiveTask<Integer> { private static final int THRESHOLD = 2; private int start; private int end; public CountTask(int start,int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; boolean canCompute = (end - start) <= THRESHOLD; if(canCompute) { for(int i=start;i<=end;i++) { sum +=i; } }else { int middle = (start+end)/2; CountTask leftTask = new CountTask(start, middle); CountTask rightTask = new CountTask(middle+1, end); leftTask.fork(); rightTask.fork(); int leftResult = leftTask.join(); int rightResult= rightTask.join(); sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask countTask = new CountTask(1, 1000000); Future<Integer> result = forkJoinPool.submit(countTask); if(countTask.isCompletedAbnormally()) { System.out.println(countTask.getException()); } try { System.out.println(result.get()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ExecutionException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
6.4.5 底层原理
文字描述不太好理解,我们通过示意图来看下任务入队和“工作窃取”的整个过程:
假设现在通过ForkJoinPool的submit方法提交了一个FuturetTask任务,参考使用示例。
初始
初始状态下,线程池中的任务队列为空,workQueues == null
,也没有工作线程:
外部提交FutureTask任务
此时会初始化任务队列数组WorkQueue[]
,大小为2的幂次,然后在某个槽位(偶数位)初始化一个任务队列(WorkQueue
),并插入任务:
注意,由于是非工作线程通过外部方法提交的任务,所以这个任务队列并没有绑定工作线程。
之所以是2的幂次,是由于ForkJoinPool采用了一种随机算法(类似ConcurrentHashMap的随机算法),该算法通过线程池随机数(ThreadLocalRandom的probe值)和数组的大小计算出工作线程所映射的数组槽位,这种算法要求数组大小为2的幂次。
创建工作线程
首次提交任务后,由于没有工作线程,所以会创建一个工作线程,同时在某个奇数槽的位置创建一个与它绑定的任务队列,如下图:
窃取任务
ForkJoinWorkThread_1会随机扫描workQueues中的队列,直到找到一个可以窃取的队列——workQueues[2]
,然后从该队列的base
端获取任务并执行,并将base
加1:
窃取到的任务是FutureTask,ForkJoinWorkThread_1最终会调用它的compute
方法(子类继承ForkJoinTask,覆写compute,参考本文的使用示例),该方法中会新建两个子任务,并执行它们的fork
方法:
@Override
protected Long compute() {
long sum = 0;
if (end - begin + 1 < THRESHOLD) { // 小于阈值, 直接计算
for (int i = begin; i <= end; i++) {
sum += array[i];
}
} else {
int middle = (end + begin) / 2;
ArraySumTask subtask1 = new ArraySumTask(this.array, begin, middle);
ArraySumTask subtask2 = new ArraySumTask(this.array, middle + 1, end);
subtask1.fork();
subtask2.fork();
long sum1 = subtask1.join();
long sum2 = subtask2.join();
sum = sum1 + sum2;
}
return sum;
}
之前说过,由于是由工作线程ForkJoinWorkThread_1来调用FutureTask的fork
方法,所以会将这两个子任务放入ForkJoinWorkThread_1自身队列中:
然后,ForkJoinWorkThread_1会阻塞等待任务1和任务2的结果(先在subtask1.join
等待):
long sum1 = subtask1.join();
long sum2 = subtask2.join();
从这里也可以看出,任务放到哪个队列,其实是由调用线程来决定的(根据线程探针值probe计算队列索引)。如果调用线程是工作线程,则必然有自己的队列(task queue),则任务都会放到自己的队列中;如果调用线程是其它线程(如主线程),则创建没有工作线程绑定的任务队列(submissions queue),然后存入任务。
新的工作线程
ForkJoinWorkThread_1调用两个子任务1和2的fork
方法,除了将它们放入自己的任务队列外,还会导致新增一个工作线程ForkJoinWorkThread_2:
ForkJoinWorkThread_2运行后会像ForkJoinWorkThread_1那样从其它队列窃取任务,如下图,从ForkJoinWorkThread_1队列的base
端窃取一个任务(直接执行,并不会放入自己队列):
窃取完成后,ForkJoinWorkThread_2会直接执行任务1,又回到了FutureTask子类的compute
方法,假设此时又fork
出两个任务——任务3、任务4,则ForkJoinWorkThread_2最终会在任务3的join
方法上等待:
如果此时还有其它工作线程,则重复上述步骤:窃取、执行、入队、join阻塞、返回
。ForkJoinTask的join方法内部逻辑非常复杂,上述ForkJoinWorkThread_1和ForkJoinWorkThread_2目前都在等待任务的完成,但事实上,ForkJoinTask存在一种互助机制,即工作线程之间可以互相帮助执行任务,这里不详细展开,只需要知道,ForkJoinWorkThread_1和ForkJoinWorkThread_2可能会被其它工作线程唤醒。
我们这里假设ForkJoinWorkThread_2被其它某个工作线程唤醒,任务3和任务4的join方法依次返回了结果,那么任务1的结果也会返回,于是ForkJoinWorkThread_1也被唤醒(它在任务1的join上等待),然后ForkJoinWorkThread_1会继续执行任务2的join方法,如果任务2不再分解,则最终返回任务1和任务2的合并结果,计算结束。
自身队列的任务执行
ForkJoinWorkThread_1和ForkJoinWorkThread_2唤醒执行完窃取到的任务后,还没有结束,它们还会去看看自身队列中有无任务可以执行。
/**
* Executes the given task and any remaining local tasks.
*/
final void runTask(ForkJoinTask<?> task) {
if (task != null) {
scanState &= ~SCANNING; // mark as busy
(currentSteal = task).doExec();
U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
execLocalTasks();
ForkJoinWorkerThread thread = owner;
if (++nsteals < 0) // collect on overflow
transferStealCount(pool);
scanState |= SCANNING;
if (thread != null)
thread.afterTopLevelExec();
}
}
上述ForkJoinPool.WorkQueue.runTask
方法中,doExec()
就是执行窃取的任务,而execLocalTasks
用于执行队列本身的任务。
我们假设此时的线程池是下面这种状态:
工作线程ForkJoinWorkThread_1调用execLocalTasks方法一次性执行自己队列中的所有任务,这时分成两种情况:
1.异步模式(asyncMode==true)
如果构造线程池时,asyncMode为true,表示以异步模式执行工作线程自身队列中的任务,此时会从 base -> top
遍历并执行所有任务。
2.同步模式(asyncMode==false)
如果构造线程池时,asyncMode为false(默认情况),表示以同步模式执行工作线程自身队列中的任务,此时会从 top -> base
遍历并执行所有任务。
任务的入队总是在top
端,所以当以同步模式遍历时,其实相当于栈操作(从栈顶pop元素);
如果是异步模式,相当于队列的出队操作(从base端poll元素)。
异步模式比较适合于那些不需要返回结果的任务。其实如果将队列中的任务看成一棵树(无环连通图)的话,异步模式类似于图的广度优先遍历,同步模式类似于图的深度优先遍历
假设此处以默认的同步模式遍历,ForkJoinWorkThread_1从栈顶开始执行并移除任务,先执行任务2并移除,再执行任务1并:
六、总结
本章简要概述了Fork/Join框架的思想、主要组件及基本使用,Fork/Join框架的核心包含四大组件:ForkJoinTask任务类、ForkJoinPool线程池、ForkJoinWorkerThread工作线程、WorkQueue任务队列。
本章通过示例,描述了各个组件的关系以及ForkJoin线程池的整个调度流程,F/J框架的核心来自于它的工作窃取及调度策略,可以总结为以下几点:
- 每个Worker线程利用它自己的任务队列维护可执行任务;
- 任务队列是一种双端队列,支持LIFO的push和pop操作,也支持FIFO的take操作;
- 任务fork的子任务,只会push到它所在线程(调用fork方法的线程)的队列;
- 工作线程既可以使用LIFO通过pop处理自己队列中的任务,也可以FIFO通过poll处理自己队列中的任务,具体取决于构造线程池时的asyncMode参数;
- 当工作线程自己队列中没有待处理任务时,它尝试去随机读取(窃取)其它任务队列的base端的任务;
- 当线程进入join操作,它也会去处理其它工作线程的队列中的任务(自己的已经处理完了),直到目标任务完成(通过isDone方法);
- 当一个工作线程没有任务了,并且尝试从其它队列窃取也失败了,它让出资源(通过使用yields, sleeps或者其它优先级调整)并且随后会再次激活,直到所有工作线程都空闲了——此时,它们都阻塞在等待另一个顶层线程的调用。