  • 从"UDF不应有状态" 切入来剖析Flink SQL代码生成

    从"UDF不应有状态" 切入来剖析Flink SQL代码生成

    0x00 Abstract

    "Flink SQL UDF不应有状态" 这个技术细节可能有些朋友已经知道了。但是为什么不应该有状态呢?这个恐怕大家就不甚清楚了。本文就带你一起从这个问题点入手,看看Flink SQL究竟是怎么处理UDF,怎么生成对应的SQL代码。

    0x01 Overview and Conclusion

    Let me state the conclusion first; the rest of the article walks through the problem step by step.

    1. The Conclusion

    The conclusion: Flink internally generates Java code for UDF invocations, and that code is optimized against the SQL. In certain situations, the optimization causes a UDF that "should be called only once in the SQL" to be called repeatedly.

    • When we write SQL, we often mention a UDF only once and assume it will also be invoked only once at runtime.
    • Flink parses the SQL internally and turns the statement into native Flink operators. You can think of this as translating the SQL into Java code and executing that; the generated code is optimized against the SQL.
    • For UDFs, Flink likewise generates internal Java code, and that code is also optimized against the SQL.
    • In this generated code, under certain conditions, Flink calls a UDF that "should be called only once in the SQL" repeatedly.
    • Flink generates the code for the "projection" and for the "filter condition" separately, and then concatenates the two. After optimization, the projection and the filter condition each contain their own UDF call, so the concatenated code contains multiple UDF calls.
    • Because a UDF written once in the SQL may be invoked several times after optimization, a UDF must not keep internal state.

    For example:

    1. The field myFrequency is produced by the UDF UDF_FRENQUENCY in this step:
    
    "SELECT word, UDF_FRENQUENCY(frequency) as myFrequency FROM TableWordCount"
    
    2. One would expect the following SQL to simply read myFrequency, since myFrequency already exists:
    
    "SELECT word, myFrequency FROM TableFrequency WHERE myFrequency <> 0"
    
    But Flink applies an optimization that pushes the UDF_FRENQUENCY computation from the first statement down into the second one.
    
    3. After optimization, the query effectively becomes:
    
    "SELECT word, UDF_FRENQUENCY(frequency) FROM tableFrequency WHERE UDF_FRENQUENCY(frequency) <> 0"
    
    4. So UDF_FRENQUENCY is executed twice: once in the WHERE clause and once more in the SELECT clause.
    

    A simplified (and re-annotated) version of the Java code Flink generates for the UDF is shown below; the two calls are clearly visible:

      // Original SQL: "SELECT word, myFrequency FROM TableFrequency WHERE myFrequency <> 0"
    
        java.lang.Long result$12 = UDF_FRENQUENCY(frequency); // this UDF call corresponds to WHERE myFrequency <> 0
        
        if (result$12 != 0) { // myFrequency <> 0 holds, so we may proceed with the SELECT
          
          // This corresponds to SELECT myFrequency. Note that ordinary logic would simply
          // reuse result$12, but the generated code calls the UDF again and recomputes the
          // value. This is exactly why a UDF must not carry state.
          java.lang.Long result$9 = UDF_FRENQUENCY(frequency);
    
          long select;
          
          if (result$9 == null) {
            select = -1L;
          }
          else {
            select = result$9; // the final SELECT of myFrequency
          }
        }
    

    2. Problem Flow

    This is, in essence, the flow by which Flink generates code for the SQL. The important stages along the way are sketched below:

    For the full SQL pipeline, see my earlier article: [源码分析] 带你梳理 Flink SQL / Table API内部执行流程 ("Source code analysis: the internal execution flow of Flink SQL / Table API").

    // NOTE: execution order is top to bottom; " -----> " marks the type of instance produced
    * 
    *        +-----> "SELECT xxxxx WHERE UDF_FRENQUENCY(frequency) <> 0" (SQL statement)
    *        |    
    *        |     
    *        +-----> LogicalFilter (RelNode) // Abstract Syntax Tree, unoptimized RelNode   
    *        |      
    *        |     
    *    FilterToCalcRule (RelOptRule) // Calcite optimization rule     
    *        | 
    *        |   
    *        +-----> LogicalCalc (RelNode)  // Optimized Logical Plan   
    *        |  
    *        |    
    *    DataSetCalcRule (RelOptRule) // Flink-specific optimization rule, produces the physical plan
    *        |       
    *        |   
    *        +-----> DataSetCalc (FlinkRelNode) // Physical RelNode, the physical execution plan
    *        |      
    *        |     
    *    DataSetCalc.translateToPlanInternal  // generates the Flink operator  
    *        |     
    *        |     
    *        +-----> FlatMapRunner (Operator) // runs inside a Flink task   
    *        |     
    *        |    
    

    The key points here are:

    • The SQL fragment "WHERE UDF_FRENQUENCY(frequency) <> 0" corresponds to the Calcite logical operator LogicalFilter.
    • LogicalFilter is converted into a LogicalCalc. Thinking it through, this makes sense: the Filter's condition must be computed before it can be tested, so the node has to become a Calc.
    • DataSetCalc is where the UDF's Java code gets generated; the generated class is DataSetCalcRule extends RichFlatMapFunction. Interestingly, this means Flink treats the UDF invocation as a flatMap operation.
    • Why is it a flatMap? Because the input of the operation is a database record (Record), which resembles a collection, while the output is a variable number of pieces — here, zero rows or one row per input. That is exactly the idea behind flatMap; a hand-written sketch follows below.

    For more on FlatMap, see my earlier article: [源码分析] 从FlatMap用法到Flink的内部实现 ("Source code analysis: from FlatMap usage to Flink's internals").
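
    To make the flatMap analogy concrete, here is a hand-written sketch — not Flink's generated code; the class name and the way the UDF is passed in are purely illustrative — of the Calc semantics for our query: evaluate the filter and, only if it passes, evaluate the projection and emit a single row.

    import org.apache.flink.api.common.functions.FlatMapFunction
    import org.apache.flink.types.Row
    import org.apache.flink.util.Collector
    
    // Sketch: a Calc (filter + projection) expressed as a flatMap.
    // Each input row produces either zero or one output rows.
    class CalcLikeFlatMap(udf: Long => Long) extends FlatMapFunction[Row, Row] {
      override def flatMap(in: Row, out: Collector[Row]): Unit = {
        val frequency = in.getField(1).asInstanceOf[Long]
        // WHERE UDF_FRENQUENCY(frequency) <> 0  -- first UDF call
        if (udf(frequency) != 0L) {
          val result = new Row(2)
          result.setField(0, in.getField(0)) // word
          // SELECT UDF_FRENQUENCY(frequency) AS myFrequency -- second UDF call,
          // mirroring how the generated code recomputes instead of reusing
          result.setField(1, udf(frequency))
          out.collect(result)
        }
      }
    }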

    In the rest of this article we trace the SQL generation flow to locate exactly where this "UDF called multiple times" problem is introduced.

    0x02 Example Code

    Below is our sample program; the remainder of the article walks through the code generated for it.

    1. The UDF

    import org.apache.flink.table.functions.ScalarFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    // Deliberately stateful UDF: the field 'current' survives across eval() calls.
    // This is exactly what a UDF must NOT do; it is what makes the duplicate
    // invocations visible in the log output below.
    public class myUdf extends ScalarFunction {
        private Long current = 0L;
        private static final Logger LOGGER = LoggerFactory.getLogger(myUdf.class);
        public Long eval(Long a) throws Exception {
            if(current == 0L) {
                current = a;
            } else  {
                current += 1;
            }
            LOGGER.error("The current is : " + current );
            return current;
        }
    }
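
    For contrast, a stateless counterpart could look like the sketch below (in Scala; the name SafeUdf is illustrative). Its result depends only on its argument, so it does not matter how many times the planner ends up calling it per row.

    import org.apache.flink.table.functions.ScalarFunction
    
    // Sketch: a pure function of its input. Calling it once or twice per row
    // yields the same value, so the duplicated invocation described in this
    // article is harmless for it.
    class SafeUdf extends ScalarFunction {
      def eval(a: java.lang.Long): java.lang.Long =
        if (a == null) null else java.lang.Long.valueOf(a + 1)
    }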
    

    2. Test Code

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.scala._
    
    object TestUdf {
    
      def main(args: Array[String]): Unit = {
    
        // set up execution environment
        val env = ExecutionEnvironment.getExecutionEnvironment
        val tEnv = BatchTableEnvironment.create(env)
    
        val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
    
        tEnv.registerFunction("UDF_FRENQUENCY", new myUdf())
    
    // register the DataSet as a view "TableWordCount"
        tEnv.createTemporaryView("TableWordCount", input, 'word, 'frequency)
    
        val tableFrequency = tEnv.sqlQuery("SELECT word, UDF_FRENQUENCY(frequency) as myFrequency FROM TableWordCount")
        tEnv.registerTable("TableFrequency", tableFrequency)
    
        // run a SQL query on the Table and retrieve the result as a new Table
        val table = tEnv.sqlQuery("SELECT word, myFrequency FROM TableFrequency WHERE myFrequency <> 0")
    
        table.toDataSet[WC].print()
      }
    
      case class WC(word: String, frequency: Long)
    }
    

    3. Output

    // Output below. The UDF should have been called three times (once per input row),
    // but it was called six times. Note also how the stateful counter corrupts the
    // values: every row's frequency is 1, yet successive calls return 1 through 6,
    // so the WHERE check and the SELECT of the same row even see different values.
    
    11:15:05,409 ERROR mytestpackage.myUdf                - The current is : 1
    11:15:05,409 ERROR mytestpackage.myUdf                - The current is : 2
    11:15:05,425 ERROR mytestpackage.myUdf                - The current is : 3
    11:15:05,425 ERROR mytestpackage.myUdf                - The current is : 4
    11:15:05,426 ERROR mytestpackage.myUdf                - The current is : 5
    11:15:05,426 ERROR mytestpackage.myUdf                - The current is : 6
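
    Before digging into the internals, you can observe the pushdown directly by printing the plan. A sketch against the test program above, using the legacy BatchTableEnvironment's explain:

    // Sketch: print the AST, the optimized logical plan and the physical plan
    // of the second query. In the optimized plan, UDF_FRENQUENCY shows up in
    // both the 'where' and the 'select' of the same Calc node.
    println(tEnv.explain(table))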
    

    0x03 Code Generation Flow

    1. LogicalFilter

    这里是 " myFrequency <> 0" 被转换为 LogicalFilter。具体是SqlToRelConverter函数中会将SQL语句转换为RelNode。

    具体在SqlToRelConverter (org.apache.calcite.sql2rel)完成,其打印内容摘要如下:

    filter = {LogicalFilter@4844} "LogicalFilter#2"
     variablesSet = {RegularImmutableSet@4817}  size = 0
     condition = {RexCall@4816} "<>($1, 0)"
     input = {LogicalProject@4737} "LogicalProject#1"
     desc = "LogicalFilter#2"
     rowType = null
     digest = "LogicalFilter#2"
     cluster = {RelOptCluster@4765} 
     id = 2
     traitSet = {RelTraitSet@4845}  size = 1
    
    Call stack:
    
    create:107, LogicalFilter (org.apache.calcite.rel.logical)
    createFilter:333, RelFactories$FilterFactoryImpl (org.apache.calcite.rel.core)
    convertWhere:993, SqlToRelConverter (org.apache.calcite.sql2rel)
    convertSelectImpl:649, SqlToRelConverter (org.apache.calcite.sql2rel)
    convertSelect:627, SqlToRelConverter (org.apache.calcite.sql2rel)
    convertQueryRecursive:3181, SqlToRelConverter (org.apache.calcite.sql2rel)
    convertQuery:563, SqlToRelConverter (org.apache.calcite.sql2rel)
    rel:150, FlinkPlannerImpl (org.apache.flink.table.calcite)
    rel:135, FlinkPlannerImpl (org.apache.flink.table.calcite)
    toQueryOperation:490, SqlToOperationConverter (org.apache.flink.table.sqlexec)
    convertSqlQuery:315, SqlToOperationConverter (org.apache.flink.table.sqlexec)
    convert:155, SqlToOperationConverter (org.apache.flink.table.sqlexec)
    parse:66, ParserImpl (org.apache.flink.table.planner)
    sqlQuery:457, TableEnvImpl (org.apache.flink.table.api.internal)
    main:55, TestUdf$ (mytestpackage)
    main:-1, TestUdf (mytestpackage)
    

    2. FilterToCalcRule

    Here the planner finds that the rule FilterToCalcRule matches and can rewrite the Filter.

    As noted above, the Filter's condition must be computed before it can be tested, which is why the Filter is converted into a Calc.

    The relevant source is VolcanoPlanner.findBestExp (org.apache.calcite.plan.volcano):

    call = {VolcanoRuleMatch@5576} "rule [FilterToCalcRule] rels [rel#35:LogicalFilter.NONE(input=RelSubset#34,condition=<>($1, 0))]"
     targetSet = {RelSet@5581} 
     targetSubset = null
     digest = "rule [FilterToCalcRule] rels [rel#35:LogicalFilter.NONE(input=RelSubset#34,condition=<>($1, 0))]"
     cachedImportance = 0.891
     volcanoPlanner = {VolcanoPlanner@5526} 
     generatedRelList = null
     id = 45
     operand0 = {RelOptRuleOperand@5579} 
     nodeInputs = {RegularImmutableBiMap@5530}  size = 0
     rule = {FilterToCalcRule@5575} "FilterToCalcRule"
     rels = {RelNode[1]@5582} 
     planner = {VolcanoPlanner@5526} 
     parents = null
    
    Call stack:
    
    onMatch:65, FilterToCalcRule (org.apache.calcite.rel.rules)
    onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano)
    findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano)
    run:327, Programs$RuleSetProgram (org.apache.calcite.tools)
    runVolcanoPlanner:280, Optimizer (org.apache.flink.table.plan)
    optimizeLogicalPlan:199, Optimizer (org.apache.flink.table.plan)
    optimize:56, BatchOptimizer (org.apache.flink.table.plan)
    translate:280, BatchTableEnvImpl (org.apache.flink.table.api.internal)
    toDataSet:69, BatchTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
    toDataSet:53, TableConversions (org.apache.flink.table.api.scala)
    main:57, TestUdf$ (mytestpackage)
    main:-1, TestUdf (mytestpackage)
    

    3. LogicalCalc

    Because of FilterToCalcRule above, a LogicalCalc is produced; we can see that it now contains UDF_FRENQUENCY. Reading its RexProgram string below: expr#0..1 are the input fields, expr#2 is the UDF call UDF_FRENQUENCY($t1), expr#3 is the literal 0:BIGINT, and expr#4 is the comparison <>($t2, $t3). The projection (proj#0..1) simply passes the inputs through, and $condition references expr#4. Note that the UDF appears only once in the program itself — the duplication happens later, during code generation.

    calc = {LogicalCalc@5632} "LogicalCalc#60"
     program = {RexProgram@5631} "(expr#0..1=[{inputs}], expr#2=[UDF_FRENQUENCY($t1)], expr#3=[0:BIGINT], expr#4=[<>($t2, $t3)], proj#0..1=[{exprs}], $condition=[$t4])"
     input = {RelSubset@5605} "rel#32:Subset#0.LOGICAL"
     desc = "LogicalCalc#60"
     rowType = {RelRecordType@5629} "RecordType(VARCHAR(65536) word, BIGINT frequency)"
     digest = "LogicalCalc#60"
     cluster = {RelOptCluster@5596} 
     id = 60
     traitSet = {RelTraitSet@5597}  size = 1
    

    4. DataSetCalc

    After this conversion we finally obtain the physical RelNode, i.e., the physical execution plan DataSetCalc.

    Again the relevant source is VolcanoPlanner.findBestExp (org.apache.calcite.plan.volcano).

    // The conversion entry point, the converted result, and the call stack follow
      
    ConverterRule.onMatch(RelOptRuleCall call) {
            RelNode rel = call.rel(0);
            if (rel.getTraitSet().contains(this.inTrait)) {
                RelNode converted = this.convert(rel);
                if (converted != null) {
                    call.transformTo(converted);
                }
            }
    }
    
    // The converted DataSetCalc is shown below. Note its RexProgram: the single UDF
    // expression expr#2 is referenced twice, once by the output column myFrequency=[$t2]
    // and once (via expr#4) by $condition. The program contains the call only once;
    // the code generator will expand it twice.
    
    converted = {DataSetCalc@5560} "Calc(where: (<>(UDF_FRENQUENCY(frequency), 0:BIGINT)), select: (word, UDF_FRENQUENCY(frequency) AS myFrequency))"
     cluster = {RelOptCluster@5562} 
     rowRelDataType = {RelRecordType@5565} "RecordType(VARCHAR(65536) word, BIGINT myFrequency)"
     calcProgram = {RexProgram@5566} "(expr#0..1=[{inputs}], expr#2=[UDF_FRENQUENCY($t1)], expr#3=[0:BIGINT], expr#4=[<>($t2, $t3)], word=[$t0], myFrequency=[$t2], $condition=[$t4])"
     ruleDescription = "DataSetCalcRule"
     program = {RexProgram@5566} "(expr#0..1=[{inputs}], expr#2=[UDF_FRENQUENCY($t1)], expr#3=[0:BIGINT], expr#4=[<>($t2, $t3)], word=[$t0], myFrequency=[$t2], $condition=[$t4])"
     input = {RelSubset@5564} "rel#71:Subset#5.DATASET"
     desc = "DataSetCalc#72"
     rowType = {RelRecordType@5565} "RecordType(VARCHAR(65536) word, BIGINT myFrequency)"
     digest = "DataSetCalc#72"
     AbstractRelNode.cluster = {RelOptCluster@5562} 
     id = 72
     traitSet = {RelTraitSet@5563}  size = 1
    
    Call stack:
    
    init:52, DataSetCalc (org.apache.flink.table.plan.nodes.dataset)
    convert:40, DataSetCalcRule (org.apache.flink.table.plan.rules.dataSet)
    onMatch:144, ConverterRule (org.apache.calcite.rel.convert)
    onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano)
    findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano)
    run:327, Programs$RuleSetProgram (org.apache.calcite.tools)
    runVolcanoPlanner:280, Optimizer (org.apache.flink.table.plan)
    optimizePhysicalPlan:209, Optimizer (org.apache.flink.table.plan)
    optimize:57, BatchOptimizer (org.apache.flink.table.plan)
    translate:280, BatchTableEnvImpl (org.apache.flink.table.api.internal)
    toDataSet:69, BatchTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
    toDataSet:53, TableConversions (org.apache.flink.table.api.scala)
    main:57, TestUdf$ (mytestpackage)
    main:-1, TestUdf (mytestpackage)
    

    5. generateFunction (Where the Problem Lies)

    DataSetCalc is where the Java code for the UDF is finally generated.

    class DataSetCalc {
      
      override def translateToPlan(
          tableEnv: BatchTableEnvImpl,
          queryConfig: BatchQueryConfig): DataSet[Row] = {
    
        ......
        
        // This generates the Java code for the UDF
        val genFunction = generateFunction(
          generator,
          ruleDescription,
          new RowSchema(getRowType),
          projection,
          condition,
          config,
          classOf[FlatMapFunction[Row, Row]])
    
        // This creates the FlatMapRunner
        val runner = new FlatMapRunner(genFunction.name, genFunction.code, returnType)
    
        inputDS.flatMap(runner).name(calcOpName(calcProgram, getExpressionString))
      }  
    }
    
    Call stack:
    
    translateToPlan:90, DataSetCalc (org.apache.flink.table.plan.nodes.dataset)
    translate:306, BatchTableEnvImpl (org.apache.flink.table.api.internal)
    translate:281, BatchTableEnvImpl (org.apache.flink.table.api.internal)
    toDataSet:69, BatchTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
    toDataSet:53, TableConversions (org.apache.flink.table.api.scala)
    main:57, TestUdf$ (mytestpackage)
    main:-1, TestUdf (mytestpackage)
    

    The code is actually produced in the function below; note that what gets generated is a FlatMapFunction. This is exactly where the problem of this article arises.

    // As the branches below show, Flink generates a different body depending on which SQL clauses are present
    
    trait CommonCalc {
    
      private[flink] def generateFunction[T <: Function](
          generator: FunctionCodeGenerator,
          ruleDescription: String,
          returnSchema: RowSchema,
          calcProjection: Seq[RexNode],
          calcCondition: Option[RexNode],
          config: TableConfig,
          functionClass: Class[T]):
        GeneratedFunction[T, Row] = {
    
        // Generate the projection, i.e., the SELECT part. The generated projection
        // code already contains its own call to the UDF; its content is shown below.
        val projection = generator.generateResultExpression(
          returnSchema.typeInfo,
          returnSchema.fieldNames,
          calcProjection)
    
        // only projection
        val body = if (calcCondition.isEmpty) {
          s"""
            |${projection.code}
            |${generator.collectorTerm}.collect(${projection.resultTerm});
            |""".stripMargin
        }
        else {
          // Generate the filter condition, i.e., the WHERE part. filterCondition
          // also contains its own call to the UDF; its content is shown below.
          val filterCondition = generator.generateExpression(calcCondition.get)
            
          // only filter
          if (projection == null) {
            s"""
              |${filterCondition.code}
              |if (${filterCondition.resultTerm}) {
              |  ${generator.collectorTerm}.collect(${generator.input1Term});
              |}
              |""".stripMargin
          }
          // both filter and projection
          else {
            // Our example takes this branch: the filterCondition code and the
            // projection code are simply concatenated. This is the moment we
            // end up with two UDF calls.
            s"""
              |${filterCondition.code}
              |if (${filterCondition.resultTerm}) {
              |  ${projection.code}
              |  ${generator.collectorTerm}.collect(${projection.resultTerm});
              |}
              |""".stripMargin
          }
        }
    
        // body is the concatenation of the filterCondition code and the projection
        // code; each carries its own UDF call, and that is precisely our problem.
        generator.generateFunction(
          ruleDescription,
          functionClass,
          body,
          returnSchema.typeInfo)
      }
    }
    
    // Among this function's inputs, calcCondition is the filter condition of our SQL
    
    calcCondition = {Some@5663} "Some(<>(UDF_FRENQUENCY($1), 0))"
    
    // And calcProjection is the projection of our SQL
      
    calcProjection = {ArrayBuffer@5662} "ArrayBuffer" size = 2
     0 = {RexInputRef@7344} "$0"
     1 = {RexCall@7345} "UDF_FRENQUENCY($1)"
      
    // The generated filter condition, i.e., the code for WHERE. It already contains the call to the UDF
      
    filterCondition = {GeneratedExpression@5749} "GeneratedExpression(result$16,isNull$17,
    
    
    
    java.lang.Long result$12 = function_spendreport$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
      isNull$8 ? null : (java.lang.Long) result$7);
    
    
    boolean isNull$14 = result$12 == null;
    long result$13;
    if (isNull$14) {
      result$13 = -1L;
    }
    else {
      result$13 = result$12;
    }
    
    
    
    long result$15 = 0L;
    
    boolean isNull$17 = isNull$14 || false;
    boolean result$16;
    if (isNull$17) {
      result$16 = false;
    }
    else {
      result$16 = result$13 != result$15;
    }
    ,Boolean,false)"
        
    // The generated projection, i.e., the code for SELECT. It too contains a call to the UDF
      
    projection = {GeneratedExpression@5738} "GeneratedExpression(out,false,
    
    if (isNull$6) {
      out.setField(0, null);
    }
    else {
      out.setField(0, result$5);
    }
    
    
    
    
    
    java.lang.Long result$9 = function_spendreport$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
      isNull$8 ? null : (java.lang.Long) result$7);
    
    
    boolean isNull$11 = result$9 == null;
    long result$10;
    if (isNull$11) {
      result$10 = -1L;
    }
    else {
      result$10 = result$9;
    }
    
    
    if (isNull$11) {
      out.setField(1, null);
    }
    else {
      out.setField(1, result$10);
    }
    ,Row(word: String, myFrequency: Long),false)"
      
    // The generated class is effectively DataSetCalcRule extends RichFlatMapFunction
    name = "DataSetCalcRule"
      
    // The interface the generated class implements
    clazz = {Class@5773} "interface org.apache.flink.api.common.functions.FlatMapFunction"
      
    // Part of the generated body; this is where the UDF calls live
    bodyCode = "
    
    
    
    
    java.lang.Long result$12 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
      isNull$8 ? null : (java.lang.Long) result$7);
    
    
    boolean isNull$14 = result$12 == null;
    long result$13;
    if (isNull$14) {
      result$13 = -1L;
    }
    else {
      result$13 = result$12;
    }
    
    
    
    long result$15 = 0L;
    
    boolean isNull$17 = isNull$14 || false;
    boolean result$16;
    if (isNull$17) {
      result$16 = false;
    }
    else {
      result$16 = result$13 != result$15;
    }
    
    if (result$16) {
      
    
    if (isNull$6) {
      out.setField(0, null);
    }
    else {
      out.setField(0, result$5);
    }
    
    
    
    
    
    java.lang.Long result$9 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
      isNull$8 ? null : (java.lang.Long) result$7);
    
    
    boolean isNull$11 = result$9 == null;
    long result$10;
    if (isNull$11) {
      result$10 = -1L;
    }
    else {
      result$10 = result$9;
    }
    
    
    if (isNull$11) {
      out.setField(1, null);
    }
    else {
      out.setField(1, result$10);
    }
    
      c.collect(out);
    }
    "
    
    Call stack:
    
    generateFunction:94, FunctionCodeGenerator (org.apache.flink.table.codegen)
    generateFunction:79, CommonCalc$class (org.apache.flink.table.plan.nodes)
    generateFunction:45, DataSetCalc (org.apache.flink.table.plan.nodes.dataset)
    translateToPlan:105, DataSetCalc (org.apache.flink.table.plan.nodes.dataset)
    translate:306, BatchTableEnvImpl (org.apache.flink.table.api.internal)
    translate:281, BatchTableEnvImpl (org.apache.flink.table.api.internal)
    toDataSet:69, BatchTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
    toDataSet:53, TableConversions (org.apache.flink.table.api.scala)
    main:57, TestUdf$ (mytestpackage)
    main:-1, TestUdf (mytestpackage)
    

    6. FlatMapRunner

    As its definition shows, FlatMapRunner extends RichFlatMapFunction, which again says that Flink treats the UDF invocation as a flatMap operation. The elided parts below compile the generated code string when the function is opened (Flink's legacy code generation compiles with Janino) and instantiate the compiled class; flatMap then simply delegates to that instance.

    package org.apache.flink.table.runtime
    
    class FlatMapRunner(
        name: String,
        code: String,
        @transient var returnType: TypeInformation[Row])
      extends RichFlatMapFunction[Row, Row] ... {
    
      private var function: FlatMapFunction[Row, Row] = _
    
      ...
    
      override def flatMap(in: Row, out: Collector[Row]): Unit =
        function.flatMap(in, out)
    
      ...
    }
    

    0x04 The Generated UDF Code

    1. Condensed Version

    Here is a condensed version of the generated code; the problem is plainly visible: the myUdf function is executed twice.

    function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b is the field that holds the myUdf instance after conversion.

      // Original SQL: "SELECT word, myFrequency FROM TableFrequency WHERE myFrequency <> 0"
     
        java.lang.Long result$12 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
          isNull$8 ? null : (java.lang.Long) result$7); // this UDF call corresponds to WHERE myFrequency <> 0
    
        boolean isNull$14 = result$12 == null;
        // (the null handling that produces result$13, and the literal result$15 = 0L, are elided here)
        boolean isNull$17 = isNull$14 || false;
        boolean result$16;
        if (isNull$17) {
          result$16 = false;
        }
        else {
          result$16 = result$13 != result$15; // i.e., myFrequency <> 0
        }
        
        if (result$16) { // myFrequency <> 0 holds, so we enter the SELECT part
          java.lang.Long result$9 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
            isNull$8 ? null : (java.lang.Long) result$7); // this is SELECT myFrequency: the UDF is called again and recomputed, which is why a UDF must not carry state
          boolean isNull$11 = result$9 == null;
          long result$10;
          if (isNull$11) {
            result$10 = -1L;
          }
          else {
            result$10 = result$9; // only here is myFrequency actually selected, and by now the UDF has already run twice
          }
        }
    

    2. Full Version

    Finally, the complete generated class. Being machine-generated it is a little tedious to read, but this is the last step. Note the constructor: the myUdf instance is deserialized from a Base64-encoded string via EncodingUtils.decodeStringToObject, so each generated function carries its own copy of the (stateful) UDF object.

    public class DataSetCalcRule$18 extends org.apache.flink.api.common.functions.RichFlatMapFunction {
    
      final mytestpackage.myUdf function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b;
    
      final org.apache.flink.types.Row out =
          new org.apache.flink.types.Row(2);
      
      private org.apache.flink.types.Row in1;
    
      public DataSetCalcRule$18() throws Exception {
        
        function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b = (mytestpackage.myUdf)
        org.apache.flink.table.utils.EncodingUtils.decodeStringToObject(
          "rO0ABXNyABFzcGVuZHJlcG9ydC5teVVkZmGYnDRF7Hj4AgABTAAHY3VycmVudHQAEExqYXZhL2xhbmcvTG9uZzt4cgAvb3JnLmFwYWNoZS5mbGluay50YWJsZS5mdW5jdGlvbnMuU2NhbGFyRnVuY3Rpb25uLPkGQbqbDAIAAHhyADRvcmcuYXBhY2hlLmZsaW5rLnRhYmxlLmZ1bmN0aW9ucy5Vc2VyRGVmaW5lZEZ1bmN0aW9u14hb_NiViUACAAB4cHNyAA5qYXZhLmxhbmcuTG9uZzuL5JDMjyPfAgABSgAFdmFsdWV4cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHAAAAAAAAAAAA",
          org.apache.flink.table.functions.UserDefinedFunction.class); 
      }
    
      @Override
      public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.open(new org.apache.flink.table.functions.FunctionContext(getRuntimeContext()));
      }
    
      @Override
      public void flatMap(Object _in1, org.apache.flink.util.Collector c) throws Exception {
        in1 = (org.apache.flink.types.Row) _in1;
        
        boolean isNull$6 = (java.lang.String) in1.getField(0) == null;
        java.lang.String result$5;
        if (isNull$6) {
          result$5 = "";
        }
        else {
          result$5 = (java.lang.String) (java.lang.String) in1.getField(0);
        }
        
        boolean isNull$8 = (java.lang.Long) in1.getField(1) == null;
        long result$7;
        if (isNull$8) {
          result$7 = -1L;
        }
        else {
          result$7 = (java.lang.Long) in1.getField(1);
        }
    
        java.lang.Long result$12 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
          isNull$8 ? null : (java.lang.Long) result$7);
    
        boolean isNull$14 = result$12 == null;
        long result$13;
        if (isNull$14) {
          result$13 = -1L;
        }
        else {
          result$13 = result$12;
        }
    
        long result$15 = 0L;
        
        boolean isNull$17 = isNull$14 || false;
        boolean result$16;
        if (isNull$17) {
          result$16 = false;
        }
        else {
          result$16 = result$13 != result$15;
        }
        
        if (result$16) {
        
            if (isNull$6) {
              out.setField(0, null);
            }
            else {
              out.setField(0, result$5);
            }
    
            java.lang.Long result$9 = function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.eval(
              isNull$8 ? null : (java.lang.Long) result$7);
    
            boolean isNull$11 = result$9 == null;
            long result$10;
            if (isNull$11) {
              result$10 = -1L;
            }
            else {
              result$10 = result$9;
            }
    
            if (isNull$11) {
              out.setField(1, null);
            }
            else {
              out.setField(1, result$10);
            }
    
              c.collect(out);
            }
      }
    
      @Override
      public void close() throws Exception {  
        function_mytestpackage$myUdf$c45b0e23278f15e8f7d075abac9a121b.close();
      }
    }
    

    0x05 Summary

    At this point we have walked through, end to end, how Flink SQL generates its Java code.

    Flink generates the code for the "projection" and for the "filter condition" separately, and then concatenates the two fragments.

    Even if the original SQL contains only a single UDF call, once both the SELECT and the WHERE (after optimization) depend on that UDF, the projection and the filter condition each end up with their own call, and the concatenated code therefore invokes the UDF more than once.

    That is the ultimate reason why "a UDF should not keep internal, historical state". Keep it firmly in mind during development.
