zoukankan      html  css  js  c++  java
  • [源码解析] GroupReduce,GroupCombine 和 Flink SQL group by

    [源码解析] GroupReduce,GroupCombine和Flink SQL group by

    0x00 摘要

    本文从源码和实例入手,为大家解析 Flink 中 GroupReduce 和 GroupCombine 的用途。也涉及到了 Flink SQL group by 的内部实现。

    0x01 缘由

    在前文[源码解析] Flink的Groupby和reduce究竟做了什么中,我们剖析了Group和reduce都做了些什么,也对combine有了一些了解。但是总感觉意犹未尽,因为在Flink还提出了若干新算子,比如GroupReduce和GroupCombine。这几个算子不搞定,总觉得如鲠在喉,但没有找到一个良好的例子来进行剖析说明。

    本文是笔者在探究Flink SQL UDF问题的一个副产品。起初是为了调试一段sql代码,结果发现Flink本身给出了一个GroupReduce和GroupCombine使用的完美例子。于是就拿出来和大家共享,一起分析看看究竟如何使用这两个算子。

    请注意:这个例子是Flink SQL,所以本文中将涉及Flink SQL goup by内部实现的知识。

    0x02 概念

    Flink官方对于这两个算子的使用说明如下:

    2.1 GroupReduce

    GroupReduce算子应用在一个已经分组了的DataSet上,其会对每个分组都调用到用户定义的group-reduce函数。它与Reduce的区别在于用户定义的函数会立即获得整个组。

    Flink将在组的所有元素上使用Iterable调用用户自定义函数,并且可以返回任意数量的结果元素。

    2.2 GroupCombine

    GroupCombine转换是可组合GroupReduceFunction中组合步骤的通用形式。它在某种意义上被概括为允许将输入类型 I 组合到任意输出类型O。与此相对的是,GroupReduce中的组合步骤仅允许从输入类型 I 到输出类型 I 的组合。这是因为GroupReduceFunction的 "reduce步骤" 期望自己的输入类型为 I。

    在一些应用中,我们期望在执行附加变换(例如,减小数据大小)之前将DataSet组合成中间格式。这可以通过CombineGroup转换能以非常低的成本实现。

    注意:分组数据集上的GroupCombine在内存中使用贪婪策略执行,该策略可能不会一次处理所有数据,而是以多个步骤处理。它也可以在各个分区上执行,而无需像GroupReduce转换那样进行数据交换。这可能会导致输出的是部分结果,所以GroupCombine是不能替代GroupReduce操作的,尽管它们的操作内容可能看起来都一样。

    2.3 例子

    是不是有点晕?还是直接让代码来说话吧。以下官方示例演示了如何将CombineGroup和GroupReduce转换用于WordCount实现。即通过combine操作先对单词数目进行初步排序,然后通过reduceGroup对combine产生的结果进行最终排序。因为combine进行了初步排序,所以在算子之间传输的数据量就少多了

    DataSet<String> input = [..] // The words received as input
    
    // 这里通过combine操作先对单词数目进行初步排序,其优势在于用户定义的combine函数只调用一次,因为runtime已经把输入数据一次性都提供给了自定义函数。  
    DataSet<Tuple2<String, Integer>> combinedWords = input
      .groupBy(0) // group identical words
      .combineGroup(new GroupCombineFunction<String, Tuple2<String, Integer>() {
    
        public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>>) { // combine
            String key = null;
            int count = 0;
    
            for (String word : words) {
                key = word;
                count++;
            }
            // emit tuple with word and count
            out.collect(new Tuple2(key, count));
        }
    });
    
    // 这里对combine的结果进行第二次排序,其优势在于用户定义的reduce函数只调用一次,因为runtime已经把输入数据一次性都提供给了自定义函数。  
    DataSet<Tuple2<String, Integer>> output = combinedWords
      .groupBy(0)                              // group by words again
      .reduceGroup(new GroupReduceFunction() { // group reduce with full data exchange
    
        public void reduce(Iterable<Tuple2<String, Integer>>, Collector<Tuple2<String, Integer>>) {
            String key = null;
            int count = 0;
    
            for (Tuple2<String, Integer> word : words) {
                key = word;
                count++;
            }
            // emit tuple with word and count
            out.collect(new Tuple2(key, count));
        }
    });
    

    看到这里,有的兄弟已经明白了,这和mapPartition很类似啊,都是runtime做了大量工作。为了让大家这两个算子的使用情形有深刻的认识,我们再通过一个sql的例子,向大家展示Flink内部是怎么应用这两个算子的,也能看出来他们的强大之处

    0x03 代码

    下面代码主要参考自 flink 使用问题汇总。我们可以看到这里通过groupby进行了聚合操作。其中collect方法,类似于mysql的group_concat。

    public class UdfExample {
        public static class MapToString extends ScalarFunction {
    
            public String eval(Map<String, Integer> map) {
                if(map==null || map.size()==0) {
                    return "";
                }
                StringBuffer sb=new StringBuffer();
                for(Map.Entry<String, Integer> entity : map.entrySet()) {
                    sb.append(entity.getKey()+",");
                }
                String result=sb.toString();
                return result.substring(0, result.length()-1);
            }
        }
    
        public static void main(String[] args) throws Exception {
            MemSourceBatchOp src = new MemSourceBatchOp(new Object[][]{
                    new Object[]{"1", "a", 1L},
                    new Object[]{"2", "b33", 2L},
                    new Object[]{"2", "CCC", 2L},
                    new Object[]{"2", "xyz", 2L},
                    new Object[]{"1", "u", 1L}
            }, new String[]{"f0", "f1", "f2"});
    
            BatchTableEnvironment environment = MLEnvironmentFactory.getDefault().getBatchTableEnvironment();
            Table table = environment.fromDataSet(src.getDataSet());
            environment.registerTable("myTable", table);
            BatchOperator.registerFunction("MapToString",  new MapToString());
            BatchOperator.sqlQuery("select f0, mapToString(collect(f1)) as type from myTable group by f0").print();
        }
    }
    

    程序输出是

    f0|type
    --|----
    1|a,u
    2|CCC,b33,xyz
    

    这个SQL语句的重点是group by。这个是程序猿经常使用的操作。但是大家有没有想过这个group by在真实运行起来时候是怎么操作的呢?针对大数据环境有没有做了什么优化呢?其实,Flink正是使用了GroupReduce和GroupCombine来实现并且优化了group by的功能。优化之处在于:

    • GroupReduce和GroupCombine的函数调用次数要远低于正常的reduce算子,如果reduce操作中涉及到频繁创建额外的对象或者外部资源操作,则会相当省时间。
    • 因为combine进行了初步排序,所以在算子之间传输的数据量就少多了。

    SQL生成Flink的过程十分错综复杂,所以我们只能找最关键处。其是在 DataSetAggregate.translateToPlan 完成的。我们可以看到,对于SQL语句 “select f0, mapToString(collect(f1)) as type from myTable group by f0”,Flink系统把它翻译成如下阶段,即

    • pre-aggregation :排序 + combine;
    • final aggregation :排序 + reduce;

    从之前的文章我们可以知道,groupBy这个其实不是一个算子,它只是排序过程中的一个辅助步骤而已,所以我们重点还是要看combineGroup和reduceGroup。这恰恰是我们想要的完美例子。

    input ----> (groupBy + combineGroup) ----> (groupBy + reduceGroup) ----> output
    

    SQL生成的Scala代码如下,其中 combineGroup在后续中将生成GroupCombineOperator,reduceGroup将生成GroupReduceOperator。

      override def translateToPlan(
          tableEnv: BatchTableEnvImpl,
          queryConfig: BatchQueryConfig): DataSet[Row] = {
    
        if (grouping.length > 0) {
          // grouped aggregation
          ...... 
          if (preAgg.isDefined) { // 我们的例子是在这里
            inputDS          
              // pre-aggregation
              .groupBy(grouping: _*)
              .combineGroup(preAgg.get) // 将生成GroupCombineOperator算子
              .returns(preAggType.get)
              .name(aggOpName)
              // final aggregation
              .groupBy(grouping.indices: _*) //将生成GroupReduceOperator算子。
              .reduceGroup(finalAgg.right.get)
              .returns(rowTypeInfo)
              .name(aggOpName)
          } else {
            ......
          }
        }
        else {
          ......
        }
      }
    }
    
    // 程序变量打印如下
    this = {DataSetAggregate@5207} "Aggregate(groupBy: (f0), select: (f0, COLLECT(f1) AS $f1))"
     cluster = {RelOptCluster@5220} 
    

    0x05 JobGraph

    LocalExecutor.execute中会生成JobGraph。JobGraph是提交给 JobManager 的数据结构,是唯一被Flink的数据流引擎所识别的表述作业的数据结构,也正是这一共同的抽象体现了流处理和批处理在运行时的统一。

    在生成JobGraph时候,系统得到如下JobVertex。

    jobGraph = {JobGraph@5652} "JobGraph(jobId: 6aae8b5e5ad32f588136bef26f8b65f6)"
     taskVertices = {LinkedHashMap@5655}  size = 4
    
    {JobVertexID@5677} "c625209bb7fb9a098807551840aeaa99" -> {InputOutputFormatVertex@5678} "CHAIN DataSource (at initializeDataSource(MemSourceBatchOp.java:98) (org.apache.flink.api.java.io.CollectionInputFormat)) -> FlatMap (select: (f0, f1)) (org.apache.flink.runtime.operators.DataSourceTask)"
    
    {JobVertexID@5679} "b56ace4acd7a2f69ea110a9f262ff80a" -> {JobVertex@5680} "CHAIN GroupReduce (groupBy: (f0), select: (f0, COLLECT(f1) AS $f1)) -> FlatMap (select: (f0, mapToString($f1) AS type)) -> Map (Map at linkFrom(MapBatchOp.java:35)) (org.apache.flink.runtime.operators.BatchTask)"
     
    {JobVertexID@5681} "3f5e2a0f700421d80ce85e02a6d9db73" -> {InputOutputFormatVertex@5682} "DataSink (collect()) (org.apache.flink.runtime.operators.DataSinkTask)"
     
    {JobVertexID@5683} "ad29dc5b2e0a39ad2cd1d164b6f859f7" -> {JobVertex@5684} "GroupCombine (groupBy: (f0), select: (f0, COLLECT(f1) AS $f1)) (org.apache.flink.runtime.operators.BatchTask)"
    

    我们可以看到,在JobGraph中就生成了对应的两个算子。其中这里的FlatMap就是用户的UDF函数MapToString的映射生成。

    GroupCombine (groupBy: (f0), select: (f0, COLLECT(f1) AS $f1))  
      
    CHAIN GroupReduce (groupBy: (f0), select: (f0, COLLECT(f1) AS $f1)) -> FlatMap (select: (f0, mapToString($f1) AS type)) -> Map 
    

    0x06 Runtime

    最后,让我们看看runtime会如何处理这两个算子。

    6.1 ChainedFlatMapDriver

    首先,Flink会在ChainedFlatMapDriver.collect中对record进行处理,这是从Table中提取数据所必须经历的,与后续的group by关系不大。

    @Override
    public void collect(IT record) {
       try {
          this.numRecordsIn.inc();
          this.mapper.flatMap(record, this.outputCollector);
       } catch (Exception ex) {
          throw new ExceptionInChainedStubException(this.taskName, ex);
       }
    }
    
    // 这里能够看出来,我们获取了第一列记录
    record = {Row@9317} "1,a,1"
     fields = {Object[3]@9330} 
    this.taskName = "FlatMap (select: (f0, f1))"
    
    // 程序堆栈打印如下
    collect:80, ChainedFlatMapDriver (org.apache.flink.runtime.operators.chaining)
    collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
    invoke:196, DataSourceTask (org.apache.flink.runtime.operators)
    doRun:707, Task (org.apache.flink.runtime.taskmanager)
    run:532, Task (org.apache.flink.runtime.taskmanager)
    run:748, Thread (java.lang)
    

    6.2 GroupReduceCombineDriver

    其次,GroupReduceCombineDriver.run()中会进行combine操作。

    1. 会通过this.sorter.write(value)把数据写到排序缓冲区。
    2. 会通过sortAndCombineAndRetryWrite(value)进行实际的排序,合并。

    因为是系统实现,所以Combine的用户自定义函数就是由Table API提供的,比如org.apache.flink.table.functions.aggfunctions.CollectAccumulator.accumulate

    @Override
    public void run() throws Exception {
       final MutableObjectIterator<IN> in = this.taskContext.getInput(0);
       final TypeSerializer<IN> serializer = this.serializer;
    
       if (objectReuseEnabled) {
        .....
       }
       else {
          IN value;
          while (running && (value = in.next()) != null) {
             // try writing to the sorter first
             if (this.sorter.write(value)) {
                continue;
             }
    
             // do the actual sorting, combining, and data writing
             sortAndCombineAndRetryWrite(value);
          }
       }
    
       // sort, combine, and send the final batch
       if (running) {
          sortAndCombine();
       }
    }
    
    // 程序变量如下
    value = {Row@9494} "1,a"
     fields = {Object[2]@9503} 
    

    sortAndCombine是具体排序/合并的过程。

    1. 排序是通过 org.apache.flink.runtime.operators.sort.QuickSort 完成的。
    2. 合并是通过 org.apache.flink.table.functions.aggfunctions.CollectAccumulator.accumulate 完成的。
    3. 给下游是由 org.apache.flink.table.runtime.aggregate.DataSetPreAggFunction.combine 调用 out.collect(output) 完成的。
    private void sortAndCombine() throws Exception {
       final InMemorySorter<IN> sorter = this.sorter;
       // 这里进行实际的排序
       this.sortAlgo.sort(sorter);
       final GroupCombineFunction<IN, OUT> combiner = this.combiner;
       final Collector<OUT> output = this.output;
    
       // iterate over key groups
       if (objectReuseEnabled) {
    			......		
       } else {
          final NonReusingKeyGroupedIterator<IN> keyIter = 
                new NonReusingKeyGroupedIterator<IN>(sorter.getIterator(), this.groupingComparator);
          // 这里是归并操作
          while (this.running && keyIter.nextKey()) {
             // combiner.combiner 是用户定义操作,runtime把某key对应的数据一次性传给它
             combiner.combine(keyIter.getValues(), output);
          }
       }
    }
    

    具体调用栈如下:

    accumulate:57, CollectAggFunction (org.apache.flink.table.functions.aggfunctions)
    accumulate:-1, DataSetAggregatePrepareMapHelper$5
    combine:71, DataSetPreAggFunction (org.apache.flink.table.runtime.aggregate)
    sortAndCombine:213, GroupReduceCombineDriver (org.apache.flink.runtime.operators)
    run:188, GroupReduceCombineDriver (org.apache.flink.runtime.operators)
    run:504, BatchTask (org.apache.flink.runtime.operators)
    invoke:369, BatchTask (org.apache.flink.runtime.operators)
    doRun:707, Task (org.apache.flink.runtime.taskmanager)
    run:532, Task (org.apache.flink.runtime.taskmanager)
    run:748, Thread (java.lang)
    

    6.3 GroupReduceDriver & ChainedFlatMapDriver

    这两个放在一起,是因为他们组成了Operator Chain。

    GroupReduceDriver.run中完成了reduce。具体reduce 操作是在 org.apache.flink.table.runtime.aggregate.DataSetFinalAggFunction.reduce 完成的,然后在其中直接发送给下游 out.collect(output)

    @Override
    public void run() throws Exception {
       // cache references on the stack
       final GroupReduceFunction<IT, OT> stub = this.taskContext.getStub();
     
       if (objectReuseEnabled) {
           ......	
       }
       else {
          final NonReusingKeyGroupedIterator<IT> iter = new NonReusingKeyGroupedIterator<IT>(this.input, this.comparator);
          // run stub implementation
          while (this.running && iter.nextKey()) {
             // stub.reduce 是用户定义操作,runtime把某key对应的数据一次性传给它
             stub.reduce(iter.getValues(), output);
          }
       }
    }
    

    从前文我们可以,这里已经配置成了Operator Chain,所以out.collect(output)会调用到CountingCollector。CountingCollector的成员变量collector已经配置成了ChainedFlatMapDriver。

    public void collect(OUT record) {
       this.numRecordsOut.inc();
       this.collector.collect(record);
    }
    
    this.collector = {ChainedFlatMapDriver@9643} 
     mapper = {FlatMapRunner@9610} 
     config = {TaskConfig@9655} 
     taskName = "FlatMap (select: (f0, mapToString($f1) AS type))"
    

    于是程序就调用到了 ChainedFlatMapDriver.collect

    public void collect(IT record) {
       try {
          this.numRecordsIn.inc();
          this.mapper.flatMap(record, this.outputCollector);
       } catch (Exception ex) {
          throw new ExceptionInChainedStubException(this.taskName, ex);
       }
    }
    

    最终调用栈如如下:

    eval:21, UdfExample$MapToString (com.alibaba.alink)
    flatMap:-1, DataSetCalcRule$14
    flatMap:52, FlatMapRunner (org.apache.flink.table.runtime)
    flatMap:31, FlatMapRunner (org.apache.flink.table.runtime)
    collect:80, ChainedFlatMapDriver (org.apache.flink.runtime.operators.chaining)
    collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
    reduce:80, DataSetFinalAggFunction (org.apache.flink.table.runtime.aggregate)
    run:131, GroupReduceDriver (org.apache.flink.runtime.operators)
    run:504, BatchTask (org.apache.flink.runtime.operators)
    invoke:369, BatchTask (org.apache.flink.runtime.operators)
    doRun:707, Task (org.apache.flink.runtime.taskmanager)
    run:532, Task (org.apache.flink.runtime.taskmanager)
    run:748, Thread (java.lang)
    

    0x07 总结

    由此我们可以看到:

    • GroupReduce,GroupCombine和mapPartition十分类似,都是从系统层面对算子进行优化,把循环操作放到用户自定义函数来处理。
    • 对于group by这个SQL语句,Flink将其翻译成 GroupReduce + GroupCombine,采用两阶段优化的方式来完成了对大数据下的处理。

    0x08 参考

    flink 使用问题汇总

  • 相关阅读:
    【笔记】【dp模型】
    【刷题】【dp】【背包】金明的预算
    建模结束了——5.3
    HDU
    洛谷 P2734 [USACO3.3]游戏 A Game
    洛谷 P4095 [HEOI2013]Eden 的新背包问题
    洛谷 P1156 垃圾陷阱
    洛谷 P1833 樱花
    洛谷 P3966 [TJOI2013]单词
    洛谷 P5357 【模板】AC自动机(二次加强版)
  • 原文地址:https://www.cnblogs.com/rossiXYZ/p/13148695.html
Copyright © 2011-2022 走看看