zoukankan      html  css  js  c++  java
  • 第八篇:Spark SQL Catalyst源码分析之UDF

     /** Spark SQL源码分析系列文章*/ 

    在SQL的世界里,除了官方提供的常用的处理函数之外,一般都会提供可扩展的对外自定义函数接口,这已经成为一种事实的标准。

      在前面Spark SQL源码分析之核心流程一文中,已经介绍了Spark SQL Catalyst Analyzer的作用,其中包含了ResolveFunctions这个解析函数的功能。但是随着Spark1.1版本的发布,Spark SQL的代码有很多新完善和新功能了,和我先前基于1.0的源码分析多少有些不同,比如支持UDF:

      spark1.0及以前的实现:

    [java] view plain copy
     
    1. protected[sql] lazy val catalog: Catalog = new SimpleCatalog  
    2. @transient  
    3. protected[sql] lazy val analyzer: Analyzer =  
    4.   new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = true) //EmptyFunctionRegistry空实现  
    5. @transient  
    6. protected[sql] val optimizer = Optimizer  

      Spark1.1及以后的实现:

    [java] view plain copy
     
    1. protected[sql] lazy val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry //SimpleFunctionRegistry实现,支持简单的UDF  
    2.   
    3. @transient  
    4. protected[sql] lazy val analyzer: Analyzer =  
    5.   new Analyzer(catalog, functionRegistry, caseSensitive = true)  

    一、引子:

      对于SQL语句中的函数,会经过SqlParser的的解析成UnresolvedFunction。UnresolvedFunction最后会被Analyzer解析。

     SqlParser:

     除了非官方定义的函数外,还可以定义自定义函数,sql parser会进行解析。

    [java] view plain copy
     
    1. ident ~ "(" ~ repsep(expression, ",") <~ ")" ^^ {  
    2.     case udfName ~ _ ~ exprs => UnresolvedFunction(udfName, exprs)  

      将SqlParser传入的udfName和exprs封装成一个class class UnresolvedFunction继承自Expression。

      只是这个Expression的dataType等一系列属性和eval计算方法均无法访问,强制访问会抛出异常,因为它没有被Resolved,只是一个载体。

    [java] view plain copy
     
    1. case class UnresolvedFunction(name: String, children: Seq[Expression]) extends Expression {  
    2.   override def dataType = throw new UnresolvedException(this, "dataType")  
    3.   override def foldable = throw new UnresolvedException(this, "foldable")  
    4.   override def nullable = throw new UnresolvedException(this, "nullable")  
    5.   override lazy val resolved = false  
    6.   
    7.   // Unresolved functions are transient at compile time and don't get evaluated during execution.  
    8.   override def eval(input: Row = null): EvaluatedType =  
    9.     throw new TreeNodeException(this, s"No function to evaluate expression. type: ${this.nodeName}")  
    10.   
    11.   override def toString = s"'$name(${children.mkString(",")})"  
    12. }<strong></strong>  

    Analyzer:

      Analyzer初始化的时候会需要Catalog,database和table的元数据关系,以及FunctionRegistry来维护UDF名称和UDF实现的元数据,这里使用SimpleFunctionRegistry。

    [java] view plain copy
     
    1. /** 
    2.  * Replaces [[UnresolvedFunction]]s with concrete [[catalyst.expressions.Expression Expressions]]. 
    3.  */  
    4. object ResolveFunctions extends Rule[LogicalPlan] {  
    5.   def apply(plan: LogicalPlan): LogicalPlan = plan transform {  
    6.     case q: LogicalPlan =>  
    7.       q transformExpressions { //对当前LogicalPlan进行transformExpressions操作  
    8.         case u @ UnresolvedFunction(name, children) if u.childrenResolved => //如果遍历到了UnresolvedFunction  
    9.           registry.lookupFunction(name, children) //从UDF元数据表里查找udf函数  
    10.       }  
    11.   }  
    12. }  

    二、UDF注册

    2.1 UDFRegistration

      

      registerFunction("len", (x:String)=>x.length)

      registerFunction是UDFRegistration下的方法,SQLContext现在实现了UDFRegistration这个trait,只要导入SQLContext,即可以使用udf功能。

      UDFRegistration核心方法registerFunction:

      registerFunction方法签名def registerFunction[T: TypeTag](name: String, func: Function1[_, T]): Unit

      接受一个udfName 和 一个FunctionN,可以是Function1 到Function22。即这个udf的参数只支持1-22个。(scala的痛啊)

      内部builder通过ScalaUdf来构造一个Expression,这里ScalaUdf继承自Expression(可以简单的理解目前的SimpleUDF即是一个Catalyst的一个Expression),传入scala的function作为UDF的实现,并且用反射检查字段类型是否是Catalyst允许的,见ScalaReflection.

    [java] view plain copy
     
    1. def registerFunction[T: TypeTag](name: String, func: Function1[_, T]): Unit = {  
    2. def builder(e: Seq[Expression]) = ScalaUdf(func, ScalaReflection.schemaFor(typeTag[T]).dataType, e)//构造Expression  
    3. functionRegistry.registerFunction(name, builder)//向SQLContext的functionRegistry(维护了一个hashMap来管理udf映射)注册  

    2.2 注册Function:

    注意:这里FunctionBuilder是一个type FunctionBuilder = Seq[Expression] => Expression

    [java] view plain copy
     
    1. class SimpleFunctionRegistry extends FunctionRegistry {  
    2.   val functionBuilders = new mutable.HashMap[String, FunctionBuilder]() //udf映射关系维护[udfName,Expression]  
    3.   
    4.   def registerFunction(name: String, builder: FunctionBuilder) = { //put expression进Map  
    5.     functionBuilders.put(name, builder)  
    6.   }  
    7.   
    8.   override def lookupFunction(name: String, children: Seq[Expression]): Expression = {  
    9.     functionBuilders(name)(children) //查找udf,返回Expression  
    10.   }  
    11. }  

    至此,我们将一个scala function注册为一个catalyst的一个Expression,这就是spark的simple udf。

    三、UDF计算:

    UDF既然已经被封装为catalyst树里的一个Expression节点,那么计算的时候也就是计算ScalaUdf的eval方法。

    先通过Row和表达式计算function所需要的参数,最后通过反射调用function,来达到计算udf的目的。

     ScalaUdf继承自Expression:

    scalaUdf接受一个function, dataType,和一系列表达式。

    比较简单,看注释即可:

    [java] view plain copy
     
    1. case class ScalaUdf(function: AnyRef, dataType: DataType, children: Seq[Expression])  
    2.   extends Expression {  
    3.   
    4.   type EvaluatedType = Any  
    5.   
    6.   def nullable = true  
    7.   
    8.   override def toString = s"scalaUDF(${children.mkString(",")})"  
    9.  override def eval(input: Row): Any = {  
    10.     val result = children.size match {  
    11.       case 0 => function.asInstanceOf[() => Any]()  
    12.       case 1 => function.asInstanceOf[(Any) => Any](children(0).eval(input)) //反射调用function  
    13.       case 2 =>  
    14.         function.asInstanceOf[(Any, Any) => Any](  
    15.           children(0).eval(input), //表达式参数计算  
    16.           children(1).eval(input))  
    17.       case 3 =>  
    18.         function.asInstanceOf[(Any, Any, Any) => Any](  
    19.           children(0).eval(input),  
    20.           children(1).eval(input),  
    21.           children(2).eval(input))  
    22.       case 4 =>  
    23.      ......  
    24.        case 22 => //scala function只支持22个参数,这里枚举了。  
    25.         function.asInstanceOf[(Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any, Any) => Any](  
    26.           children(0).eval(input),  
    27.           children(1).eval(input),  
    28.           children(2).eval(input),  
    29.           children(3).eval(input),  
    30.           children(4).eval(input),  
    31.           children(5).eval(input),  
    32.           children(6).eval(input),  
    33.           children(7).eval(input),  
    34.           children(8).eval(input),  
    35.           children(9).eval(input),  
    36.           children(10).eval(input),  
    37.           children(11).eval(input),  
    38.           children(12).eval(input),  
    39.           children(13).eval(input),  
    40.           children(14).eval(input),  
    41.           children(15).eval(input),  
    42.           children(16).eval(input),  
    43.           children(17).eval(input),  
    44.           children(18).eval(input),  
    45.           children(19).eval(input),  
    46.           children(20).eval(input),  
    47.           children(21).eval(input))  

    四、总结

        Spark目前的UDF其实就是scala function。将scala function封装到一个Catalyst Expression当中,在进行sql计算时,使用同样的Eval方法对当前输入Row进行计算。

        编写一个spark udf非常简单,只需给UDF起个函数名,并且传递一个scala function即可。依靠scala函数编程的表现能力,使得编写scala udf比较简单,且相较hive的udf更容易使人理解。

    ——EOF——

    原创文章,转载请注明:

    转载自:OopsOutOfMemory盛利的Blog,作者: OopsOutOfMemory

    本文链接地址:http://blog.csdn.net/oopsoom/article/details/39395641

    注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议,欢迎转载、转发和评论,但是请保留本文作者署名和文章链接。如若需要用于商业目的或者与授权方面的协商,请联系我。

    image

    转自:http://blog.csdn.net/oopsoom/article/details/39395641

  • 相关阅读:
    【Rust】多种错误类型
    【Rust】Result别名
    【Rust】Option然后
    【Rust】可选和错误
    【Rust】Result问号
    【Rust】Option转换
    【Rust】Option展开
    【Rust】Result结果
    【Rust】Result提前返回
    jQuery过滤 安静点
  • 原文地址:https://www.cnblogs.com/sh425/p/7596424.html
Copyright © 2011-2022 走看看