累加器--RecursiveTask有返回值
package com.dwz.forkjoin; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; import java.util.concurrent.RecursiveTask; import java.util.stream.IntStream; /** * 累加器--RecursiveTask有返回值 */ public class ForkJoinRecursiveTask { private final static int MAX_THRESHOLD = 3; private static class CalculatedRecursiveTask extends RecursiveTask<Integer> { private final int start; private final int end; public CalculatedRecursiveTask(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { if(end - start <= MAX_THRESHOLD) { return IntStream.rangeClosed(start, end).sum(); } else { int middle = (start + end) / 2; CalculatedRecursiveTask leftTask = new CalculatedRecursiveTask(start, middle); CalculatedRecursiveTask rightTask = new CalculatedRecursiveTask(middle + 1, end); leftTask.fork(); rightTask.fork(); return leftTask.join() + rightTask.join(); } } public static void main(String[] args) { final ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Integer> future = forkJoinPool.submit(new CalculatedRecursiveTask(0, 12)); try { Integer result = future.get(); System.out.println(result); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } }
累加器--RecursiveAction无返回值
package com.dwz.forkjoin; import java.util.Optional; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; /** * 累加器--RecursiveAction无返回值 */ public class ForkJoinRecursiveAction { private final static int MAX_THRESHOLD = 3; private final static AtomicInteger SUM = new AtomicInteger(0); private static class CalculateRecursiveAction extends RecursiveAction { private final int start; private final int end; private CalculateRecursiveAction(int start, int end) { this.start = start; this.end = end; } @Override protected void compute() { if((start - end) <= MAX_THRESHOLD) { SUM.addAndGet(IntStream.rangeClosed(start, end).sum()); } else { int middle = (start + end)/2; CalculateRecursiveAction leftAction = new CalculateRecursiveAction(start, middle); CalculateRecursiveAction rightAction = new CalculateRecursiveAction(middle + 1, end); leftAction.fork(); rightAction.fork(); } } } public static void main(String[] args) throws InterruptedException { final ForkJoinPool forkJoinPool = new ForkJoinPool(); forkJoinPool.submit(new CalculateRecursiveAction(0, 10)); forkJoinPool.awaitTermination(10, TimeUnit.SECONDS); Optional.of(SUM).ifPresent(System.out::println); } }