对比一下串行流和并行流的效率:
/** * @author WGR * @create 2020/3/31 */ public class Demo07Parallel { private static final int times = 500000000; long start; @Before public void init() { start = System.currentTimeMillis(); } @After public void destory() { long end = System.currentTimeMillis(); System.out.println("消耗时间:" + (end - start)); } // 并行的Stream : 消耗时间:431 @Test public void testParallelStream() { LongStream.rangeClosed(0, times).parallel().reduce(0, Long::sum); } // 串行的Stream : 消耗时间:623 @Test public void testStream() { // 得到5亿个数字,并求和 LongStream.rangeClosed(0, times).reduce(0, Long::sum); } }
我们可以看到parallelStream的效率是最高的。
Stream并行处理的过程会分而治之,也就是将一个大任务切分成多个小任务,这表示每个任务都是一个操作。
下面具体来做一下实验:
@Test public void test0Serial() { long count = Stream.of(4, 5, 3, 9, 1, 2, 6) .filter(s -> { System.out.println(Thread.currentThread() + ", s = " + s); return true; }) .count(); System.out.println("count = " + count); }
并行流:
@Test public void test0Parallel() { long count = Stream.of(4, 5, 3, 9, 1, 2, 6) .parallel() // 将流转成并发流,Stream处理的时候将才去 .filter(s -> { System.out.println(Thread.currentThread() + ", s = " + s); return true; }) .count(); System.out.println("count = " + count); }
获取并行流有两种方式:
直接获取并行流: parallelStream()
将串行流转成并行流: parallel()
并行流的线程安全问题:
@Test public void parallelStreamNotice() { ArrayList<Integer> list = new ArrayList<>(); IntStream.rangeClosed(1, 1000) .parallel() .forEach(i -> { list.add(i); }); System.out.println("list = " + list.size()); }
@Test public void parallelStreamNotice() { ArrayList<Integer> list = new ArrayList<>(); // IntStream.rangeClosed(1, 1000) // .parallel() // .forEach(i -> { // list.add(i); // }); // System.out.println("list = " + list.size()); // 解决parallelStream线程安全问题方案一: 使用同步代码块 Object obj = new Object(); IntStream.rangeClosed(1, 1000) .parallel() .forEach(i -> { synchronized (obj) { list.add(i); } }); System.out.println("list = " + list.size()); }
// parallelStream线程安全问题 @Test public void parallelStreamNotice() { ArrayList<Integer> list = new ArrayList<>(); // IntStream.rangeClosed(1, 1000) // .parallel() // .forEach(i -> { // list.add(i); // }); // System.out.println("list = " + list.size()); // 解决parallelStream线程安全问题方案一: 使用同步代码块 // Object obj = new Object(); // IntStream.rangeClosed(1, 1000) // .parallel() // .forEach(i -> { // synchronized (obj) { // list.add(i); // } // }); // 解决parallelStream线程安全问题方案二: 使用线程安全的集合 Vector<Integer> v = new Vector(); List<Integer> synchronizedList = Collections.synchronizedList(list); IntStream.rangeClosed(1, 1000) .parallel() .forEach(i -> { synchronizedList.add(i); }); System.out.println("list = " + synchronizedList.size()); }
// parallelStream线程安全问题 @Test public void parallelStreamNotice() { List<Integer> collect = IntStream.rangeClosed(1, 1000) .parallel() .boxed() .collect(Collectors.toList()); System.out.println("collect.size = " + collect.size()); }