0 简介
自定义函数(UDF)是一种扩展开发机制,可以用来在查询语句里调用难以用其他方式表达的频繁使用或自定义的逻辑。
自定义函数可以用 JVM 语言(例如 Java 或 Scala)或 Python 实现,实现者可以在 UDF 中使用任意第三方库,本文聚焦于使用 JVM 语言开发自定义函数。
1 概述
当前 Flink 有如下几种函数:
- 标量函数 将标量值转换成一个新标量值;
- 表值函数 将标量值转换成新的行数据;
- 聚合函数 将多行数据里的标量值转换成一个新标量值;
- 表值聚合函数 将多行数据里的标量值转换成新的行数据;
- 异步表值函数 是异步查询外部数据系统的特殊函数。
注意 标量和表值函数已经使用了新的基于数据类型的类型系统,聚合函数仍然使用基于 TypeInformation
的旧类型系统。
以下示例展示了如何创建一个基本的标量函数,以及如何在 Table API 和 SQL 里调用这个函数。
函数用于 SQL 查询前要先经过注册;而在用于 Table API 时,函数可以先注册后调用,也可以 内联 后直接使用。
import org.apache.flink.table.api._ import org.apache.flink.table.functions.ScalarFunction // define function logic class SubstringFunction extends ScalarFunction { def eval(s: String, begin: Integer, end: Integer): String = { s.substring(begin, end) } } val env = TableEnvironment.create(...) // 在 Table API 里不经注册直接“内联”调用函数 env.from("MyTable").select(call(classOf[SubstringFunction], $"myField", 5, 12)) // 注册函数 env.createTemporarySystemFunction("SubstringFunction", classOf[SubstringFunction]) // 在 Table API 里调用注册好的函数 env.from("MyTable").select(call("SubstringFunction", $"myField", 5, 12)) // 在 SQL 里调用注册好的函数 env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable")
对于交互式会话,还可以在使用或注册函数之前对其进行参数化,这样可以把函数 实例 而不是函数 类 用作临时函数。
为确保函数实例可应用于集群环境,参数必须是可序列化的。
import org.apache.flink.table.api._ import org.apache.flink.table.functions.ScalarFunction // 定义可参数化的函数逻辑 class SubstringFunction(val endInclusive) extends ScalarFunction { def eval(s: String, begin: Integer, end: Integer): String = { s.substring(endInclusive ? end + 1 : end) } } val env = TableEnvironment.create(...) // 在 Table API 里不经注册直接“内联”调用函数 env.from("MyTable").select(call(new SubstringFunction(true), $"myField", 5, 12)) // 注册函数 env.createTemporarySystemFunction("SubstringFunction", new SubstringFunction(true))
2 开发指南
注意在聚合函数使用新的类型系统前,本节仅适用于标量和表值函数。
所有的自定义函数都遵循一些基本的实现原则。
2.1 函数类
实现类必须继承自合适的基类之一(例如 org.apache.flink.table.functions.ScalarFunction
)。
该类必须声明为 public
,而不是 abstract
,并且可以被全局访问。不允许使用非静态内部类或匿名类。
为了将自定义函数存储在持久化的 catalog 中,该类必须具有默认构造器,且在运行时可实例化。
2.2 求值方法
基类提供了一组可以被重写的方法,例如 open()
、 close()
或 isDeterministic()
。
但是,除了上述方法之外,作用于每条传入记录的主要逻辑还必须通过专门的 求值方法 来实现。
根据函数的种类,后台生成的运算符会在运行时调用诸如 eval()
、accumulate()
或 retract()
之类的求值方法。
这些方法必须声明为 public
,并带有一组定义明确的参数。
常规的 JVM 方法调用语义是适用的。因此可以:
- 实现重载的方法,例如
eval(Integer)
和eval(LocalDateTime)
; - 使用变长参数,例如
eval(Integer...)
; - 使用对象继承,例如
eval(Object)
可接受LocalDateTime
和Integer
作为参数; - 也可组合使用,例如
eval(Object...)
可接受所有类型的参数。
以下代码片段展示了一个重载函数的示例:
import org.apache.flink.table.functions.ScalarFunction import scala.annotation.varargs // 有多个重载求值方法的函数 class SumFunction extends ScalarFunction { def eval(a: Integer, b: Integer): Integer = { a + b } def eval(a: String, b: String): Integer = { Integer.valueOf(a) + Integer.valueOf(b) } @varargs // generate var-args like Java def eval(d: Double*): Integer = { d.sum.toInt } }
2.3 类型推导
Table(类似于 SQL 标准)是一种强类型的 API。因此,函数的参数和返回类型都必须映射到数据类型。
从逻辑角度看,Planner 需要知道数据类型、精度和小数位数;从 JVM 角度来看,Planner 在调用自定义函数时需要知道如何将内部数据结构表示为 JVM 对象。
术语 类型推导 概括了意在验证输入值、派生出参数/返回值数据类型的逻辑。
Flink 自定义函数实现了自动的类型推导提取,通过反射从函数的类及其求值方法中派生数据类型。如果这种隐式的反射提取方法不成功,则可以通过使用 @DataTypeHint
和 @FunctionHint
注解相关参数、类或方法来支持提取过程,下面展示了有关如何注解函数的例子。
如果需要更高级的类型推导逻辑,实现者可以在每个自定义函数中显式重写 getTypeInference()
方法。但是,建议使用注解方式,因为它可使自定义类型推导逻辑保持在受影响位置附近,而在其他位置则保持默认状态。
自动类型推导
自动类型推导会检查函数的类和求值方法,派生出函数参数和结果的数据类型, @DataTypeHint
和 @FunctionHint
注解支持自动类型推导。
有关可以隐式映射到数据类型的类的完整列表,请参阅数据类型。
@DataTypeHint
在许多情况下,需要支持以 内联 方式自动提取出函数参数、返回值的类型。
以下例子展示了如何使用 @DataTypeHint
,详情可参考该注解类的文档。
import org.apache.flink.table.annotation.DataTypeHint import org.apache.flink.table.annotation.InputGroup import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.types.Row import scala.annotation.varargs // function with overloaded evaluation methods class OverloadedFunction extends ScalarFunction { // no hint required def eval(a: Long, b: Long): Long = { a + b } // 定义 decimal 的精度和小数位 @DataTypeHint("DECIMAL(12, 3)") def eval(double a, double b): BigDecimal = { java.lang.BigDecimal.valueOf(a + b) } // 定义嵌套数据类型 @DataTypeHint("ROW<s STRING, t TIMESTAMP(3) WITH LOCAL TIME ZONE>") def eval(Int i): Row = { Row.of(java.lang.String.valueOf(i), java.time.Instant.ofEpochSecond(i)) } // 允许任意类型的符入,并输出定制序列化后的值 @DataTypeHint(value = "RAW", bridgedTo = classOf[java.nio.ByteBuffer]) def eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object o): java.nio.ByteBuffer = { MyUtils.serializeToByteBuffer(o) } }
@FunctionHint
有时我们希望一种求值方法可以同时处理多种数据类型,有时又要求对重载的多个求值方法仅声明一次通用的结果类型。
@FunctionHint
注解可以提供从入参数据类型到结果数据类型的映射,它可以在整个函数类或求值方法上注解输入、累加器和结果的数据类型。可以在类顶部声明一个或多个注解,也可以为类的所有求值方法分别声明一个或多个注解。所有的 hint 参数都是可选的,如果未定义参数,则使用默认的基于反射的类型提取。在函数类顶部定义的 hint 参数被所有求值方法继承。
以下例子展示了如何使用 @FunctionHint
,详情可参考该注解类的文档。
import org.apache.flink.table.annotation.DataTypeHint import org.apache.flink.table.annotation.FunctionHint import org.apache.flink.table.functions.TableFunction import org.apache.flink.types.Row // 为函数类的所有求值方法指定同一个输出类型 @FunctionHint(output = new DataTypeHint("ROW<s STRING, i INT>")) class OverloadedFunction extends TableFunction[Row] { def eval(a: Int, b: Int): Unit = { collect(Row.of("Sum", Int.box(a + b))) } // overloading of arguments is still possible def eval(): Unit = { collect(Row.of("Empty args", Int.box(-1))) } } // 解耦类型推导与求值方法,类型推导完全取决于 @FunctionHint @FunctionHint( input = Array(new DataTypeHint("INT"), new DataTypeHint("INT")), output = new DataTypeHint("INT") ) @FunctionHint( input = Array(new DataTypeHint("BIGINT"), new DataTypeHint("BIGINT")), output = new DataTypeHint("BIGINT") ) @FunctionHint( input = Array(), output = new DataTypeHint("BOOLEAN") ) class OverloadedFunction extends TableFunction[AnyRef] { // an implementer just needs to make sure that a method exists // that can be called by the JVM @varargs def eval(o: AnyRef*) = { if (o.length == 0) { collect(Boolean.box(false)) } collect(o(0)) } }
定制类型推导
在大多数情况下,@DataTypeHint
和 @FunctionHint
足以构建自定义函数,然而通过重写 getTypeInference()
定制自动类型推导逻辑,实现者可以创建任意像系统内置函数那样有用的函数。
以下用 Java 实现的例子展示了定制类型推导的潜力,它根据字符串参数来确定函数的结果类型。该函数带有两个字符串参数:第一个参数表示要分析的字符串,第二个参数表示目标类型。
import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.catalog.DataTypeFactory; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.types.inference.TypeInference; import org.apache.flink.types.Row; public static class LiteralFunction extends ScalarFunction { public Object eval(String s, String type) { switch (type) { case "INT": return Integer.valueOf(s); case "DOUBLE": return Double.valueOf(s); case "STRING": default: return s; } } // 禁用自动的反射式类型推导,使用如下逻辑进行类型推导 @Override public TypeInference getTypeInference(DataTypeFactory typeFactory) { return TypeInference.newBuilder() // 指定输入参数的类型,必要时参数会被隐式转换 .typedArguments(DataTypes.STRING(), DataTypes.STRING()) // specify a strategy for the result data type of the function .outputTypeStrategy(callContext -> { if (!callContext.isArgumentLiteral(1) || callContext.isArgumentNull(1)) { throw callContext.newValidationError("Literal expected for second argument."); } // 基于字符串值返回数据类型 final String literal = callContext.getArgumentValue(1, String.class).orElse("STRING"); switch (literal) { case "INT": return Optional.of(DataTypes.INT().notNull()); case "DOUBLE": return Optional.of(DataTypes.DOUBLE().notNull()); case "STRING": default: return Optional.of(DataTypes.STRING()); } }) .build(); } }
2.4 运行时集成
有时候自定义函数需要获取一些全局信息,或者在真正被调用之前做一些配置(setup)/清理(clean-up)的工作。自定义函数也提供了 open()
和 close()
方法,你可以重写这两个方法做到类似于 DataStream API 中 RichFunction
的功能。
open() 方法在求值方法被调用之前先调用。close() 方法在求值方法调用完之后被调用。
open() 方法提供了一个 FunctionContext,它包含了一些自定义函数被执行时的上下文信息,比如 metric group、分布式文件缓存,或者是全局的作业参数等。
下面的信息可以通过调用 FunctionContext
的对应的方法来获得:
方法 | 描述 |
---|---|
getMetricGroup() |
执行该函数的 subtask 的 Metric Group。 |
getCachedFile(name) |
分布式文件缓存的本地临时文件副本。 |
getJobParameter(name, defaultValue) |
跟对应的 key 关联的全局参数值。 |
下面的例子展示了如何在一个标量函数中通过 FunctionContext 来获取一个全局的任务参数:
import org.apache.flink.table.api._ import org.apache.flink.table.functions.FunctionContext import org.apache.flink.table.functions.ScalarFunction class HashCodeFunction extends ScalarFunction { private var factor: Int = 0 override def open(context: FunctionContext): Unit = { // 获取参数 "hashcode_factor" // 如果不存在,则使用默认值 "12" factor = context.getJobParameter("hashcode_factor", "12").toInt } def eval(s: String): Int = { s.hashCode * factor } } val env = TableEnvironment.create(...) // 设置任务参数 env.getConfig.addJobParameter("hashcode_factor", "31") // 注册函数 env.createTemporarySystemFunction("hashCode", classOf[HashCodeFunction]) // 调用函数 env.sqlQuery("SELECT myField, hashCode(myField) FROM MyTable")