zoukankan      html  css  js  c++  java
  • flink Reduce、GroupReduce、GroupCombine笔记

    1、reduce操作,在分组的dataset上使用,也可以在不分组的dataset上使用

    应用于分组DataSet的Reduce转换使用用户定义的reduce函数将每个组减少为单个元素。对于每组输入元素,reduce函数连续地将元素对组合成一个元素,直到每个组只剩下一个元素。
    
    注意,对于ReduceFunction,返回对象的key字段应与输入值匹配。这是因为reduce是可隐式组合(combine)的,并且从combine运算符发出的对象在传递给reduce运算符时再次按key分组。

    1.1 使用key表达式的dataset进行reduce

    key表达式指定DataSet的每个元素的一个或多个字段。每个key表达式都是公共字段的名称或getter方法。用点被用于向下钻取对象。key表达式“*”选择所有字段。以下代码显示如何使用key表达式对POJO DataSet进行分组,并使用reduce函数对其进行规约。
    
    
    
    // some ordinary POJO
    public class WC {
      public String word;
      public int count;
      // [...]
    }
    
    // ReduceFunction that sums Integer attributes of a POJO
    public class WordCounter implements ReduceFunction<WC> {
      @Override
      public WC reduce(WC in1, WC in2) {
        return new WC(in1.word, in1.count + in2.count);
      }
    }
    
    // [...]
    DataSet<WC> words = // [...]
    DataSet<WC> wordCounts = words
                             // DataSet grouping on field "word"
                             .groupBy("word")
                             // apply ReduceFunction on grouped DataSet
                             .reduce(new WordCounter());

    1.2 使用KeySelector函数的dataset上进行reduce

    key选择器函数从DataSet的每个元素中提取键值。提取的key用于对DataSet进行分组。以下代码显示如何使用键选择器函数对POJO DataSet进行分组,并使用reduce函数对其进行规约操作。
    
    
    // some ordinary POJO
    public class WC {
      public String word;
      public int count;
      // [...]
    }
    
    // ReduceFunction that sums Integer attributes of a POJO
    public class WordCounter implements ReduceFunction<WC> {
      @Override
      public WC reduce(WC in1, WC in2) {
        return new WC(in1.word, in1.count + in2.count);
      }
    }
    
    // [...]
    DataSet<WC> words = // [...]
    DataSet<WC> wordCounts = words
                             // DataSet grouping on field "word"
                             .groupBy(new SelectWord())
                             // apply ReduceFunction on grouped DataSet
                             .reduce(new WordCounter());
    
    public class SelectWord implements KeySelector<WC, String> {
      @Override
      public String getKey(Word w) {
        return w.word;
      }
    }

    1.3 在Tuple元组上应用的reduce,可以使用数字来指明字段位置,类似索引

    字段位置键指定一个或多个字段用于分组
    
    
    DataSet<Tuple3<String, Integer, Double>> tuples = // [...]
    DataSet<Tuple3<String, Integer, Double>> reducedTuples = tuples
                                             // group DataSet on first and second field of Tuple
                                             .groupBy(0, 1)
                                             // apply ReduceFunction on grouped DataSet
                                             .reduce(new MyTupleReducer());

    1.4 在整个数据集上应用reduce

    Reduce转换可以将用户定义的reduce函数应用于DataSet的所有元素。 reduce函数随后将元素对组合成一个元素,直到只剩下一个元素。
    
    使用Reduce转换规约完整的DataSet意味着最终的Reduce操作不能并行完成。但是,reduce函数可以自动组合,因此Reduce转换不会限制大多数用例的可伸缩性
    
    以下代码显示如何对Integer DataSet的所有元素求和:
    
    
    // ReduceFunction that sums Integers
    public class IntSummer implements ReduceFunction<Integer> {
      @Override
      public Integer reduce(Integer num1, Integer num2) {
        return num1 + num2;
      }
    }
    
    // [...]
    DataSet<Integer> intNumbers = // [...]
    DataSet<Integer> sum = intNumbers.reduce(new IntSummer());

    2、分组reduce,即GroupReduce

    应用于分组DataSet的GroupReduce调用用户定义的group-reduce函数转换每个分组。
    这与Reduce的区别在于用户定义的函数会立即获得整个组。在组的所有元素上使用Iterable调用该函数,并且可以返回任意数量的结果元素。

    2.1 GroupReduce对于分组的键于redeuce相同

    以下代码显示如何从Integer分组的DataSet中删除重复的字符串。
    
    
    public class DistinctReduce
             implements GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
    
      @Override
      public void reduce(Iterable<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) {
    
        Set<String> uniqStrings = new HashSet<String>();
        Integer key = null;
    
        // add all strings of the group to the set
        for (Tuple2<Integer, String> t : in) {
          key = t.f0;
          uniqStrings.add(t.f1);
        }
    
        // emit all unique strings.
        for (String s : uniqStrings) {
          out.collect(new Tuple2<Integer, String>(key, s));
        }
      }
    }
    
    // [...]
    DataSet<Tuple2<Integer, String>> input = // [...]
    DataSet<Tuple2<Integer, String>> output = input
                               .groupBy(0)            // group DataSet by the first tuple field
                               .reduceGroup(new DistinctReduce());  // apply GroupReduceFunction

    2.2 将GroupReduce应用于排序分组的数据集

    group-reduce函数使用Iterable访问组的元素。Iterable可以按指定的顺序分发组的元素(可选)。在许多情况下,这可以帮助降低用户定义的组减少功能的复杂性并提高其效率。
    
    下面的代码显示了如何删除由Integer分组并按String排序的DataSet中的重复字符串的另一个示例。
    
    
    // GroupReduceFunction that removes consecutive identical elements
    public class DistinctReduce
             implements GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
    
      @Override
      public void reduce(Iterable<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) {
        Integer key = null;
        String comp = null;
    
        for (Tuple2<Integer, String> t : in) {
          key = t.f0;
          String next = t.f1;
    
          // check if strings are different
          if (com == null || !next.equals(comp)) {
            out.collect(new Tuple2<Integer, String>(key, next));
            comp = next;
          }
        }
      }
    }
    
    // [...]
    DataSet<Tuple2<Integer, String>> input = // [...]
    DataSet<Double> output = input
                             .groupBy(0)                         // group DataSet by first field
                             .sortGroup(1, Order.ASCENDING)      // sort groups on second tuple field
                             .reduceGroup(new DistinctReduce());

    3、可组合的GroupReduce功能

    与reduce函数相比,group-reduce函数不是可隐式组合的。为了使group-reduce函数可组合,它必须实现GroupCombineFunction接口。
    
    要点:GroupCombineFunction接口的通用输入和输出类型必须等于GroupReduceFunction的通用输入类型,如以下示例所示:
    
    
    // Combinable GroupReduceFunction that computes a sum.
    public class MyCombinableGroupReducer implements
      GroupReduceFunction<Tuple2<String, Integer>, String>,
      GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
    {
      @Override
      public void reduce(Iterable<Tuple2<String, Integer>> in,
                         Collector<String> out) {
    
        String key = null;
        int sum = 0;
    
        for (Tuple2<String, Integer> curr : in) {
          key = curr.f0;
          sum += curr.f1;
        }
        // concat key and sum and emit
        out.collect(key + "-" + sum);
      }
    
      @Override
      public void combine(Iterable<Tuple2<String, Integer>> in,
                          Collector<Tuple2<String, Integer>> out) {
        String key = null;
        int sum = 0;
    
        for (Tuple2<String, Integer> curr : in) {
          key = curr.f0;
          sum += curr.f1;
        }
        // emit tuple with key and sum
        out.collect(new Tuple2<>(key, sum));
      }
    }

    4、GroupCombine 分组连接

    GroupCombine转换是可组合GroupReduceFunction中组合步骤的通用形式。它在某种意义上被概括为允许将输入类型I组合到任意输出类型O.
    相反,GroupReduce中的组合步骤仅允许从输入类型I到输出类型I的组合。这是因为reduce步骤中,GroupReduceFunction期望输入类型为I. 在一些应用中,期望在执行附加变换(例如,减小数据大小)之前将DataSet组合成中间格式。这可以通过CombineGroup转换能以非常低的成本实现。 注意:分组数据集上的GroupCombine在内存中使用贪婪策略执行,该策略可能不会一次处理所有数据,而是以多个步骤处理。
    它也可以在各个分区上执行,而无需像GroupReduce转换那样进行数据交换。这可能会导致输出的是部分结果,
    所以GroupCombine是不能替代GroupReduce操作的,尽管它们的操作内容可能看起来都一样。 以下示例演示了如何将CombineGroup转换用于备用WordCount实现。

    DataSet<String> input = [..] // The words received as input
    
    DataSet<Tuple2<String, Integer>> combinedWords = input
      .groupBy(0) // group identical words
      .combineGroup(new GroupCombineFunction<String, Tuple2<String, Integer>() {
    
        public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>>) { // combine
            String key = null;
            int count = 0;
    
            for (String word : words) {
                key = word;
                count++;
            }
            // emit tuple with word and count
            out.collect(new Tuple2(key, count));
        }
    });
    
    DataSet<Tuple2<String, Integer>> output = combinedWords
      .groupBy(0)                              // group by words again
      .reduceGroup(new GroupReduceFunction() { // group reduce with full data exchange
    
        public void reduce(Iterable<Tuple2<String, Integer>>, Collector<Tuple2<String, Integer>>) {
            String key = null;
            int count = 0;
    
            for (Tuple2<String, Integer> word : words) {
                key = word;
                count++;
            }
            // emit tuple with word and count
            out.collect(new Tuple2(key, count));
        }
    });
     
  • 相关阅读:
    抓取国家地区基础数据
    h264
    h.264
    vlc 推送rtsp视频流不能播放
    花生壳tcp内网端口映射
    make 安装时指定目录
    ubuntu安装vsftpd
    vsftpd命令
    vim删除某一列
    linux打开防火墙
  • 原文地址:https://www.cnblogs.com/asker009/p/11111546.html
Copyright © 2011-2022 走看看