zoukankan      html  css  js  c++  java
  • 第八篇:Spark SQL Catalyst源码分析之UDF

     /** Spark SQL源码分析系列文章*/ 

    在SQL的世界里,除了官方提供的常用的处理函数之外,一般都会提供可扩展的对外自定义函数接口,这已经成为一种事实的标准。

      在前面Spark SQL源码分析之核心流程一文中,已经介绍了Spark SQL Catalyst Analyzer的作用,其中包含了ResolveFunctions这个解析函数的功能。但是随着Spark1.1版本的发布,Spark SQL的代码有很多新完善和新功能了,和我先前基于1.0的源码分析多少有些不同,比如支持UDF:

      spark1.0及以前的实现:

    [java] view plain copy
     
    1. protected[sql] lazy val catalog: Catalog = new SimpleCatalog  
    2. @transient  
    3. protected[sql] lazy val analyzer: Analyzer =  
    4.   new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = true) //EmptyFunctionRegistry空实现  
    5. @transient  
    6. protected[sql] val optimizer = Optimizer  

      Spark1.1及以后的实现:

    [java] view plain copy
     
    1. protected[sql] lazy val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry //SimpleFunctionRegistry实现,支持简单的UDF  
    2.   
    3. @transient  
    4. protected[sql] lazy val analyzer: Analyzer =  
    5.   new Analyzer(catalog, functionRegistry, caseSensitive = true)  

    一、引子:

      对于SQL语句中的函数,会经过SqlParser的的解析成UnresolvedFunction。UnresolvedFunction最后会被Analyzer解析。

     SqlParser:

     除了非官方定义的函数外,还可以定义自定义函数,sql parser会进行解析。

    [java] view plain copy
     
    1. ident ~ "(" ~ repsep(expression, ",") <~ ")" ^^ {  
    2.     case udfName ~ _ ~ exprs => UnresolvedFunction(udfName, exprs)  

      将SqlParser传入的udfName和exprs封装成一个class class UnresolvedFunction继承自Expression。

      只是这个Expression的dataType等一系列属性和eval计算方法均无法访问,强制访问会抛出异常,因为它没有被Resolved,只是一个载体。

    [java] view plain copy
     
    1. case class UnresolvedFunction(name: String, children: Seq[Expression]) extends Expression {  
    2.   override def dataType = throw new UnresolvedException(this, "dataType")  
    3.   override def foldable = throw new UnresolvedException(this, "foldable")  
    4.   override def nullable = throw new UnresolvedException(this, "nullable")  
    5.   override lazy val resolved = false  
    6.   
    7.   // Unresolved functions are transient at compile time and don't get evaluated during execution.  
    8.   override def eval(input: Row = null): EvaluatedType =  
    9.     throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")  
    10.   
    11.   override def toString = s"'$name(${children.mkString(",")})"  
    12. }<strong></strong>  

    Analyzer:

      Analyzer初始化的时候会需要Catalog,database和table的元数据关系,以及FunctionRegistry来维护UDF名称和UDF实现的元数据,这里使用SimpleFunctionRegistry。

    [java] view plain copy
     
    1. /** 
    2.  * Replaces [[UnresolvedFunction]]s with concrete [[catalyst.expressions.Expression Expressions]]. 
    3.  */  
    4. object ResolveFunctions extends Rule[LogicalPlan] {  
    5.   def apply(plan: LogicalPlan): LogicalPlan = plan transform {  
    6.     case q: LogicalPlan =>  
    7.       q transformExpressions { //对当前LogicalPlan进行transformExpressions操作  
    8.         case u @ UnresolvedFunction(name, children) if u.childrenResolved => //如果遍历到了UnresolvedFunction  
    9.           registry.lookupFunction(name, children) //从UDF元数据表里查找udf函数  
    10.       }  
    11.   }  
    12. }  

    二、UDF注册

    2.1 UDFRegistration

      

      registerFunction("len", (x:String)=>x.length)

      registerFunction是UDFRegistration下的方法,SQLContext现在实现了UDFRegistration这个trait,只要导入SQLContext,即可以使用udf功能。

      UDFRegistration核心方法registerFunction:

      registerFunction方法签名def registerFunction[T: TypeTag](name: String, func: Function1[_, T]): Unit

      接受一个udfName 和 一个FunctionN,可以是Function1 到Function22。即这个udf的参数只支持1-22个。(scala的痛啊)

      内部builder通过ScalaUdf来构造一个Expression,这里ScalaUdf继承自Expression(可以简单的理解目前的SimpleUDF即是一个Catalyst的一个Expression),传入scala的function作为UDF的实现,并且用反射检查字段类型是否是Catalyst允许的,见ScalaReflection.

    [java] view plain copy
     
    1. def registerFunction[T: TypeTag](name: String, func: Function1[_, T]): Unit = {  
    2. def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)//构造Expression  
    3. functionRegistry.registerFunction(name, builder)//向SQLContext的functionRegistry(维护了一个hashMap来管理udf映射)注册  

    2.2 注册Function:

    注意:这里FunctionBuilder是一个type FunctionBuilder = Seq[Expression] => Expression

    [java] view plain copy
     
    1. class SimpleFunctionRegistry extends FunctionRegistry {  
    2.   val functionBuilders = new mutable.HashMap[String, FunctionBuilder]() //udf映射关系维护[udfName,Expression]  
    3.   
    4.   def registerFunction(name: String, builder: FunctionBuilder) = { //put expression进Map  
    5.     functionBuilders.put(name, builder)  
    6.   }  
    7.   
    8.   override def lookupFunction(name: String, children: Seq[Expression]): Expression = {  
    9.     functionBuilders(name)(children) //查找udf,返回Expression  
    10.   }  
    11. }  

    至此,我们将一个scala function注册为一个catalyst的一个Expression,这就是spark的simple udf。

    三、UDF计算:

    UDF既然已经被封装为catalyst树里的一个Expression节点,那么计算的时候也就是计算ScalaUdf的eval方法。

    先通过Row和表达式计算function所需要的参数,最后通过反射调用function,来达到计算udf的目的。

     ScalaUdf继承自Expression:

    scalaUdf接受一个function, dataType,和一系列表达式。

    比较简单,看注释即可:

    [java] view plain copy
     
    1. case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expression])  
    2.   extends Expression {  
    3.   
    4.   type EvaluatedType = Any  
    5.   
    6.   def nullable = true  
    7.   
    8.   override def toString = s"scalaUDF(${children.mkString(",")})"  
    9.  override def eval(input: Row): Any = {  
    10.     val result = children.size match {  
    11.       case 0 => function.asInstanceOf[() => Any]()  
    12.       case 1 => function.asInstanceOf[(Any) => Any](children(0).eval(input)) //反射调用function  
    13.       case 2 =>  
    14.         function.asInstanceOf[(Any, Any) => Any](  
    15.           children(0).eval(input), //表达式参数计算  
    16.           children(1).eval(input))  
    17.       case 3 =>  
    18.         function.asInstanceOf[(Any, Any, Any) => Any](  
    19.           children(0).eval(input),  
    20.           children(1).eval(input),  
    21.           children(2).eval(input))  
    22.       case 4 =>  
    23.      ......  
    24.        case 22 => //scala function只支持22个参数,这里枚举了。  
    25.         function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](  
    26.           children(0).eval(input),  
    27.           children(1).eval(input),  
    28.           children(2).eval(input),  
    29.           children(3).eval(input),  
    30.           children(4).eval(input),  
    31.           children(5).eval(input),  
    32.           children(6).eval(input),  
    33.           children(7).eval(input),  
    34.           children(8).eval(input),  
    35.           children(9).eval(input),  
    36.           children(10).eval(input),  
    37.           children(11).eval(input),  
    38.           children(12).eval(input),  
    39.           children(13).eval(input),  
    40.           children(14).eval(input),  
    41.           children(15).eval(input),  
    42.           children(16).eval(input),  
    43.           children(17).eval(input),  
    44.           children(18).eval(input),  
    45.           children(19).eval(input),  
    46.           children(20).eval(input),  
    47.           children(21).eval(input))  

    四、总结

        Spark目前的UDF其实就是scala function。将scala function封装到一个Catalyst Expression当中,在进行sql计算时,使用同样的Eval方法对当前输入Row进行计算。

        编写一个spark udf非常简单,只需给UDF起个函数名,并且传递一个scala function即可。依靠scala函数编程的表现能力,使得编写scala udf比较简单,且相较hive的udf更容易使人理解。

    ——EOF——

    原创文章,转载请注明:

    转载自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory

    本文链接地址:http://blog.csdn.net/oopsoom/article/details/39395641

    注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。

    image

    转自:http://blog.csdn.net/oopsoom/article/details/39395641

  • 相关阅读:
    LeetCode 1110. Delete Nodes And Return Forest
    LeetCode 473. Matchsticks to Square
    LeetCode 886. Possible Bipartition
    LeetCode 737. Sentence Similarity II
    LeetCode 734. Sentence Similarity
    LeetCode 491. Increasing Subsequences
    LeetCode 1020. Number of Enclaves
    LeetCode 531. Lonely Pixel I
    LeetCode 1091. Shortest Path in Binary Matrix
    LeetCode 590. N-ary Tree Postorder Traversal
  • 原文地址:https://www.cnblogs.com/sh425/p/7596424.html
Copyright © 2011-2022 走看看