  • SparkSQL UDFs: Usage and Implementation Explained

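    UDFs are a very common SQL feature. In Spark 1.6 and earlier, however, only temporary UDFs could be created; persistent UDFs were not supported unless you modified the Spark source code. Starting with Spark 2.0, SparkSQL finally supports persistent UDFs. This article explains how to use UDFs in SparkSQL and how they are implemented underneath.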

    1. Temporary UDFs

    Creating and using a temporary UDF:

    create temporary function tmp_trans_array as 'com.test.spark.udf.TransArray' using jar 'spark-test-udf-1.0.0.jar';
    
    select tmp_trans_array(1, '\|', id, position) as (id0, position0) from test_udf limit 10;
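    The article does not show the source of com.test.spark.udf.TransArray. Judging only from the call shape trans_array(1, '\|', id, position) as (id0, position0), it appears to be a UDTF that keeps the first N columns and splits the remaining ones on a separator, emitting one output row per piece. The plain-Scala sketch below models that assumed behavior so the multi-column "as (...)" output is easier to picture. The object name TransArrayModel, the literal (non-regex) treatment of the separator, and the pairing of pieces across several split columns are all assumptions, not the real UDF.

```scala
import java.util.regex.Pattern

// HYPOTHETICAL model of what trans_array(n, sep, col1, col2, ...) is assumed to do.
// The real com.test.spark.udf.TransArray is not shown in the article.
object TransArrayModel {
  /**
   * @param keep number of leading columns copied unchanged into every output row (assumed)
   * @param sep  separator, treated here as a literal string (assumption)
   * @param cols input column values, kept columns first
   * @return one output row per piece; with several split columns, the i-th pieces are paired
   */
  def expand(keep: Int, sep: String, cols: Seq[String]): Seq[Seq[String]] = {
    val (fixed, toSplit) = cols.splitAt(keep)
    // Split each remaining column on the literal separator (-1 keeps trailing empty pieces).
    val pieces: Seq[Array[String]] = toSplit.map(_.split(Pattern.quote(sep), -1))
    // Stop at the shortest split column so every emitted row is complete.
    val rows = if (pieces.isEmpty) 0 else pieces.map(_.length).min
    (0 until rows).map(i => fixed ++ pieces.map(p => p(i)))
  }
}
```

    Under this assumption, TransArrayModel.expand(1, "|", Seq("7", "a|b|c")) yields the three rows (7, a), (7, b) and (7, c), which is how one input row would turn into several (id0, position0) rows.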

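      How it works: in the run method of org.apache.spark.sql.execution.command.CreateFunctionCommand, Spark checks whether the function being created is temporary; if it is, a temporary function is created. As the code below shows, a temporary function is registered directly in functionRegistry (whose implementation class is SimpleFunctionRegistry), that is, only in memory, so it exists only for the lifetime of the session.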

    def createTempFunction(
        name: String,
        info: ExpressionInfo,
        funcDefinition: FunctionBuilder,
        ignoreIfExists: Boolean): Unit = {
      if (functionRegistry.lookupFunctionBuilder(name).isDefined && !ignoreIfExists) {
        throw new TempFunctionAlreadyExistsException(name)
      }
      functionRegistry.registerFunction(name, info, funcDefinition)
    }

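    Below is the actual registration code in SimpleFunctionRegistry. Every registered UDF is stored in a StringKeyHashMap created with caseSensitive = false, so function names are matched case-insensitively.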

    protected val functionBuilders =
      StringKeyHashMap[(ExpressionInfo, FunctionBuilder)](caseSensitive = false)
    
    override def registerFunction(
        name: String,
        info: ExpressionInfo,
        builder: FunctionBuilder): Unit = synchronized {
      functionBuilders.put(name, (info, builder))
    }
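    To make the two excerpts above concrete, here is a toy model of the same flow in plain Scala: a registry that keeps builders in an in-memory map with lower-cased keys (mimicking StringKeyHashMap with caseSensitive = false), and a createTempFunction with the same control flow as the Spark method shown earlier. FuncInfo, Builder, ToyRegistry and its members are hypothetical stand-ins for ExpressionInfo and FunctionBuilder, not Spark classes.

```scala
import scala.collection.mutable

// HYPOTHETICAL toy model; FuncInfo and Builder stand in for Spark's ExpressionInfo
// and FunctionBuilder and are not Spark classes.
final case class FuncInfo(className: String)

object ToyRegistry {
  type Builder = Seq[Any] => Any

  final class TempFunctionAlreadyExists(name: String)
    extends RuntimeException(s"Temporary function '$name' already exists")

  /** Mimics SimpleFunctionRegistry: an in-memory map with case-insensitive keys. */
  final class FunctionRegistry {
    // Lower-casing keys mirrors StringKeyHashMap(caseSensitive = false).
    private val builders = mutable.HashMap.empty[String, (FuncInfo, Builder)]

    def registerFunction(name: String, info: FuncInfo, builder: Builder): Unit =
      synchronized { builders.put(name.toLowerCase, (info, builder)) }

    def lookupFunctionBuilder(name: String): Option[Builder] =
      synchronized { builders.get(name.toLowerCase).map(_._2) }

    def lookupFunction(name: String, args: Seq[Any]): Any =
      lookupFunctionBuilder(name) match {
        case Some(builder) => builder(args)
        case None          => throw new NoSuchElementException(s"Undefined function: $name")
      }
  }

  /** Same control flow as createTempFunction above: fail on a name clash unless
    * ignoreIfExists is true, in which case the old entry is silently overwritten. */
  def createTempFunction(
      registry: FunctionRegistry,
      name: String,
      info: FuncInfo,
      builder: Builder,
      ignoreIfExists: Boolean): Unit = {
    if (registry.lookupFunctionBuilder(name).isDefined && !ignoreIfExists) {
      throw new TempFunctionAlreadyExists(name)
    }
    registry.registerFunction(name, info, builder)
  }
}
```

    In this model, looking up TMP_TRANS_ARRAY finds the entry registered as tmp_trans_array, which is the practical effect of caseSensitive = false. Note also that, in the Spark code shown, ignoreIfExists = true does not skip registration; it overwrites the existing entry, and the sketch keeps that behavior.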

    2. Persistent UDFs

    Usage is shown below. It is best to put the jar on HDFS so that the function can also be used from other machines.

    create function trans_array as 'com.test.spark.udf.TransArray' using jar 'hdfs://namenodeIP:9000/libs/spark-test-udf-1.0.0.jar';
    
    select trans_array(1, '\|', id, position) as (id0, position0) from test_spark limit 10;

    How it works

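    (1) When a permanent function is created, org.apache.spark.sql.execution.command.CreateFunctionCommand calls SessionCatalog.createFunction, which ultimately calls HiveExternalCatalog.createFunction. In other words, creating a permanent function creates a corresponding function entry in the Hive metastore. Querying the metastore database shows the following records, which confirm that the function has been stored there: FUNCS holds the function name and class, and FUNC_RU holds the jar URI it depends on, linked by FUNC_ID.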

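    mysql> select * from FUNCS;
    +---------+-------------------------------+-------------+-------+-------------+-----------+------------+------------+
    | FUNC_ID | CLASS_NAME                    | CREATE_TIME | DB_ID | FUNC_NAME   | FUNC_TYPE | OWNER_NAME | OWNER_TYPE |
    +---------+-------------------------------+-------------+-------+-------------+-----------+------------+------------+
    |      96 | com.test.spark.udf.TransArray |  1481459766 |     1 | trans_array |         1 | NULL       | USER       |
    +---------+-------------------------------+-------------+-------+-------------+-----------+------------+------------+

    mysql> select * from FUNC_RU;
    +---------+---------------+------------------------------------------------------+-------------+
    | FUNC_ID | RESOURCE_TYPE | RESOURCE_URI                                         | INTEGER_IDX |
    +---------+---------------+------------------------------------------------------+-------------+
    |      96 |             1 | hdfs://namenodeIP:9000/libs/spark-test-udf-1.0.0.jar |           0 |
    +---------+---------------+------------------------------------------------------+-------------+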
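    The two result sets suggest a simple shape: one FUNCS row per function plus one FUNC_RU row per resource listed in USING, linked by FUNC_ID. The sketch below models that mapping in plain Scala. It is an illustration under stated assumptions: the case classes are hypothetical, the surrogate keys FUNC_ID and DB_ID are replaced by plain names, and the numeric codes reflect my understanding of Hive's thrift enums (FunctionType.JAVA = 1; ResourceType JAR = 1, FILE = 2, ARCHIVE = 3), which agrees with FUNC_TYPE = 1 and RESOURCE_TYPE = 1 above.

```scala
// HYPOTHETICAL model of the two metastore rows written for one permanent function.
// Field names follow the FUNCS / FUNC_RU columns above; surrogate keys (FUNC_ID, DB_ID)
// are replaced by plain names to keep the sketch small.
final case class Resource(kind: String, uri: String)
final case class PermanentFunction(db: String, name: String, className: String, resources: Seq[Resource])

final case class FuncsRow(dbName: String, funcName: String, className: String, funcType: Int)
final case class FuncRuRow(funcName: String, resourceType: Int, resourceUri: String, integerIdx: Int)

object MetastoreModel {
  // Assumption: codes follow Hive's thrift enums as I understand them
  // (FunctionType.JAVA = 1; ResourceType JAR = 1, FILE = 2, ARCHIVE = 3).
  private val JavaFunctionType = 1
  private val resourceTypeCode = Map("jar" -> 1, "file" -> 2, "archive" -> 3)

  /** One FUNCS row, plus one FUNC_RU row per resource; INTEGER_IDX is the resource's position. */
  def toRows(f: PermanentFunction): (FuncsRow, Seq[FuncRuRow]) = {
    val funcs = FuncsRow(f.db, f.name, f.className, JavaFunctionType)
    val resourceRows = f.resources.zipWithIndex.map { case (r, idx) =>
      FuncRuRow(f.name, resourceTypeCode(r.kind.toLowerCase), r.uri, idx)
    }
    (funcs, resourceRows)
  }
}
```

    A function declared with a single "using jar ..." clause therefore maps to exactly one FUNC_RU row with INTEGER_IDX = 0, matching the output above.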

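    (2) When a permanent function is used, resolving the UDF in a SQL statement calls SessionCatalog.lookupFunction. This method first checks whether the function is already in memory. If it is not, the UDF is loaded: its RESOURCE_URI is added to the ClassLoader's path, and the UDF is then registered in the in-memory functionRegistry. The main code, in SessionCatalog, is as follows: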

      def lookupFunction(
          name: FunctionIdentifier,
          children: Seq[Expression]): Expression = synchronized {
        // Note: the implementation of this function is a little bit convoluted.
        // We probably shouldn't use a single FunctionRegistry to register all three kinds of functions
        // (built-in, temp, and external).
        if (name.database.isEmpty && functionRegistry.functionExists(name)) {
          // This function has been already loaded into the function registry.
          return functionRegistry.lookupFunction(name, children)
        }
    
        // If the name itself is not qualified, add the current database to it.
        val database = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
        val qualifiedName = name.copy(database = Some(database))
    
        if (functionRegistry.functionExists(qualifiedName)) {
          // This function has been already loaded into the function registry.
          // Unlike the above block, we find this function by using the qualified name.
          return functionRegistry.lookupFunction(qualifiedName, children)
        }
    
        // The function has not been loaded to the function registry, which means
        // that the function is a permanent function (if it actually has been registered
        // in the metastore). We need to first put the function in the FunctionRegistry.
        // TODO: why not just check whether the function exists first?
        val catalogFunction = try {
          externalCatalog.getFunction(database, name.funcName)
        } catch {
          case _: AnalysisException => failFunctionLookup(name)
          case _: NoSuchPermanentFunctionException => failFunctionLookup(name)
        }
        loadFunctionResources(catalogFunction.resources)
        // Please note that qualifiedName is provided by the user. However,
        // catalogFunction.identifier.unquotedString is returned by the underlying
        // catalog. So, it is possible that qualifiedName is not exactly the same as
        // catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
        // At here, we preserve the input from the user.
        registerFunction(catalogFunction.copy(identifier = qualifiedName), overrideIfExists = false)
        // Now, we need to create the Expression.
        functionRegistry.lookupFunction(qualifiedName, children)
      }
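    The lookup order in lookupFunction can be summarized as: unqualified name already in the registry (built-in or temporary function), then the name qualified with the current database, and only then the external catalog, after which the function's resources are loaded and it is registered under the qualified name. The toy model below reproduces that order in plain Scala. FuncId, CatalogFunc, ToyCatalog and the string "expression" it builds are hypothetical, and loading resources is reduced to recording the jar URIs; none of this is Spark's API.

```scala
import scala.collection.mutable

// HYPOTHETICAL, simplified model of the lookup order in SessionCatalog.lookupFunction.
final case class FuncId(name: String, database: Option[String]) {
  def unquoted: String = database.fold(name)(db => s"$db.$name")
}
final case class CatalogFunc(className: String, jarUris: Seq[String])

final class ToyCatalog(
    metastore: Map[(String, String), CatalogFunc], // (db, name) -> definition, like FUNCS + FUNC_RU
    currentDb: String) {
  type Builder = Seq[Any] => Any
  // Built-in, temporary and already-loaded permanent functions share one registry.
  val registry = mutable.HashMap.empty[String, Builder]
  val loadedJars = mutable.ArrayBuffer.empty[String]

  def lookupFunction(id: FuncId, args: Seq[Any]): Any = synchronized {
    val db = id.database.getOrElse(currentDb).toLowerCase
    val qualifiedKey = s"$db.${id.name.toLowerCase}"
    if (id.database.isEmpty && registry.contains(id.name.toLowerCase)) {
      // 1. Unqualified hit: a built-in or temporary function.
      registry(id.name.toLowerCase)(args)
    } else if (registry.contains(qualifiedKey)) {
      // 2. A permanent function already loaded by an earlier lookup.
      registry(qualifiedKey)(args)
    } else {
      // 3. First use: read the definition from the metastore, or fail.
      val func = metastore.getOrElse((db, id.name.toLowerCase),
        throw new NoSuchElementException(s"Undefined function: ${id.unquoted}"))
      loadedJars ++= func.jarUris // stands in for loadFunctionResources
      // Register under the qualified name, then build the "expression".
      registry(qualifiedKey) = (a: Seq[Any]) => s"${func.className}(${a.mkString(",")})"
      registry(qualifiedKey)(args)
    }
  }
}
```

    The design point the sketch tries to show: in this model the metastore is read and the jar is "loaded" only on the first lookup per catalog instance; later lookups, whether written as trans_array or DEFAULT.TRANS_ARRAY, are served from the in-memory registry under the key default.trans_array.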
    
      /**
       * List all functions in the specified database, including temporary functions. This
       * returns the function identifier and the scope in which it was defined (system or user
       * defined).
       */
      def listFunctions(db: String): Seq[(FunctionIdentifier, String)] = listFunctions(db, "*")
    
      /**
       * List all matching functions in the specified database, including temporary functions. This
       * returns the function identifier and the scope in which it was defined (system or user
       * defined).
       */
      def listFunctions(db: String, pattern: String): Seq[(FunctionIdentifier, String)] = {
        val dbName = formatDatabaseName(db)
        requireDbExists(dbName)
        val dbFunctions = externalCatalog.listFunctions(dbName, pattern).map { f =>
          FunctionIdentifier(f, Some(dbName)) }
        val loadedFunctions = StringUtils
          .filterPattern(functionRegistry.listFunction().map(_.unquotedString), pattern).map { f =>
            // In functionRegistry, function names are stored as an unquoted format.
            Try(parser.parseFunctionIdentifier(f)) match {
              case Success(e) => e
              case Failure(_) =>
                // The names of some built-in functions are not parsable by our parser, e.g., %
                FunctionIdentifier(f)
            }
          }
        val functions = dbFunctions ++ loadedFunctions
        // The session catalog caches some persistent functions in the FunctionRegistry
        // so there can be duplicates.
        functions.map {
          case f if FunctionRegistry.functionSet.contains(f) => (f, "SYSTEM")
          case f => (f, "USER")
        }.distinct
      }
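    listFunctions above merges two sources, the functions stored for the database in the metastore and the functions already in the in-memory registry, tags each as SYSTEM (built-in) or USER, and removes duplicates; as the source comment notes, duplicates arise because permanent functions that have been used are cached in the registry. The sketch below models this with plain strings instead of FunctionIdentifier. ListFunctionsModel is hypothetical, and its pattern support is deliberately reduced to case-insensitive '*' wildcards, which is an assumption-driven simplification of Spark's own pattern filtering.

```scala
import java.util.regex.Pattern

// HYPOTHETICAL simplified model of listFunctions; identifiers are plain strings.
object ListFunctionsModel {
  /** Case-insensitive glob match where '*' means "any run of characters". */
  def matches(pattern: String, name: String): Boolean = {
    val regex = pattern.toLowerCase.split("\\*", -1).map(s => Pattern.quote(s)).mkString(".*")
    name.toLowerCase.matches(regex)
  }

  def listFunctions(
      db: String,
      pattern: String,
      metastoreNames: Seq[String], // unqualified names stored under `db` in the metastore
      registryNames: Seq[String],  // unquoted names already in the in-memory registry
      builtIns: Set[String]): Seq[(String, String)] = {
    val dbFunctions = metastoreNames.filter(matches(pattern, _)).map(f => s"$db.$f")
    val loadedFunctions = registryNames.filter(matches(pattern, _))
    // A permanent function that was already used appears in both lists, hence distinct.
    (dbFunctions ++ loadedFunctions).map { f =>
      if (builtIns.contains(f)) (f, "SYSTEM") else (f, "USER")
    }.distinct
  }
}
```

    With the metastore holding trans_array and the registry holding default.trans_array, upper (built-in) and tmp_trans_array, listing with "*" returns default.trans_array once, tagged USER, alongside upper as SYSTEM and tmp_trans_array as USER.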
  • Original article: https://www.cnblogs.com/itboys/p/9478711.html