zoukankan      html  css  js  c++  java
  • Java之Stream如何运用?案例详解

     

    Java Stream

    注意:

    • Stream 自己不会存储元素。
    • Stream 不会改变源对象。相反,它会返回一个持有结果得新Stream
    • Stream 操作时延迟执行得,这意味着它们会等到需要结果时才执行。(延迟加载)
      另外注意:在学习Java高级过程中难免会遇到各种问题解决不了。为此我建了个裙 783802103,里面很多架构师一起交流解答,没基础勿进哦!

    Stream 操作步骤

    1. Stream 创建: 一个数据源(集合,数组),获取一个流。
    2. Stream 中间操作: 一个中间操作链,对数据源的数据进行处理。
    3. Stream 终止操作: 一个终止操作,执行中间操作链,并产生结果。

    2 Stream 用法

    2.1 创建Stream

    //1. 通过 Collection.stream() / parallelStream() 创建Stream
    List<String> list = new ArrayList<String>();
    Stream<String> stream11 = list.stream();             // 串行流
    Stream<String> stream12 = list.parallelStream();     // 并行流
    
    //2. 通过 Arrays.stream() 获取数组流
    IntStream stream2 = Arrays.stream(new int[]{1,2});  // 串行流
    
    //3. 通过 Stream.of() 获取流
    Stream<String> stream3 = Stream.of("123", "456");    // 串行流
    
    //4. 创建无限流,需要配合 limit() 截断,不然无限制下去
    Stream<Integer> stream41 = Stream.iterate(2, (x) -> x * 2);   // 串行流
    Stream<Double> stream42 = Stream.generate(Math::random);      // 串行流
    

    2.2 Stream 中间操作

    多个中间操作可以连接起来形成一个流水线,除非流水线上触发终止操作,否则中间操作不会执行任何得处理。而终止操作时一次性全部处理,称为‘延迟加载’

    中间操作(例举部分)说明
    limit(long maxSize) 截断,使其元素不超过给定数量
    filter(Predicate<T> predicate) 过滤,从流中过滤出想要的元素
    skip(long n) 忽略,跳过前n个元素,若流中元素不足n个,则返回空
    distinct() 去重,通过元素 hashCode() 和 equals() 去除重复元素
    map(Funcation<T,R> mapper) 映射,函数会被应用到每个元素上,并将其映射成一个新的元素
    flatMap(Function<T, Stream> mapper) 映射,将流中的每个值都换成一个流,然后把所有流连接成一个流
    sorted() 排序,自然排序
    sorted(Comparator<T> comparator) 排序,定制排序
    // 中间操作:不会执行任何操作
    Stream<Double> stream = Stream.generate(Math::random)   // double 无限流
    .limit(20)                                      // 截断,取前 20 个
    .filter(x -> x > 0.3)                           // 过滤,取大于 0.3 的元素
    .skip(1)                                        // 忽略,丢弃第一个元素
    .distinct()                                     // 去重
    .map(x -> x * 10)                               // 映射,将每个元素扩大 10 倍
    .sorted();                                      // 对 double 流进行排序
    
    // 终止操作,只有执行终止操作才会执行全部。即:延迟加载
    stream.forEach(System.out::println);
    
    // 中间操作:flatMap 接收一个函数作为参数,将流中的每个值都换成一个流,然后把所有流连接成一个流
    List<String> list = Arrays.asList("aaa", "bbb", "ccc", "ddd");
    list.stream().flatMap((e) -> filterCharacter(e)).forEach(System.out::println);
    
    //如果使用map则需要这样写
    list.stream().map((e) -> filterCharacter(e)).forEach((e) -> {
        e.forEach(System.out::println);
    });
    
    public Stream<Character> filterCharacter(String str){
        List<Character> list = new ArrayList<>();
        for (Character ch : str.toCharArray()) {
            list.add(ch);
        }
        return list.stream();
    }
    

    2.3 Stream 终止操作

    2.3.1 查找与匹配

    操作(例举部分)说明
    allMatch(Predicate<T> predicate) 检查是否匹配所有元素
    anyMatch(Predicate<T> predicate) 检查是否至少匹配所有元素
    noneMatch(Predicate<T> predicate) 检查是否没有匹配所有元素
    findFirst() 返回第一个元素
    findAny() 返回当前流中任意元素
    count() 返回流中元素总个数
    max(Comparator<T> comparator) 返回流中最大值
    min(Comparator<T> comparator) 返回流中最小值

    2.3.2 规约 - 将流中元素结合在一起,返回一个值

    操作(例举部分)说明
    reduce(T identitty,BinaryOperator<T>) 需要传一个起始值,然后,传入的是一个二元运算
    reduce(BinaryOperator<T>) 没有起始值,有可能结果为空,所以返回的值会被封装到Optional中
    // 求和
    List<Integer> list = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    Integer sum = list.stream().reduce(0, (x, y) -> x + y);
    
    // 求和,没有起始值,则有可能结果为空,所以返回的值会被封装到Optional中
    List<Integer> list = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    Optional<Integer> sum = list.stream().reduce(Integer :: sum);
    

    2.3.3 收集

    将流转换为其他形式。接收一个Collector接口的实现,用于给Stream中元素做汇总的方法。Collector接口方法的实现决定了如何对流执行收集操作(如收集到List,Set,Map)。但是Collectors实用类提供了很多静态方法,可以方便地创建常见得收集器实例。

    操作(例举部分)说明
    Collectors.toList() 将流转换成List
    Collectors.toSet() 将流转换为Set
    Collectors.toCollection(Supplier<T> supplier) 将流转换为其他类型的集合
    Collectors.counting() 元素个数
    Collectors.averagingInt/Long/Double(Function<T,R> function) 平均数,不同之处在于传入得参数类型不同,返回值都为Double
    Collectors.summingInt/Long/Double(Function<T,R> function) 求和,不同之处在于传入得参数类型不同,返回值为Integer, Double, Long
    Collectors.maxBy(Comparator<T> comparator) 最大值
    Collectors.minBy(Comparator<T> comparator) 最小值
    Collectors.groupingBy(Function<T,R> function) 分组,返回Map
    Collectors.partitioningBy(Predicate<T> predicate) 分区,传入函数返回true和false 分成两个区,返回Map

    3 并行流

    并行流就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流。Java8中将并行流进行了优化,我们可以很容易的对数据进行并行操作。Stream API可以声明性地通过parallel()与scqucntial()在并行流与顺序流之间进行切换。

    3.1 Fork-Join 框架

    Fork—Join框架:是java7提供得一个用于执行任务得框架,就是在必要得情况下,将一个大任务,进行拆分(Fork)成若干个小任务(拆分到不能再拆分),再将一个个的小任务运算得结果进行join汇总。

    Fork—Join框架时ExecutorService接口得一种具体实现,目的是为了帮助更好地利用多处理器带来得好处。它是为那些能够被递归地拆分成子任务的工作类型量身设计的。起目的在于能够使用所有有可用的运算能力来提升你的应用的性能。 

    关于 Fork-Join 实现原理请看这篇:图解Fork/Join https://mp.weixin.qq.com/s/OzZFGW_8GBYHUa0Ef10WVg

    /**
     * 要想使用Fark—Join,类必须继承RecursiveAction(无返回值)或者 RecursiveTask(有返回值)
     *
     * 计算从 start 到 end 的数字累加
     */
    public class ForkJoin extends RecursiveTask<Long> {
    
        private long start; // 起始数字
        private long end;   // 结束数字
    
        public ForkJoin(long start, long end) {
            this.start = start;
            this.end = end;
        }
    
        // 拆分的最小区间
        private static final long THRESHOLD = 10000L;
    
        @Override
        protected Long compute() {
            // 当区间小于最小区间时,直接计算累加
            if (end - start <= THRESHOLD) {
                long sum = 0;
                for (long i = start; i < end; i++) {
                    sum += i;
                }
                return sum;
            } else { // 否则,将区间一分为二,分给两个不同的线程去计算
                // 注意这里,如果有问题,会抛出java.lang.NoClassDefFoundError: Could not initialize class java.util.concurrent.locks.AbstractQueuedSynchronizer$Node 异常
                long middle = start + (end - start) / 2;
    
                ForkJoin left = new ForkJoin(start, middle);    // 递归,直到分解到最小区间后,开始计算
                left.fork();    // 拆分子任务,压入线程队列
    
                ForkJoin right = new ForkJoin(middle, end);     // 递归,直到分解到最小区间后,开始计算
                right.fork();   // 拆分子任务,压入线程队列
    
                // 合并两部分计算的值
                return left.join() + right.join();
            }
        }
    
        public static void main(String[] args) {
            // 开始时间
            Instant start = Instant.now();
    
            // 这里需要一个线程池的支持
            ForkJoinPool pool = new ForkJoinPool();
    
            // 累加到 1 亿
            ForkJoinTask<Long> task = new ForkJoin(0, 100000000L);
    
            long sum = pool.invoke(task);
    
            // 结束时间
            Instant end = Instant.now();
    
            System.out.println(String.format("累加到1亿的计算时间为:%s 毫秒,值:%s", Duration.between(start, end).toMillis(), sum));
        }
    }
    

    3.2 并行流对 Fork-Join 的简化

    //开始时间
    Instant start = Instant.now();
    
    long sum = LongStream.rangeClosed(0, 1000000000L)  // 创建0-1亿的数字串行流
    	.parallel()                                 // 转换为并行流,使用Fort-Join框架,缺省使用ForkJoinPool.commonPool()线程池
    	.reduce(0, Long :: sum);        // 规约计算所有元素累加
    
    //结束时间
    Instant end = Instant.now();
    System.out.println(String.format("累加到1亿的计算时间为:%s 毫秒,值:%s", Duration.between(start, end).toMillis(), sum));
    

    3.3 并行流的性能

    性能测试请看:Stream Performance https://github.com/CarpenterLee/JavaLambdaInternals/blob/master/8-Stream%20Performance.md 此处引用结论:

    * 对于简单操作,比如最简单的遍历,Stream串行API性能明显差于显示迭代,但并行的Stream API能够发挥多核特性。
    * 对于复杂操作,Stream串行API性能可以和手动实现的效果匹敌,在并行执行时Stream API效果远超手动实现。
    
    所以,如果出于性能考虑,
    1. 对于简单操作推荐使用外部迭代手动实现,
    2. 对于复杂操作,推荐使用Stream API, 
    3. 在多核情况下,推荐使用并行Stream API来发挥多核优势,
    4. 单核情况下不建议使用并行Stream API。
    
    如果出于代码简洁性考虑,使用Stream API能够写出更短的代码。
    即使是从性能方面说,尽可能的使用Stream API也另外一个优势,
    那就是只要Java Stream类库做了升级优化,代码不用做任何修改就能享受到升级带来的好处。

    最后注意:在学习Java高级过程中难免会遇到各种问题解决不了。为此我建了个裙 783802103,里面很多架构师一起交流解答,没基础勿进哦!
    本文的文字及图片来源于网络加上自己的想法,仅供学习、交流使用,不具有任何商业用途,版权归原作者所有,如有问题请及时联系我们以作处理

  • 相关阅读:
    阿里巴巴微服务开源项目盘点(持续更新)
    云计算、大数据、编程语言学习指南下载,100+技术课程免费学!这份诚意满满的新年技术大礼包,你Get了吗?
    【机器学习PAI实战】—— 玩转人工智能之综述
    泡沫下的破浪者,智能语音产品到底落地何处?
    我们总结了每个技术团队都会遇到的 4 个难题
    在 Ali Kubernetes 系统中,我们这样实践混沌工程
    云上护航服务—保障云上的尖峰时刻
    本地 vs. 云:大数据厮杀的最终幸存者会是谁?— InfoQ专访阿里云智能通用计算平台负责人关涛
    WAF开放规则定义权:专家策略+用户自定义策略=Web安全
    队列的其本应用_迷官问题
  • 原文地址:https://www.cnblogs.com/chengxuyuanaa/p/12942201.html
Copyright © 2011-2022 走看看