前言
2014年Java8发布,引入了很多的新特性,其中最具有代表性的就是Lambda表达式、方法引用、函数式接口、Stream API等新特性,而这几个新特性往往都是互相配合使用的,使得编码更加的简洁。
1、新特性简介
1.1、Lambda表达式:
Lambda允许把函数作为一个方法的参数,将函数作为参数传递到方法中,Lambda表达式语法格式为:
(parameters) -> expression 或 (parameters) ->{ statements; }
如下几个简单例子
1.不需要参数,返回值5 :()->5
2.接收一个参数,不返回值 :(x)->System.out.println(x);
3.接收一个参数,返回值参数的2倍数字:x->2*x;
4.接收两个参数,返回两个参数的和:(x,y)-> x+y;
lambda表达式中只能引用外部final类型修饰的变量,或者在lambda表达式后不会再修改的非final类型变量,换句话说lambda表达式中引用的变量在外部就不可改了
1.2、函数式接口
函数式接口(Functional Interface)就是一个有且仅有一个抽象方法,但是可以有多个非抽象方法的接口,本质上还是一个接口,只是增加了一个限制而已。
为了将函数式接口和普通接口区分开,通常会在接口定义上添加@FunctionalInterface注解用于标识当前接口为函数式接口,比如常见的Runnable接口就是一个函数式接口,代码如下:
@FunctionalInterface public interface Runnable { public abstract void run(); }
另外为了简化编码,如果接口只定义了一个抽象方法,那么及时没有添加@FuncationInterface注解,编译时该接口也会被标记为上函数式接口;而如果一个接口定义了多个抽象方法,那么及时添加了@FuncationInterface注解也不会被当作是函数式接口,并且编译就无法通过。
1.3、方法引用
方法引用提供了非常有用的语法,可以直接引用已有Java类或对象(实例)的方法或构造器。与lambda联合使用,方法引用可以使语言的构造更紧凑简洁,减少冗余代码。
方法引用通过方法的名字来指向一个方法,方法引用可以使语言的构造更紧凑简洁,减少冗余代码,方法引用使用一对冒号 :: ,案例如下:
1 list.forEach(System.out::println);
通过方法引用的方式直接采用println方法名就可以执行System.out的println方法
1.4、Stream API(java.util.stream)
采用流的方式处理集合数据,把真正的函数式编程风格引入到Java中。可以说Lambda、函数式编程、方法引用只是代码语法上进行了优化,而Stream则是在Lambda、函数式编程、方法引用的基础之上优化了集合的操作,采用流式处理使得集合只需要经过一个遍历就可以执行大量的操作,不仅简化了集合操作而提供了操作的性能。
比如有一个User类,属性包括userName(姓名)、score(分数)、gener(性别),有一个List存储了大量的User对象,那么如果想要从List中找到男生中分数最高的三个人的姓名,此时需要如何实现?
大概需要如下几个步骤:
1、筛选出所有男生;2、将男生按分数进行排序;3、获取前三名男生信息;4、获取男生的姓名
实现代码如下:
List<String> manList = new ArrayList<>(); Collections.sort(list); for (User user : list){ if(user.getGender()==0){ manList.add(user.getName()); } } manList = manList.subList(0,3); System.out.println(JSON.toJSONString(manList));
而如果采用Stream的API那么需要一行代码就可以实现,实现代码如下:
1 list.stream().sorted().filter(i->i.getGender()==0).map(i->i.getName()).limit(3).forEach((u)->System.out.println(u));
这一行代码中就包括了排序、过滤、属性提取、数量提取、遍历打印等一系列操作,大大简化了代码编写,并且全程只需要执行一个遍历操作。
2、Stream的实现原理
Stream最简单的实现方式莫过于每次函数调用都执行一次遍历处理,然后将结果在传递给下一个函数,以上述为例,调用sorted()时先将所有数据进行排序,存储到临时集合list1中,然后调用filter函数时将排好序的list1传入进行遍历过滤处理生成临时集合list2,
调用map函数时再传入list2处理之后生成list3,依次类推直到执行最后一个函数。这种方式简单粗暴,虽然实现比较简单但是有两个缺点,一个缺点是需要执行多次遍历操作,每执行一个函数就需要对所有数据进行遍历一次;还有一个缺点是每个函数执行后都需要创建
一个临时集合存储处理的结果,就需要创建多个临时集合用于存储临时数据。
Stream本质是一个流水线,流水线是特点就是数据只需要遍历一次,每条数据流水线头部流下尾部并进行计算得出结果。
Stream的操作主要分成中间操作和结束操作,中间操作只会对操作进行记录,不会实际处理数据。而结束操作会触发实际的计算操作,是一种惰性计算的方式,这是Stream高效的原因之一。
中间操作又分为有状态操作StatefulOp和无状态操作StatelessOp两种,有状态操作指处理元素时必须拿到所有元素才可以处理,比如sorted()、limit()、distinct()等操作;无状态操作指处理元素时不受之前的元素影响,比如filter()、map()等操作;
结束操作又分为短路操作和非短路操作两种,短路操作指达到某种条件之后就可以结束流水线,不需要处理完全部数据,比如findFirst()、findAny()等操作; 非短路操作必须处理完全部数据才能得到结果,比如collect()、max()、count()、foreach()等操作
Stream流水线中每一个操作都包含了数据源+中间操作+回调函数,可以抽象成一个完整的流程Stage,Stream流水线就是由多个Stage组成,每个Stage分别持有前一个Stage和后一个Stage的引用形成了一个双向链表。
Stream的抽象实现类是AbstractPipeline,AbstractPipeline就可以看作是一个Stage,而第一个Stage就是Head,可以通过Collection.stream()方法获取Head对象,Head也是Stream的实现类,不包含操作,只是流水线的开头。
从Head开始每执行一个中间操作都会产生一个新的Stream,Stream对象以双向链表构成,形成完整的流水线,所以这个双向链表的Stream就完整的记录了源数据和需要执行的所有操作。
通过Stream双向链表可以记录所有的操作,接下来还需要将各个Stream叠加起来,也就是前面的函数执行完了如何去执行下一个函数,每一个Stage都只知道本身的操作是什么,并不知道下一个Stage的具体操作是什么,所以需要有一个串联机制来让前一个操作后能够
调用到下一个操作。此时就用到了Sink接口。
Sink接口定义如下:
1 interface Sink<T> extends Consumer<T> { 2 /** 3 * 开始遍历元素之前调用该方法,通知Sink做好准备 4 */ 5 default void begin(long size) {} 6 7 /** 8 * 所有元素遍历完成之后调用,通知Sink没有更多的元素了 9 */ 10 default void end() {} 11 12 /** 13 * 是否可以结束操作,可以让短路操作尽早结束 14 */ 15 default boolean cancellationRequested() { 16 return false; 17 } 18 19 /** 20 * 遍历元素时调用,接受一个待处理元素,并对元素进行处理。 21 * Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前Stage.accept(T t)方法就行了 22 */ 23 default void accept(int value) { 24 throw new IllegalStateException("called wrong accept method"); 25 } 26 27 }
Stage将自己的操作封装到Sink中,前一个Stage只需要调用后一个Stage对应的Sink的accept方法即可。而对于有状态操作begin和end方法也必须实现,比如sorted操作,begin方法需要创建保存结果的容器,end方法负责对容器的数据进行排序。
对于短路操作cancellationRequested()是必须实现的,一旦cancellationRequested返回true就表示操作已经结束了。所以Stream的核心就是如何实现Sink接口。
Stream流水线整体流程就是从Head开始依次调用下一个Stage对于的Sink的begin、end、accept、cancellationRequested方法,而在accept方法中如果还有下一个Stage,那么还需要在accept方法中继续调用下一个Stage的accept方法。
如排序的Sink实现类源码如下:
1 private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> { 2 private ArrayList<T> list; 3 4 RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) { 5 super(sink, comparator); 6 } 7 8 @Override 9 public void begin(long size) { 10 if (size >= Nodes.MAX_ARRAY_SIZE) 11 throw new IllegalArgumentException(Nodes.BAD_SIZE); 12 /** 初始化List */ 13 list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>(); 14 } 15 16 @Override 17 public void end() { 18 /** list排序 */ 19 list.sort(comparator); 20 /** 调用下一个Stream的begin方法 */ 21 downstream.begin(list.size()); 22 if (!cancellationWasRequested) { 23 /** 如果没有短路,就遍历调用下一个Stream的accept方法 */ 24 list.forEach(downstream::accept); 25 } 26 else { 27 for (T t : list) { 28 if (downstream.cancellationRequested()) break; 29 downstream.accept(t); 30 } 31 } 32 /** 调用下一个Stream的end方法 */ 33 downstream.end(); 34 list = null; 35 } 36 37 @Override 38 public void accept(T t) { 39 /** 向list中添加数据*/ 40 list.add(t); 41 } 42 }
1、首先begin()方法告诉Sink参与排序的元素个数,方便确定中间结果容器的的大小;
2、之后通过accept()方法将元素添加到中间结果当中,最终执行时调用者会不断调用该方法,直到遍历所有元素;
3、最后end()方法告诉Sink所有元素遍历完毕,启动排序步骤,排序完成后将结果传递给下游的Sink;
4、如果下游的Sink是短路操作,将结果传递给下游时不断询问下游cancellationRequested()是否可以结束处理。
Sink将多个Stream的操作进行了串联,接下来就需要执行整个流水线的操作,而执行操作是调用结束操作是触发的,结束操作的Sink只需要处理数据即可,不需要再向下传递,而结束操作执行时会就会触发整个流水线的执行。
还有一个问题是结束操作的Sink如何一层一层找到最上层的Sink,此时就用到了AbstractPipeline的onWrapSink方法,该方法的作用是将当前的Stage操作和将结果传递给下游的Stage进行封装成一个新的Sink,相当于将当前操作和下游的Sink合并成新的Sink,
那么最终就可以得到一个包含了所有操作的Sink,而从结束操作开始调用onWrapSink方法,相当于执行结束操作的Sink方法,就相当于执行了流水线上所有Sink的处理逻辑。