ForkJoin
什么是 ForkJoin
ForkJoin 是一个把大任务拆分为多个小任务来分别计算的并行计算框架
ForkJoin 特点:工作窃取
这里面维护的都是双端队列,因此但其中一个线程完成自己的计算任务之后,可以从其他线程任务队列另一端“窃取”任务进行计算,从而提高计算效率!
ForkJoin 执行流程
伪代码:
if(任务数小){
直接计算
}else{
将问题划分为独立的部分
分叉新的子任务来解决每个部分
加入所有子任务进行计算
将子结果进行合并
}
ForkJoinPool: 核心
ForkJoinTask:
RecursiveTask:递归任务
package juc.forkJoin;
import java.util.concurrent.RecursiveTask;
/*
求和计算的任务!
普通求和 ForkJoin Stream并行流
如何使用ForkJoin
1、ForkJoinPool 通过它来执行
2、计算任务 ForkJoinPool.execute(ForkJoinTask task)
3、ForkJoinTask 是一个接口,execute方法传入参数应为 ForkJoinTask 的子类 如 RecursiveTask
*/
public class ForkJoinDemo extends RecursiveTask<Long> {
private Long start;//开始值
private Long end;//结束值
private Long temp=10000L;//阈值,用于区分是否用ForkJoin来进行划分
public ForkJoinDemo(Long start,Long end){
this.start=start;
this.end=end;
}
@Override
protected Long compute() {
if ((end-start)<=temp){//小于等于阈值,则直接进行计算
Long sum=0L;
for (Long i = start; i <= end; i++) {
sum+=i;
}
return sum;
}else {//大于阈值使用ForkJoin进行划分
//任务拆分点
Long middle=(start+end)/2;
ForkJoinDemo task1 = new ForkJoinDemo(start, middle);
task1.fork();
ForkJoinDemo task2 = new ForkJoinDemo(middle + 1, end);
task2.fork();
return task1.join()+task2.join();
}
}
}
测试类:
package juc.forkJoin;
import java.util.OptionalLong;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//计算结果:500000000500000000
test1();
test2();
test3();
test4();
}
//普通方法
public static void test1(){
Long startTime=System.currentTimeMillis();
Long sum=0L;
for (Long i = 1L; i <= 10_0000_0000L; i++) {
sum+=i;
}
Long endTime=System.currentTimeMillis();
System.out.println("计算结果:"+sum);
System.out.println("普通方法耗时:"+(endTime-startTime));
}
//ForkJoin方法
public static void test2() throws ExecutionException, InterruptedException {
Long startTime=System.currentTimeMillis();
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Long> task = new ForkJoinDemo(1L, 10_0000_0000L);
ForkJoinTask<Long> submit = pool.submit(task);
Long result = submit.get();
Long endTime=System.currentTimeMillis();
System.out.println("计算结果:"+result);
System.out.println("ForkJoin方法耗时:"+(endTime-startTime));
}
//Stream并行流方法
public static void test3(){
Long startTime=System.currentTimeMillis();
//Stream并行流 parallel()并行流 sequential()串行流
OptionalLong reduce = LongStream.rangeClosed(1L, 10_0000_0000L).parallel().reduce(Long::sum);
Long result = reduce.getAsLong();
Long endTime=System.currentTimeMillis();
System.out.println("计算结果:"+result);
System.out.println("Stream并行流方法耗时:"+(endTime-startTime));
}
//Stream串行流方法
public static void test4(){
Long startTime=System.currentTimeMillis();
OptionalLong reduce = LongStream.rangeClosed(1L, 10_0000_0000L).sequential().reduce(Long::sum);
Long result = reduce.getAsLong();
Long endTime=System.currentTimeMillis();
System.out.println("计算结果:"+result);
System.out.println("Stream串行流方法耗时:"+(endTime-startTime));
}
}
运行结果: