zoukankan      html  css  js  c++  java
  • SparkSQL UDF使用方法与原理详解


    1. 临时UDF


    -- Register a temporary function backed by the TransArray class shipped in the given jar.
    create temporary function tmp_trans_array as 'com.test.spark.udf.TransArray' using jar 'spark-test-udf-1.0.0.jar';
    -- The multi-column alias `as (id0, position0)` suggests TransArray is a table-generating
    -- UDF (one input row -> several output rows); confirm against the class itself.
    select tmp_trans_array(1, '\|', id, position) as (id0, position0) from test_udf limit 10;


    /**
     * Registers a temporary function named `name` in `functionRegistry`.
     *
     * @param name           function name to register
     * @param info           expression metadata stored together with the builder
     * @param funcDefinition builder used to create the function's expression
     * @param ignoreIfExists when false, an existing builder under `name` is an error
     * @throws TempFunctionAlreadyExistsException if a builder for `name` already exists
     *                                            and `ignoreIfExists` is false
     */
    def createTempFunction(
        name: String,
        info: ExpressionInfo,
        funcDefinition: FunctionBuilder,
        ignoreIfExists: Boolean): Unit = {
      if (functionRegistry.lookupFunctionBuilder(name).isDefined && !ignoreIfExists) {
        throw new TempFunctionAlreadyExistsException(name)
      }
      // Despite its name, ignoreIfExists = true does not skip registration: the new
      // builder is always handed to the registry once the check above passes.
      functionRegistry.registerFunction(name, info, funcDefinition)
    }


    // Function name -> (metadata, builder); keys are compared case-insensitively.
    protected val functionBuilders =
      StringKeyHashMap[(ExpressionInfo, FunctionBuilder)](caseSensitive = false)

    /**
     * Stores `info` and `builder` under `name`, synchronized on this registry instance.
     * Re-registering an existing name presumably replaces the old entry via `put` — confirm
     * against StringKeyHashMap.
     */
    override def registerFunction(
        name: String,
        info: ExpressionInfo,
        builder: FunctionBuilder): Unit = synchronized {
      functionBuilders.put(name, (info, builder))
    }

    2. 持久化UDF


    -- Register a persistent function; its class name and jar URI are recorded in the
    -- metastore (see the FUNCS / FUNC_RU rows shown below).
    create function trans_array as 'com.test.spark.udf.TransArray' using jar 'hdfs://namenodeIP:9000/libs/spark-test-udf-1.0.0.jar';
    -- NOTE(review): the delimiter here is ' \|' (leading space), whereas the temporary-function
    -- example passes '\|'; confirm which one is intended.
    select trans_array(1, ' \|', id, position) as (id0, position0) from test_spark limit 10;



    mysql> select *  from FUNCS;
    | FUNC_ID    | CLASS_NAME                    | CREATE_TIME | DB_ID | FUNC_NAME     | FUNC_TYPE | OWNER_NAME | OWNER_TYPE |
    | 96         | com.test.spark.udf.TransArray |  1481459766 | 1     | trans_array   | 1         | NULL       | USER       |
    mysql> select *  from FUNC_RU;
    | FUNC_ID | RESOURCE_TYPE | RESOURCE_URI                                         | INTEGER_IDX |  
    |  96     | 1             | hdfs://namenodeIP:9000/libs/spark-test-udf-1.0.0.jar |  0          |


     /**
      * Resolves `name` to an expression built from `children`.
      *
      * Resolution order:
      *  1. an unqualified `name` that is already loaded in `functionRegistry`;
      *  2. `name` qualified with its own database (or the current database), if already loaded;
      *  3. otherwise the definition is fetched from `externalCatalog` for that database,
      *     registered under the qualified name, and then looked up in `functionRegistry`.
      *
      * Catalog errors (`AnalysisException`, `NoSuchPermanentFunctionException`) are handed to
      * `failFunctionLookup`, which presumably throws — confirm.
      */
     def lookupFunction(
         name: FunctionIdentifier,
         children: Seq[Expression]): Expression = synchronized {
       // Note: the implementation of this function is a little bit convoluted.
       // We probably shouldn't use a single FunctionRegistry to register all three kinds of functions
       // (built-in, temp, and external).
       if (name.database.isEmpty && functionRegistry.functionExists(name)) {
         // This function has been already loaded into the function registry.
         return functionRegistry.lookupFunction(name, children)
       }

       // If the name itself is not qualified, add the current database to it.
       val database = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
       val qualifiedName = name.copy(database = Some(database))

       if (functionRegistry.functionExists(qualifiedName)) {
         // This function has been already loaded into the function registry.
         // Unlike the above block, we find this function by using the qualified name.
         return functionRegistry.lookupFunction(qualifiedName, children)
       }

       // The function has not been loaded to the function registry, which means
       // that the function is a permanent function (if it actually has been registered
       // in the metastore). We need to first put the function in the FunctionRegistry.
       // TODO: why not just check whether the function exists first?
       val catalogFunction = try {
         externalCatalog.getFunction(database, name.funcName)
       } catch {
         case _: AnalysisException => failFunctionLookup(name)
         case _: NoSuchPermanentFunctionException => failFunctionLookup(name)
       }
       // NOTE(review): nothing in this method loads the jar recorded for the function
       // (its RESOURCE_URI in the metastore); confirm that registerFunction or a caller does.
       // Please note that qualifiedName is provided by the user. However,
       // catalogFunction.identifier.unquotedString is returned by the underlying
       // catalog. So, it is possible that qualifiedName is not exactly the same as
       // catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
       // At here, we preserve the input from the user.
       registerFunction(catalogFunction.copy(identifier = qualifiedName), overrideIfExists = false)
       // Now, we need to create the Expression.
       functionRegistry.lookupFunction(qualifiedName, children)
     }
       /**
        * List all functions in the specified database, including temporary functions. This
        * returns the function identifier and the scope in which it was defined (system or user
        * defined).
        *
        * Equivalent to `listFunctions(db, "*")`.
        */
       def listFunctions(db: String): Seq[(FunctionIdentifier, String)] = listFunctions(db, "*")
       /**
        * List all matching functions in the specified database, including temporary functions. This
        * returns the function identifier and the scope in which it was defined (system or user
        * defined).
        *
        * @param db      database whose catalog functions are listed
        * @param pattern name pattern applied to both the catalog's and the registry's function names
        * @return distinct (identifier, scope) pairs; scope is "SYSTEM" for identifiers in
        *         `FunctionRegistry.functionSet` (presumably the built-ins) and "USER" otherwise
        */
       def listFunctions(db: String, pattern: String): Seq[(FunctionIdentifier, String)] = {
         val dbName = formatDatabaseName(db)
         val dbFunctions = externalCatalog.listFunctions(dbName, pattern).map { f =>
           FunctionIdentifier(f, Some(dbName))
         }
         val loadedFunctions = StringUtils
           .filterPattern(functionRegistry.listFunction().map(_.unquotedString), pattern).map { f =>
             // In functionRegistry, function names are stored as an unquoted format.
             Try(parser.parseFunctionIdentifier(f)) match {
               case Success(e) => e
               case Failure(_) =>
                 // The names of some built-in functions are not parsable by our parser, e.g., %
                 // so keep the raw name as an unqualified identifier.
                 FunctionIdentifier(f)
             }
           }
         val functions = dbFunctions ++ loadedFunctions
         // The session catalog caches some persistent functions in the FunctionRegistry
         // (lookupFunction registers them under their qualified name), so the same identifier
         // can come from both sources; deduplicate the result.
         functions.map {
           case f if FunctionRegistry.functionSet.contains(f) => (f, "SYSTEM")
           case f => (f, "USER")
         }.distinct
       }
  • 相关阅读:
    【HTML5】---【HTML5提供的一些新的标签用法以及和HTML 4的区别】
    【HTML5】页面点击按钮添加一行 删除一行 全选 反选 全不选
    第四篇:python 高级之面向对象初级
  • 原文地址:https://www.cnblogs.com/itboys/p/9478711.html
Copyright © 2011-2022 走看看