zoukankan      html  css  js  c++  java
  • java基础-Stream[2]-Collector接口

    理解Stream.collect()方法

       /**
        * Performs a mutable reduction over the elements of this stream.
        * Equivalent pseudo-code:
        * <pre>{@code
        *     R result = supplier.get();
        *     for (T element : this stream)
        *         accumulator.accept(result, element);
        *     return result;
        * }</pre>
        * Example:
        * <pre>{@code
        *     List<String> asList = stringStream.collect(ArrayList::new, ArrayList::add,
        *                                                ArrayList::addAll);
        * }</pre>
        *
        * @param supplier    producer of the result container; its type {@code R} is also the return type
        * @param accumulator adds one stream element to a container obtained from {@code supplier}
        * @param combiner    merges two containers obtained from {@code supplier}
        *                    (e.g. {@code ArrayList::addAll} in the example above)
        * @return the result container
        */
       <R> R collect(Supplier<R> supplier,
                     BiConsumer<R, ? super T> accumulator,
                     BiConsumer<R, R> combiner);
    
       /**
        * Performs the reduction described by a {@code Collector}, which packages the arguments of
        * {@link #collect(Supplier, BiConsumer, BiConsumer)} into one object.
        * This is a terminal operation.
        * No thread-safety problem arises even when a non-thread-safe data structure
        * such as {@code ArrayList} is used as the container.
        * <pre>{@code
        * List<String> asList = stringStream.collect(Collectors.toList());
        * Map<String, List<Person>> peopleByCity
        *         = personStream.collect(Collectors.groupingBy(Person::getCity));
        * Map<String, Map<String, List<Person>>> peopleByStateAndCity
        *         = personStream.collect(Collectors.groupingBy(Person::getState,
        *                                                      Collectors.groupingBy(Person::getCity)));
        * }</pre>
        */
       <R, A> R collect(Collector<? super T, A, R> collector);
    

    Collector接口

    /**
     * Describes a reduction over elements of type {@code T} that accumulates into a
     * container of type {@code A} and yields a result of type {@code R}.
     */
    public interface Collector<T, A, R> {
        // Supplier has a single method, get(). Think of it as "new-ing" the container object
        // that the later aggregation steps operate on.
        Supplier<A> supplier();
        // BiConsumer's method is void accept(T t, U u). Here it is the accumulation step:
        // each value is added to the object returned by supplier().
        BiConsumer<A, T> accumulator();
        // Container-with-container operation; generally used with parallel streams,
        // where several containers are produced and have to be combined.
        BinaryOperator<A> combiner();
    
        // Converts the accumulation container (A) into the final result (R).
        Function<A, R> finisher();
    
        // Flags describing this collector, e.g. IDENTITY_FINISH.
        Set<Characteristics> characteristics();
    }
    

    实现1:toList

    Collector<Person, ?, List<Person>> personListCollector = Collectors.toList();
    //源码
    /**
     * Returns a collector that gathers the input elements into a new {@code ArrayList}.
     *
     * @param <T> the element type
     * @return a collector producing a {@code List<T>}
     */
    public static <T> Collector<T, ?, List<T>> toList() {
        // Container factory: each accumulation starts from an empty ArrayList.
        Supplier<List<T>> listFactory = ArrayList::new;
        // Accumulator: append one element to the container.
        BiConsumer<List<T>, T> appender = List::add;
        // Combiner: pour the right-hand partial result into the left-hand one and keep the latter.
        BinaryOperator<List<T>> merger = (target, source) -> {
            target.addAll(source);
            return target;
        };
        return new CollectorImpl<>(listFactory, appender, merger, CH_ID);
    }
    
    

    Supplier : new ArrayList()作为后面添加数据的容器

    BiConsumer:List::add,调用ArrayList对象的add方法添加元素

    BinaryOperator: (left, right) -> { left.addAll(right); return left; },即ArrayList.addAll(ArrayList)

    示例

    /**
     * Demo: builds eight Person objects, copies them into a new list with the
     * three-argument Stream.collect, and prints the copy.
     */
    public static void main(String[] args) {
        String[] names = {"1", "2", "3", "4", "5", "6", "7", "8"};
        int[] ages = {20, 22, 24, 26, 28, 20, 18, 16};
        ArrayList<Person> list = new ArrayList<>();
        for (int i = 0; i < names.length; i++) {
            list.add(new Person(names[i], ages[i]));
        }
        // supplier = ArrayList::new, accumulator = ArrayList::add, combiner = ArrayList::addAll
        ArrayList<Person> newList = list.stream().collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
        System.out.println("--newList--");
        for (Object item : newList) {
            System.out.println(item);
        }
    }
    //输出
    Person{name='1'}
    Person{name='2'}
    Person{name='3'}
    Person{name='4'}
    Person{name='5'}
    Person{name='6'}
    Person{name='7'}
    Person{name='8'}
    //替换collect语句
      ArrayList<Object> newList = list.stream().collect(ArrayList::new, ArrayList::add, ArrayList::add);
    //输出
    --newList--
    Person{name='1'}
    Person{name='2'}
    Person{name='3'}
    Person{name='4'}
    Person{name='5'}
    Person{name='6'}
    Person{name='7'}
    Person{name='8'}
    //替换collect语句
      ArrayList<Object> newList = list.stream().parallel().collect(ArrayList::new, ArrayList::add, ArrayList::add);
    //输出
    Person{name='1'}
    [Person{name='2'}]
    [Person{name='3'}, [Person{name='4'}]]
    [Person{name='5'}, [Person{name='6'}], [Person{name='7'}, [Person{name='8'}]]]
    //替换collect语句
      ArrayList<Object> newList = list.stream().parallel().collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
    //输出
    --newList--
    Person{name='1'}
    Person{name='2'}
    Person{name='3'}
    Person{name='4'}
    Person{name='5'}
    Person{name='6'}
    Person{name='7'}
    Person{name='8'}
    
    <R> R collect(Supplier<R> supplier,
                BiConsumer<R, ? super T> accumulator,
                BiConsumer<R, R> combiner);
    ArrayList<Person> newList = list.stream().collect(ArrayList::new, ArrayList::add, ArrayList::add);
    //将ArrayList泛型指定为Person后,第二个ArrayList::add编译器报错,必须改为addAll
    

    第三个参数必须是集合与集合的运算,一般在parallelStream中使用

    实现2:toMap

    /**
     * Returns a collector that stores the stream elements in a new {@code HashMap},
     * deriving each entry's key and value with the given mapping functions.
     *
     * @param keyMapper   maps an element to its map key
     * @param valueMapper maps an element to its map value
     * @return a collector producing a {@code Map<K, U>}
     */
    public static <T, K, U> Collector<T, ?, Map<K,U>> toMap(Function<? super T, ? extends K> keyMapper,
                                                            Function<? super T, ? extends U> valueMapper) {
        // supplier = HashMap::new; the accumulator rejects duplicate keys (see uniqKeysMapAccumulator);
        // NOTE(review): uniqKeysMapMerger is not shown here — presumably it merges two partial maps.
        return new CollectorImpl<>(HashMap::new,
                                   uniqKeysMapAccumulator(keyMapper, valueMapper),
                                   uniqKeysMapMerger(),
                                   CH_ID);
    }
    /**
     * Builds the accumulator used by {@code toMap}: it maps an element to a key and a
     * non-null value and inserts the pair, failing if the key is already present.
     *
     * @param keyMapper   maps an element to its key
     * @param valueMapper maps an element to its value; a null result causes a NullPointerException
     * @return an accumulator that throws the exception built by {@code duplicateKeyException}
     *         when a key is seen twice
     */
    private static <T, K, V>
        BiConsumer<Map<K, V>, T> uniqKeysMapAccumulator(Function<? super T, ? extends K> keyMapper,
                                                        Function<? super T, ? extends V> valueMapper) {
        return (target, item) -> {
            final K key = keyMapper.apply(item);
            final V incoming = Objects.requireNonNull(valueMapper.apply(item));
            // putIfAbsent returns the value already stored under the key, or null if there was none.
            final V existing = target.putIfAbsent(key, incoming);
            if (existing == null) {
                return;
            }
            throw duplicateKeyException(key, existing, incoming);
        };
    }
    

    简单理解就是,将流中的数据根据Function映射到map中的key和value ,返回HashMap对象

    示例

    /** Collects the sample people into a map of "name-test" to twice the age and prints every entry. */
    @Test
    public void testCollectMap (){
        Collector<Person, ?, Map<String, Integer>> doubledAgeByName =
            Collectors.toMap(p -> p.getName() + "-test", p -> p.getAge() * 2);
        Map<String, Integer> mapCollect = getData().stream().collect(doubledAgeByName);
        for (Map.Entry<String, Integer> entry : mapCollect.entrySet()) {
            System.out.println(entry.getKey() + "==" + entry.getValue());
        }
    }
    

    实现3:groupingBy

    /**
     * Groups elements by the classifier's result, collecting each group into a list.
     * Delegates to the two-argument overload with {@code toList()} as the downstream collector.
     *
     * @param classifier maps an element to its group key
     * @return a collector producing a map from key to the list of elements with that key
     */
    public static <T, K> Collector<T, ?, Map<K, List<T>>>
        groupingBy(Function<? super T, ? extends K> classifier) {
        Collector<T, ?, List<T>> listDownstream = toList();
        return groupingBy(classifier, listDownstream);
    }
    /**
     * Groups elements by the classifier's result and reduces each group with the given
     * downstream collector. Delegates to the three-argument overload with a {@code HashMap} factory.
     *
     * @param classifier maps an element to its group key
     * @param downstream collector applied to the elements of each group
     * @return a collector producing a map from key to the downstream result
     */
    public static <T, K, A, D>
        Collector<T, ?, Map<K, D>> groupingBy(Function<? super T, ? extends K> classifier,
                                              Collector<? super T, A, D> downstream) {
        Supplier<Map<K, D>> hashMapFactory = HashMap::new;
        return groupingBy(classifier, hashMapFactory, downstream);
    }
    /**
     * Groups elements by the classifier's result into a map created by {@code mapFactory},
     * reducing the elements of each group with the downstream collector.
     *
     * @param classifier maps an element to its group key; a null key causes a NullPointerException
     * @param mapFactory creates the result map
     * @param downstream collector applied to the elements of each group
     * @return a collector producing a map from key to the downstream result
     */
    public static <T, K, D, A, M extends Map<K, D>>
        Collector<T, ?, M> groupingBy(Function<? super T, ? extends K> classifier,
                                      Supplier<M> mapFactory,
                                      Collector<? super T, A, D> downstream) {
        Supplier<A> downstreamSupplier = downstream.supplier();
        BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
        // The accumulator below is the most important part to understand:
        // compute the key, get (or lazily create) that key's downstream container,
        // then feed the element into the container.
        BiConsumer<Map<K, A>, T> accumulator = (m, t) -> {
            K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
            A container = m.computeIfAbsent(key, k -> downstreamSupplier.get());
            downstreamAccumulator.accept(container, t);
        };
        // Merges two partial maps, using the downstream combiner as the argument to mapMerger.
        BinaryOperator<Map<K, A>> merger = Collectors.<K, A, Map<K, A>>mapMerger(downstream.combiner());
        // While accumulating, the map holds intermediate containers (A) rather than final results (D),
        // so the factory is viewed as producing Map<K, A>.
        @SuppressWarnings("unchecked")
        Supplier<Map<K, A>> mangledFactory = (Supplier<Map<K, A>>) mapFactory;
    
        // With IDENTITY_FINISH no finisher is passed: the accumulated map is used as the result.
        if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
            return new CollectorImpl<>(mangledFactory, accumulator, merger, CH_ID);
        }
        else {
            @SuppressWarnings("unchecked")
            Function<A, A> downstreamFinisher = (Function<A, A>) downstream.finisher();
            // Otherwise replace every container in place with the downstream finisher's output,
            // then view the same map object as the result type M.
            Function<Map<K, A>, M> finisher = intermediate -> {
                intermediate.replaceAll((k, v) -> downstreamFinisher.apply(v));
                @SuppressWarnings("unchecked")
                M castResult = (M) intermediate;
                return castResult;
            };
            return new CollectorImpl<>(mangledFactory, accumulator, merger, finisher, CH_NOID);
        }
    }
    

    从上面两个例子我们知道,不管怎么变,最后都要转换为:

    1. Supplier

    2. BiConsumer

    3. BinaryOperator

    从下文示例返回的数据类型是:

    Collector<Person, ?, Map<Integer, List<Person>>> collector = Collectors.groupingBy(Person::getAge);
    

    理解为,传入的操作元素是Person对象,最后返回一个map。map是按年龄分组,key对应年龄,value对应等于该年龄下的person的list对象。

    所以这里要构建两个容器对象,List和Map,或者理解为建立在Collectors.toList()的基础上

    理解一下代码:

    BiConsumer<Map<K, A>, T> accumulator = (m, t) -> {
        // Compute the map KEY from the grouping (classifier) function
        K key = Objects.requireNonNull(classifier.apply(t), "element cannot be mapped to a null key");
        // Fetch the container stored under the key; if there is none, call the downstream Supplier.
        // In the example above that means new ArrayList()
        A container = m.computeIfAbsent(key, k -> downstreamSupplier.get());
        // Accumulate the element into that container; in the example this is List::add
        downstreamAccumulator.accept(container, t);
    };
    

    这个BiConsumer已经可以说明是如何进行分组的了。

    示例

    /** Groups the sample people by age and prints every age together with its list of people. */
    @Test
    public void test (){
        Collector<Person, ?, Map<Integer, List<Person>>> byAge = Collectors.groupingBy(Person::getAge);
        Map<Integer, List<Person>> peopleByAge = getData().stream().collect(byAge);
        for (Map.Entry<Integer, List<Person>> group : peopleByAge.entrySet()) {
            System.out.println(group.getKey() + "==" + group.getValue());
        }
    }
    /**
     * Builds the fixture shared by the examples: eight people named "1" to "8"
     * with ages 20, 22, 24, 26, 28, 20, 28 and 26.
     *
     * @return a new mutable list of the eight people, in name order
     */
    private ArrayList<Person> getData() {
        int[] ages = {20, 22, 24, 26, 28, 20, 28, 26};
        ArrayList<Person> people = new ArrayList<>();
        for (int i = 0; i < ages.length; i++) {
            // Names are the 1-based positions rendered as strings.
            people.add(new Person(String.valueOf(i + 1), ages[i]));
        }
        return people;
    }
    

    joining

    先了解StringJoiner,之前拼接字符串一直不知道有这个方法

    /**
     * Demonstrates StringJoiner: two joiners with different delimiter/prefix/suffix are
     * filled from the same list, printed, and then the second is merged into the first.
     */
    @Test
    public void testJoiner (){
        List<String> parts = Arrays.asList("1", "2", "3");
        StringJoiner bracketed = new StringJoiner(",","[","]");
        StringJoiner parenthesized = new StringJoiner("-","(",")");
        for (String part : parts) {
            bracketed.add(part);
        }
        System.out.println(bracketed.toString());
        for (String part : parts) {
            parenthesized.add(part);
        }
        System.out.println(parenthesized.toString());
        // merge adds the other joiner's content (without its prefix and suffix) as one more element.
        StringJoiner merged = bracketed.merge(parenthesized);
        System.out.println(merged.toString());
    }
    [1,2,3]
    (1-2-3)
    [1,2,3,1-2-3]
    

    Collectors.joining()

    /**
     * Returns a collector that concatenates the input elements, separated by the
     * delimiter and wrapped in the prefix and suffix, using a {@code StringJoiner}.
     *
     * @param delimiter text placed between adjacent elements
     * @param prefix    text placed before the first element
     * @param suffix    text placed after the last element
     * @return a collector producing the joined {@code String}
     */
    public static Collector<CharSequence, ?, String> joining(CharSequence delimiter,
                                                             CharSequence prefix,
                                                             CharSequence suffix) {
        // Container: a fresh StringJoiner configured with the three pieces of punctuation.
        Supplier<StringJoiner> joinerFactory = () -> new StringJoiner(delimiter, prefix, suffix);
        BiConsumer<StringJoiner, CharSequence> appendElement = StringJoiner::add;
        BinaryOperator<StringJoiner> mergeJoiners = StringJoiner::merge;
        Function<StringJoiner, String> render = StringJoiner::toString;
        return new CollectorImpl<>(joinerFactory, appendElement, mergeJoiners, render, CH_NOID);
    }
    

    看这里的CollectorImpl实现应该也能想到实现。

    reducing

    /**
     * Returns a collector that reduces its input elements with the given operator and
     * wraps the outcome in an {@code Optional}.
     *
     * @param op combines the running value with the next element
     * @return a collector producing an {@code Optional<T>} of the reduced value
     */
    public static <T> Collector<T, ?, Optional<T>> reducing(BinaryOperator<T> op) {
        // Mutable holder for the running value; 'present' records whether any element was seen.
        class OptionalBox implements Consumer<T> {
            T value = null;
            boolean present = false;

            @Override
            public void accept(T t) {
                // The first element is stored as-is; later ones are folded in with op.
                value = present ? op.apply(value, t) : t;
                present = true;
            }
        }

        return new CollectorImpl<T, OptionalBox, Optional<T>>(
            OptionalBox::new,
            OptionalBox::accept,
            (left, right) -> {
                // A box that never received an element contributes nothing.
                if (!right.present) {
                    return left;
                }
                left.accept(right.value);
                return left;
            },
            box -> Optional.ofNullable(box.value),
            CH_NOID);
    }
    
    1. Supplier : 方法内部类OptionalBox,负责调用传入的BinaryOperator以及收集最后的结果
    2. BiConsumer:OptionalBox的accept方法
    3. BinaryOperator:两个OptionalBox调用accept方法
    4. finisher:最后结果进行封装,这里封装为Optional

    参考以下伪代码:

    result = new OptionalBox()
    foreach(cursor : list){
    	//第一个元素直接赋值,之后的元素才调用BinaryOperator
    	result.value = result.present ? BinaryOperator.apply(result.value,cursor) : cursor
    	result.present = true
    }
    //忽略并行流
    return Optional.ofNullable(result.value)
    

    示例

    /** Reduces the sample people to one Person named "-1" whose age is the sum of all ages, and prints it. */
    @Test
    public void testReduce() {
        Collector<Person, ?, Optional<Person>> sumOfAges = Collectors.reducing(
            (Person left, Person right) -> new Person("-1", left.getAge() + right.getAge()));
        Optional<Person> sumAge = getData().stream().collect(sumOfAges);
        if (sumAge.isPresent()) {
            System.out.println(sumAge.get());
        }
    }
    Person{name='-1', age=194}
    

    注意:BinaryOperator接口:继承BiFunction

    这里要求传入参数和返回结果都是相同类型,所以上面要专门new一个Person对象。

    所以在运行过程需要创建大量的对象,会影响性能

    /** A function of two arguments: takes a {@code T} and a {@code U} and returns an {@code R}. */
    public interface BiFunction<T, U, R> {
     // Applies this function to the two arguments and returns the result.
     R apply(T t, U u);
    }
    

    maxBy

    基于reducing

    /**
     * Returns a collector that yields the maximal element according to the comparator.
     * Built on top of {@code reducing}.
     *
     * @param comparator ordering used to compare elements
     * @return a collector producing an {@code Optional<T>} of the maximum
     */
    public static <T> Collector<T, ?, Optional<T>>  maxBy(Comparator<? super T> comparator) {
        BinaryOperator<T> keepGreater = BinaryOperator.maxBy(comparator);
        return reducing(keepGreater);
    }
    /** Finds a person with the greatest age among the sample data and prints it if one exists. */
    @Test
    public void testMaxBy() {
        Collector<Person, ?, Optional<Person>> oldestCollector =
            Collectors.maxBy(Comparator.comparing(Person::getAge));
        Optional<Person> oldest = getData().stream().collect(oldestCollector);
        if (oldest.isPresent()) {
            System.out.println(oldest.get());
        }
    }
    

    averagingInt

    /**
     * Returns a collector that computes the arithmetic mean of an int-valued function
     * applied to the input elements; the result is 0.0 when there are no elements.
     *
     * @param mapper extracts the int value to average
     * @return a collector producing the mean as a {@code Double}
     */
    public static <T> Collector<T, ?, Double> averagingInt(ToIntFunction<? super T> mapper) {
        // State layout: slot 0 holds the running sum, slot 1 the element count.
        Supplier<long[]> newState = () -> new long[2];
        BiConsumer<long[], T> addElement = (state, element) -> {
            state[0] += mapper.applyAsInt(element);
            state[1]++;
        };
        BinaryOperator<long[]> mergeStates = (left, right) -> {
            left[0] += right[0];
            left[1] += right[1];
            return left;
        };
        Function<long[], Double> toAverage = state -> {
            if (state[1] == 0) {
                return 0.0d;
            }
            return (double) state[0] / state[1];
        };
        return new CollectorImpl<>(newState, addElement, mergeStates, toAverage, CH_NOID);
    }
    
    1. Supplier :new long[2]
    2. BiConsumer:long[0]累加求和,long[1]计数
    3. BinaryOperator:两个long[2]数组对应位置的元素相加
    4. finisher:返回总和/计数

    partitioningBy

    /**
     * Splits the input elements into two lists according to the predicate.
     * Delegates to the two-argument overload with {@code toList()} as the downstream collector.
     *
     * @param predicate decides whether an element goes under {@code true} or {@code false}
     * @return a collector producing a map from Boolean to the list of matching elements
     */
    public static <T>
        Collector<T, ?, Map<Boolean, List<T>>> partitioningBy(Predicate<? super T> predicate) {
        Collector<T, ?, List<T>> listDownstream = toList();
        return partitioningBy(predicate, listDownstream);
    }
        /**
         * Splits the input elements by the predicate and reduces each of the two groups
         * with the downstream collector.
         *
         * @param predicate  decides whether an element goes to the true or the false group
         * @param downstream collector applied to each group
         * @return a collector producing a map from Boolean to the downstream result
         */
        public static <T, D, A>
        Collector<T, ?, Map<Boolean, D>> partitioningBy(Predicate<? super T> predicate,
                                                        Collector<? super T, A, D> downstream) {
            BiConsumer<A, ? super T> downstreamAccumulator = downstream.accumulator();
            // Depending on the Predicate result, add the element to the true or the false container
            BiConsumer<Partition<A>, T> accumulator = (result, t) ->
                    downstreamAccumulator.accept(predicate.test(t) ? result.forTrue : result.forFalse, t);
            BinaryOperator<A> op = downstream.combiner();
            // Combine the true container and the false container of one Partition with
            // the corresponding container of the other Partition
            BinaryOperator<Partition<A>> merger = (left, right) ->
                    new Partition<>(op.apply(left.forTrue, right.forTrue),
                                    op.apply(left.forFalse, right.forFalse));
            // The Supplier creates a Partition (an internal Map subclass) holding two fresh downstream containers
            Supplier<Partition<A>> supplier = () ->
                    new Partition<>(downstream.supplier().get(),
                                    downstream.supplier().get());
            // With IDENTITY_FINISH no finisher is passed; otherwise the downstream finisher
            // is applied to both containers and the results are wrapped in a new Partition.
            if (downstream.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)) {
                return new CollectorImpl<>(supplier, accumulator, merger, CH_ID);
            }
            else {
                Function<Partition<A>, Map<Boolean, D>> finisher = par ->
                        new Partition<>(downstream.finisher().apply(par.forTrue),
                                        downstream.finisher().apply(par.forFalse));
                return new CollectorImpl<>(supplier, accumulator, merger, finisher, CH_NOID);
            }
        }
    
    1. Supplier:内部私有的Map子类Partition,里面保存两个容器对象。默认情况下是两个List,分别存储true和false的结果

      /**
       * Map with exactly two entries, keyed {@code false} and {@code true}, used as the
       * container of {@code partitioningBy}.
       *
       * @param <T> type of the two values (a List by default)
       */
      private static final class Partition<T>
              extends AbstractMap<Boolean, T>
              implements Map<Boolean, T> {
          /** Value stored under the key {@code true}. */
          final T forTrue;
          /** Value stored under the key {@code false}. */
          final T forFalse;

          Partition(T forTrue, T forFalse) {
              this.forTrue = forTrue;
              this.forFalse = forFalse;
          }

          /** Returns a two-element set view that iterates the false entry first, then the true entry. */
          @Override
          public Set<Map.Entry<Boolean, T>> entrySet() {
              return new AbstractSet<>() {
                  @Override
                  public int size() {
                      return 2;
                  }

                  @Override
                  public Iterator<Map.Entry<Boolean, T>> iterator() {
                      List<Map.Entry<Boolean, T>> bothEntries = List.of(
                              new SimpleImmutableEntry<>(false, forFalse),
                              new SimpleImmutableEntry<>(true, forTrue));
                      return bothEntries.iterator();
                  }
              };
          }
      }
      
    2. BiConsumer:根据Predicate结果,加入Partition中true还是false的容器对象中,默认是list

    3. BinaryOperator:两个Partition的true和false集合分别相加

    示例

    /** Partitions the sample people into "younger than 25" and the rest, then prints both groups. */
    @Test
    public void testPartition (){
        Collector<Person, ?, Map<Boolean, List<Person>>> youngerThan25 =
            Collectors.partitioningBy((Person person) -> person.getAge() < 25);
        Map<Boolean, List<Person>> partitions = getData().stream().collect(youngerThan25);
        for (Map.Entry<Boolean, List<Person>> partition : partitions.entrySet()) {
            System.out.println(partition.getKey() + "=" + partition.getValue());
        }
    }
    false=[Person{name='4', age=26}, Person{name='5', age=28}, Person{name='7', age=28}, Person{name='8', age=26}]
    true=[Person{name='1', age=20}, Person{name='2', age=22}, Person{name='3', age=24}, Person{name='6', age=20}]
    

    mapping

    /**
     * Adapts a collector of {@code U} into a collector of {@code T} by applying the
     * mapper to every element before it is accumulated.
     *
     * @param mapper     converts an input element into the type the downstream accepts
     * @param downstream collector that receives the mapped values
     * @return a collector that maps and then delegates to {@code downstream}
     */
    public static <T, U, A, R>Collector<T, ?, R> mapping(Function<? super T, ? extends U> mapper,
                                   Collector<? super U, A, R> downstream) {
        final BiConsumer<A, ? super U> sink = downstream.accumulator();
        // Only the accumulator is wrapped; supplier, combiner, finisher and
        // characteristics are taken from the downstream collector unchanged.
        BiConsumer<A, T> mappingAccumulator = (container, element) -> {
            U mapped = mapper.apply(element);
            sink.accept(container, mapped);
        };
        return new CollectorImpl<>(downstream.supplier(),
                                   mappingAccumulator,
                                   downstream.combiner(),
                                   downstream.finisher(),
                                   downstream.characteristics());
    }
    

    相对于Collectors.toList(),在添加元素之前,会调用Function<? super T, ? extends U> 映射到相应的结果,再添加到容器中

    /** Maps every sample person to their age, collects the ages into a list and prints each one. */
    @Test
    public void test (){
        Collector<Person, ?, List<Integer>> agesCollector =
            Collectors.mapping(Person::getAge, Collectors.toList());
        List<Integer> ages = getData().stream().collect(agesCollector);
        for (Integer age : ages) {
            System.out.println(age);
        }
    }
    
  • 相关阅读:
    hdu2049
    hdu2047
    hdu2568
    hdu2570
    推流摄像头推RTMP视频流至EasyDSS视频直播点播平台Chrome浏览器无法播放如何解决?
    RTMP推流协议视频智能分析/人脸识别/直播点播平台EasyDSS接口调用注意事项介绍
    【解决方案】人脸识别/智能分析视频安防服务平台EasyCVR如何打造智慧人社局培训办事机构远程监控系统?
    【解决方案】RTMP推流协议视频智能分析/直播点播/人脸识别平台EasyDSS打造智能多媒体展厅解决方案
    【解决方案】视频智能分析/人脸识别平台EasyDSS实现景区智慧旅游体系,VR+大数据打造风景区实时视频
    RTMP推流协议视频智能分析平台EasyDSS直播点播系统新版本无法完成推流以及录像回看排查过程
  • 原文地址:https://www.cnblogs.com/froggengo/p/14669850.html
Copyright © 2011-2022 走看看