理解Stream.collect()方法
/**
 * Performs a mutable reduction over the stream. Pseudocode:
 * <pre>{@code
 * R result = supplier.get();
 * for (T element : this stream)
 * accumulator.accept(result, element);
 * return result;
 * }</pre>
 * <pre>{@code
 * List<String> asList = stringStream.collect(ArrayList::new, ArrayList::add,
 * ArrayList::addAll);
 * }</pre>
 * @param supplier    creates a new result container; its type R is also the return type
 * @param accumulator adds one stream element into a container
 * @param combiner    merges two containers (the right one into the left one); it only matters
 *                    when several partial containers exist, e.g. in a parallel stream
 */
<R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super T> accumulator,
BiConsumer<R, R> combiner);
/**
 * Performs a mutable reduction described by a Collector. A Collector bundles the arguments of
 * #collect(Supplier, BiConsumer, BiConsumer) (supplier, accumulator, combiner) plus a finisher.
 * This is a terminal operation.
 * Even a non-thread-safe container such as ArrayList needs no extra synchronization when run
 * in parallel: each partial result gets its own container, and the combiner merges them.
 * <pre>{@code
 * List<String> asList = stringStream.collect(Collectors.toList());
 * Map<String, List<Person>> peopleByCity
 * = personStream.collect(Collectors.groupingBy(Person::getCity));
 * Map<String, Map<String, List<Person>>> peopleByStateAndCity
 * = personStream.collect(Collectors.groupingBy(Person::getState, Collectors.groupingBy(Person::getCity)));
 * }</pre>
 */
<R, A> R collect(Collector<? super T, A, R> collector);
Collector接口
// T = input element type, A = mutable accumulation container type, R = final result type.
public interface Collector<T, A, R> {
// Supplier has a single method, get(); calling it is like `new`-ing an empty container
// that the later accumulation steps fill.
Supplier<A> supplier();
// BiConsumer's method is void accept(T t, U u); think of it as "accumulate":
// keep adding values into the container returned by supplier().
BiConsumer<A, T> accumulator();
// Container-with-container operation, typically used in parallel streams to merge the
// multiple containers they produce.
BinaryOperator<A> combiner();
// Converts the accumulation container A into the final result R
// (e.g. averagingInt turns its long[2] into a Double).
Function<A, R> finisher();
// Collector characteristics; e.g. groupingBy checks IDENTITY_FINISH to decide whether a
// finisher has to be applied.
Set<Characteristics> characteristics();
}
实现1:toList
// Collectors.toList() as a Collector value: Person elements in, List<Person> out; the
// accumulation container type is hidden behind the wildcard.
Collector<Person, ?, List<Person>> personListCollector = Collectors.toList();
//源码
// Collectors.toList() source.
public static <T> Collector<T, ?, List<T>> toList() {
// supplier: ArrayList::new (cast to Supplier<List<T>>)
// accumulator: List::add
// combiner: append the right list to the left one and return the left one
// CH_ID and no finisher argument: the ArrayList itself is the result
return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
(left, right) -> { left.addAll(right); return left; },
CH_ID);
}
Supplier:`ArrayList::new`,即 new ArrayList(),作为后面添加数据的容器
BiConsumer:`List::add`,调用 ArrayList 对象的 add 方法添加元素
BinaryOperator:`(left, right) -> { left.addAll(right); return left; }`,即 ArrayList.addAll(ArrayList),把右边的列表合并到左边并返回左边
示例
public static void main(String[] args) {
    // Sample data: eight people (name, age).
    ArrayList<Person> list = new ArrayList<>();
    list.add(new Person("1", 20));
    list.add(new Person("2", 22));
    list.add(new Person("3", 24));
    list.add(new Person("4", 26));
    list.add(new Person("5", 28));
    list.add(new Person("6", 20));
    list.add(new Person("7", 18));
    list.add(new Person("8", 16));
    // Three-argument collect: ArrayList::new creates the container, ArrayList::add adds one
    // element, and ArrayList::addAll merges two partial containers (used by parallel streams).
    ArrayList<Person> newList = list.stream().collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
    System.out.println("--newList--");
    // Iterable.forEach prints the list directly; opening a second stream is unnecessary.
    newList.forEach(System.out::println);
}
//输出
--newList--
Person{name='1'}
Person{name='2'}
Person{name='3'}
Person{name='4'}
Person{name='5'}
Person{name='6'}
Person{name='7'}
Person{name='8'}
//替换collect语句
// Same call but with ArrayList::add as the combiner. It compiles because the element type is
// Object, so add(Object) accepts a whole list. On this sequential stream the output below is
// unchanged, i.e. the wrong combiner has no visible effect here.
ArrayList<Object> newList = list.stream().collect(ArrayList::new, ArrayList::add, ArrayList::add);
//输出
--newList--
Person{name='1'}
Person{name='2'}
Person{name='3'}
Person{name='4'}
Person{name='5'}
Person{name='6'}
Person{name='7'}
Person{name='8'}
//替换collect语句
// Parallel version with the wrong combiner: add() inserts the right-hand partial list as a
// single element, so the result contains nested lists (see the output below).
ArrayList<Object> newList = list.stream().parallel().collect(ArrayList::new, ArrayList::add, ArrayList::add);
//输出
--newList--
Person{name='1'}
[Person{name='2'}]
[Person{name='3'}, [Person{name='4'}]]
[Person{name='5'}, [Person{name='6'}], [Person{name='7'}, [Person{name='8'}]]]
//替换collect语句
// Parallel version with the correct combiner: addAll() merges the partial lists, so the
// output is flat and in the original order again (see the output below).
ArrayList<Object> newList = list.stream().parallel().collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
//输出
--newList--
Person{name='1'}
Person{name='2'}
Person{name='3'}
Person{name='4'}
Person{name='5'}
Person{name='6'}
Person{name='7'}
Person{name='8'}
<R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner);
// Does not compile: once the element type is fixed to Person, the combiner must be a
// BiConsumer<ArrayList<Person>, ArrayList<Person>>, and ArrayList<Person>.add does not accept an
// ArrayList<Person> argument. The second ArrayList::add (the combiner) has to be ArrayList::addAll.
ArrayList<Person> newList = list.stream().collect(ArrayList::new, ArrayList::add, ArrayList::add);
第三个参数(combiner)必须是容器与容器之间的合并运算(如 addAll),一般只在并行流(parallelStream)中才会被调用。所以上面的串行流即使误写成 ArrayList::add,输出也看不出问题;一旦改成并行流,就会出现嵌套的列表。
实现2:toMap
// Collectors.toMap(keyMapper, valueMapper) source: collects into a HashMap.
// Its accumulator (uniqKeysMapAccumulator) rejects null values and duplicate keys.
public static <T, K, U> Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends U> valueMapper) {
return new CollectorImpl<>(HashMap::new,
uniqKeysMapAccumulator(keyMapper, valueMapper),
// merges two partial maps (definition not shown here)
uniqKeysMapMerger(),
CH_ID);
}
// Builds the accumulator used by toMap: maps each element to a (key, value) pair and stores it.
private static <T, K, V>
BiConsumer<Map<K, V>, T> uniqKeysMapAccumulator(Function<? super T, ? extends K> keyMapper,
Function<? super T, ? extends V> valueMapper) {
return (map, element) -> {
K k = keyMapper.apply(element);
// A null value fails here with a NullPointerException.
V v = Objects.requireNonNull(valueMapper.apply(element));
// putIfAbsent only inserts when the key is absent; otherwise it returns the existing value...
V u = map.putIfAbsent(k, v);
// ...so a non-null u means a duplicate key, reported as an exception
// (duplicateKeyException is defined elsewhere).
if (u != null) throw duplicateKeyException(k, u, v);
};
}
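简单理解就是,将流中的数据根据两个 Function 分别映射为 map 中的 key 和 value,返回 HashMap 对象。注意:这个重载不允许重复的 key(putIfAbsent 返回非 null 时会抛出 duplicateKeyException 创建的异常),value 也不能为 null(Objects.requireNonNull);如果 key 可能重复,需要使用带合并函数的 toMap(keyMapper, valueMapper, mergeFunction) 重载。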
示例
@Test
public void testCollectMap() {
    // Key: "<name>-test"; value: twice the person's age.
    Map<String, Integer> doubledAgeByKey = getData()
            .stream()
            .collect(Collectors.toMap(p -> p.getName() + "-test", p -> p.getAge() * 2));
    // Print every entry as key==value.
    for (Map.Entry<String, Integer> entry : doubledAgeByKey.entrySet()) {
        System.out.println(entry.getKey() + "==" + entry.getValue());
    }
}
实现3:groupingBy
// groupingBy(classifier): groups elements into Lists (downstream collector = toList()).
public static <T, K> Collector<T, ?, Map<K, List<T>>>
groupingBy(Function<? super T, ? extends K> classifier) {
return groupingBy(classifier, toList());
}
// groupingBy(classifier, downstream): same as the three-argument form with HashMap::new as
// the map factory.
public static <T, K, A, D>
Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
Collector<? super T, A, D> downstream) {
return groupingBy(classifier, HashMap::new, downstream);
}
// groupingBy(classifier, mapFactory, downstream) source: the other groupingBy overloads end here.
public static <T, K, D, A, M extends Map<K, D>>
Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
Supplier<M> mapFactory,
Collector<? super T, A, D> downstream) {
// Fetch the downstream collector's supplier and accumulator once, up front.
Supplier<A> downstreamSupplier = downstream.supplier();
BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
// The key part to understand: how a single element is put into its group.
BiConsumer<Map<K, A>, T> accumulator = (m, t) -> {
K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
A container = m.computeIfAbsent(key, k -> downstreamSupplier.get());
downstreamAccumulator.accept(container, t);
};
// Merges two partial maps using the downstream combiner (mapMerger is defined elsewhere).
BinaryOperator<Map<K, A>> merger = Collectors.<K, A, Map<K, A>>mapMerger(downstream.combiner());
// While accumulating, the map values are containers A rather than results D, so the
// M-producing factory is reused through an unchecked cast.
@SuppressWarnings("unchecked")
Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory;
if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
// The downstream containers already are the final values: no finisher needed.
return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID);
}
else {
@SuppressWarnings("unchecked")
Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
// Replace every container with its finished value in place, then reinterpret the map as M.
Function<Map<K, A>, M> finisher = intermediate -> {
intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
@SuppressWarnings("unchecked")
M castResult = (M) intermediate;
return castResult;
};
return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID);
}
}
从上面几个例子可以知道,不管怎么变,最后都要转换为:
Supplier
BiConsumer
BinaryOperator
(以及需要时对中间容器做最终转换的 finisher,即 Function,如 groupingBy 中的 else 分支)
从下文示例返回的数据类型是:
// Type of groupingBy(Person::getAge): takes Person elements and produces a Map from age to
// the List of people with that age.
Collector<Person, ?, Map<Integer, List<Person>>> collector = Collectors.groupingBy(Person::getAge);
理解为,传入的操作元素是Person对象,最后返回一个map。map是按年龄分组,key对应年龄,value对应等于该年龄下的person的list对象。
所以这里要构建两个容器对象,List和Map,或者理解为建立在Collectors.toList()的基础上
理解以下代码:
BiConsumer<Map<K, A>, T> accumulator = (m, t) -> { //根据分组函数计算出Map中的KEY K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key"); //获取map中key对应的容器对象,如果没有这调用相应的Supplier //像上面的示例,就是new ArrayList() A container = m.computeIfAbsent(key, k -> downstreamSupplier.get()); //累加到第一个容器对象中,示例是List::add downstreamAccumulator.accept(container, t); };
这个BiConsumer已经可以说明是如何进行分组的了。
示例
@Test
public void test() {
    // groupingBy(Person::getAge): age -> list of people with that age.
    Collector<Person, ?, Map<Integer, List<Person>>> byAge = Collectors.groupingBy(Person::getAge);
    Map<Integer, List<Person>> peopleByAge = getData().stream().collect(byAge);
    for (Map.Entry<Integer, List<Person>> entry : peopleByAge.entrySet()) {
        System.out.println(entry.getKey() + "==" + entry.getValue());
    }
}
private ArrayList<Person> getData() {
    // Eight sample people; ages 20, 26 and 28 repeat so grouping has something to show.
    String[] names = {"1", "2", "3", "4", "5", "6", "7", "8"};
    int[] ages = {20, 22, 24, 26, 28, 20, 28, 26};
    ArrayList<Person> people = new ArrayList<>();
    for (int i = 0; i < names.length; i++) {
        people.add(new Person(names[i], ages[i]));
    }
    return people;
}
joining
先了解 StringJoiner,之前拼接字符串时一直不知道有这个类
@Test
public void testJoiner() {
    List<String> parts = Arrays.asList("1", "2", "3");
    // StringJoiner(delimiter, prefix, suffix)
    StringJoiner bracketed = new StringJoiner(",", "[", "]");
    StringJoiner dashed = new StringJoiner("-", "(", ")");
    for (String part : parts) {
        bracketed.add(part);
    }
    System.out.println(bracketed.toString());
    for (String part : parts) {
        dashed.add(part);
    }
    System.out.println(dashed.toString());
    // merge() appends the other joiner's content, without its prefix/suffix, as one more
    // element (output: [1,2,3,1-2-3]).
    StringJoiner merged = bracketed.merge(dashed);
    System.out.println(merged.toString());
}
[1,2,3]
(1-2-3)
[1,2,3,1-2-3]
Collectors.joining()
// Collectors.joining(delimiter, prefix, suffix) source.
public static Collector<CharSequence, ?, String> joining(CharSequence delimiter,
CharSequence prefix,
CharSequence suffix) {
// supplier: a fresh StringJoiner; accumulator: add; combiner: merge;
// finisher: toString turns the StringJoiner into the String result (hence CH_NOID).
return new CollectorImpl<>(
() -> new StringJoiner(delimiter, prefix, suffix),
StringJoiner::add, StringJoiner::merge,
StringJoiner::toString, CH_NOID);
}
看这里传给 CollectorImpl 的参数就能想到它的实现:Supplier 新建一个 StringJoiner,accumulator 调用 add,combiner 调用 merge,finisher 调用 toString 得到最终的字符串。
reducing
// Collectors.reducing(op) source: reduces the elements with op and wraps the result in an Optional.
public static <T> Collector<T, ?, Optional<T>> reducing(BinaryOperator<T> op) {
// Local mutable box used as the accumulation container.
class OptionalBox implements Consumer<T> {
T value = null;
// Whether at least one element has been seen.
boolean present = false;
@Override
public void accept(T t) {
if (present) {
// Fold the new element into the running value.
value = op.apply(value, t);
}
else {
// First element: becomes the initial value without calling op.
value = t;
present = true;
}
}
}
// supplier: OptionalBox::new; accumulator: OptionalBox::accept;
// combiner: feed b's value (if any) into a; finisher: wrap the value in an Optional.
return new CollectorImpl<T, OptionalBox, Optional<T>>(
OptionalBox::new, OptionalBox::accept,
(a, b) -> { if (b.present) a.accept(b.value); return a; },
a -> Optional.ofNullable(a.value), CH_NOID);
}
- Supplier:方法内部类 OptionalBox,负责调用传入的 BinaryOperator,以及保存最后的结果
- BiConsumer:OptionalBox 的 accept 方法
- BinaryOperator:如果右边的 OptionalBox 有值,就把它的值传给左边 OptionalBox 的 accept 方法
- finisher:对最后结果进行封装,这里封装为 Optional
参考以下伪代码:
result = new OptionalBox()
foreach (cursor : list) {
    if (!result.present) {   //第一个元素直接作为初始值
        result.value = cursor
        result.present = true
    } else {
        result.value = BinaryOperator.apply(result.value, cursor)
    }
}
//忽略并行流
return Optional.ofNullable(result.value)
示例
@Test
public void testReduce() {
    // Each step folds two people into a new Person "-1" carrying the sum of their ages.
    Collector<Person, ?, Optional<Person>> ageSum = Collectors.reducing(
            (Person left, Person right) -> new Person("-1", left.getAge() + right.getAge()));
    Optional<Person> total = getData().stream().collect(ageSum);
    total.ifPresent(person -> System.out.println(person));
}
Person{name='-1', age=194}
注意:BinaryOperator<T> 接口继承 BiFunction<T, T, T>。
这里要求传入参数和返回结果都是相同类型,所以上面要专门 new 一个 Person 对象来保存年龄之和。
因此每合并一次就会创建一个新对象,元素很多时会影响性能。
// Simplified java.util.function.BiFunction: takes a T and a U, returns an R.
// BinaryOperator<T> is the special case where both arguments and the result are T.
public interface BiFunction<T, U, R> { R apply(T t, U u); }
maxBy
基于reducing
// Collectors.maxBy(comparator) source: a reducing(...) whose operator keeps the larger of two
// elements according to the comparator.
public static <T> Collector<T, ?, Optional<T>> maxBy(Comparator<? super T> comparator) {
return reducing(BinaryOperator.maxBy(comparator));
}
@Test
public void testMaxBy() {
    // Order people by age and keep the largest.
    Comparator<Person> byAge = Comparator.comparing(Person::getAge);
    Optional<Person> oldest = getData().stream().collect(Collectors.maxBy(byAge));
    oldest.ifPresent(person -> System.out.println(person));
}
averagingInt
// Collectors.averagingInt(mapper) source.
public static <T> Collector<T, ?, Double> averagingInt(ToIntFunction<? super T> mapper) {
return new CollectorImpl<>(
// container: long[2] where a[0] is the running sum and a[1] the element count
() -> new long[2],
(a, t) -> { a[0] += mapper.applyAsInt(t); a[1]++; },
// combiner: add sums and counts element-wise
(a, b) -> { a[0] += b[0]; a[1] += b[1]; return a; },
// finisher: sum / count, or 0.0 when no element was seen
a -> (a[1] == 0) ? 0.0d : (double) a[0] / a[1], CH_NOID);
}
- Supplier:new long[2]
- BiConsumer:long[0] 累加求和,long[1] 计数
- BinaryOperator:两个 long[2] 数组对应位置相加
- finisher:返回 总和/计数(没有元素时返回 0.0)
partitioningBy
// partitioningBy(predicate): partitions elements into Lists (downstream collector = toList()).
public static <T>
Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(Predicate<? super T> predicate) {
return partitioningBy(predicate, toList());
}
// Collectors.partitioningBy(predicate, downstream) source.
public static <T, D, A>
Collector<T, ?, Map<Boolean, D>> partitioningBy(Predicate<? super T> predicate,
Collector<? super T, A, D> downstream) {
BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
// Depending on the predicate result, add the element to the "true" or the "false" container.
BiConsumer<Partition<A>, T> accumulator = (result, t) ->
downstreamAccumulator.accept(predicate.test(t) ? result.forTrue : result.forFalse, t);
BinaryOperator<A> op = downstream.combiner();
// Combine two Partitions: true container with true container, false with false.
BinaryOperator<Partition<A>> merger = (left, right) ->
new Partition<>(op.apply(left.forTrue, right.forTrue),
op.apply(left.forFalse, right.forFalse));
// The Supplier creates a Partition (a private Map subclass) holding two fresh downstream containers.
Supplier<Partition<A>> supplier = () ->
new Partition<>(downstream.supplier().get(),
downstream.supplier().get());
if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
// The containers already are the final values.
return new CollectorImpl<>(supplier, accumulator, merger, CH_ID);
}
else {
// Apply the downstream finisher to both containers, producing a new Partition.
Function<Partition<A>, Map<Boolean, D>> finisher = par ->
new Partition<>(downstream.finisher().apply(par.forTrue),
downstream.finisher().apply(par.forFalse));
return new CollectorImpl<>(supplier, accumulator, merger, finisher, CH_NOID);
}
}
Supplier:内部私有的Map子类Partition,里面保存两个容器对象。默认情况下是两个List,分别存储true和false的结果
private static final class Partition<T> extends AbstractMap<Boolean, T> implements Map<Boolean, T> { //两个容器对象,默认是List final T forTrue; final T forFalse; Partition(T forTrue, T forFalse) { this.forTrue = forTrue; this.forFalse = forFalse; } @Override public Set<Map.Entry<Boolean, T>> entrySet() { return new AbstractSet<>() { @Override public Iterator<Map.Entry<Boolean, T>> iterator() { Map.Entry<Boolean, T> falseEntry = new SimpleImmutableEntry<>(false, forFalse); Map.Entry<Boolean, T> trueEntry = new SimpleImmutableEntry<>(true, forTrue); return List.of(falseEntry, trueEntry).iterator(); } @Override public int size() { return 2; } }; } }
BiConsumer:根据Predicate结果,加入Partition中true还是false的容器对象中,默认是list
BinaryOperator:两个Partition的true和false集合分别相加
示例
@Test
public void testPartition() {
    // true -> age below 25, false -> everyone else.
    Collector<Person, ?, Map<Boolean, List<Person>>> youngOrNot =
            Collectors.partitioningBy((Person p) -> p.getAge() < 25);
    Map<Boolean, List<Person>> partitions = getData().stream().collect(youngOrNot);
    for (Map.Entry<Boolean, List<Person>> entry : partitions.entrySet()) {
        System.out.println(entry.getKey() + "=" + entry.getValue());
    }
}
false=[Person{name='4', age=26}, Person{name='5', age=28}, Person{name='7', age=28}, Person{name='8', age=26}]
true=[Person{name='1', age=20}, Person{name='2', age=22}, Person{name='3', age=24}, Person{name='6', age=20}]
mapping
// Collectors.mapping(mapper, downstream) source: reuses the downstream collector's supplier,
// combiner, finisher and characteristics; only the accumulator changes, applying mapper to
// each element before handing it to the downstream accumulator.
public static <T, U, A, R>Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper,
Collector<? super U, A, R> downstream) {
BiConsumer<A, ? super U> downstreamAccumulator = downstream.accumulator();
return new CollectorImpl<>(downstream.supplier(),
(r, t) -> downstreamAccumulator.accept(r, mapper.apply(t)),
downstream.combiner(), downstream.finisher(),
downstream.characteristics());
}
与直接使用 Collectors.toList() 相比,mapping 在添加元素之前会先调用 Function<? super T, ? extends U> 把元素映射为相应的结果,再交给下游收集器添加到容器中。
@Test
public void test() {
    // Map each Person to its age before handing it to toList().
    Collector<Person, ?, List<Integer>> agesCollector =
            Collectors.mapping((Person p) -> p.getAge(), Collectors.toList());
    List<Integer> ages = getData().stream().collect(agesCollector);
    for (Integer age : ages) {
        System.out.println(age);
    }
}