zoukankan      html  css  js  c++  java
  • Flink实战(八十七):FLINK-SQL应用场景(7)Flink 与 hive 结合使用(六)Hive 函数

    1 通过 HiveModule 使用 Hive 内置函数

    在 Flink SQL 和 Table API 中,可以通过系统内置的 HiveModule 来使用 Hive 内置函数,

    详细信息,请参考 HiveModule

    val name            = "myhive"
    val version         = "2.3.4"
    
    tableEnv.loadModue(name, new HiveModule(version));
    modules:
       - name: core
         type: core
       - name: myhive
         type: hive
    • 请注意旧版本的部分 Hive 内置函数存在线程安全问题。 我们建议用户及时通过补丁修正 Hive 中的这些问题。

    2 Hive 用户自定义函数(User Defined Functions)

    在 Flink 中用户可以使用 Hive 里已经存在的 UDF 函数。

    支持的 UDF 类型包括:

    • UDF
    • GenericUDF
    • GenericUDTF
    • UDAF
    • GenericUDAFResolver2

    在进行查询规划和执行时,Hive UDF 和 GenericUDF 函数会自动转换成 Flink 中的 ScalarFunction,GenericUDTF 会被自动转换成 Flink 中的 TableFunction,UDAF 和 GenericUDAFResolver2 则转换成 Flink 聚合函数(AggregateFunction).

    想要使用 Hive UDF 函数,需要如下几步:

    • 通过 Hive Metastore 将带有 UDF 的 HiveCatalog 设置为当前会话的 catalog 后端。
    • 将带有 UDF 的 jar 包放入 Flink classpath 中,并在代码中引入。
    • 使用 Blink planner。

    3 使用 Hive UDF

    假设我们在 Hive Metastore 中已经注册了下面的 UDF 函数:

    /**
     * 注册为 'myudf' 的简单 UDF 测试类. 
     */
    public class TestHiveSimpleUDF extends UDF {
    
        public IntWritable evaluate(IntWritable i) {
            return new IntWritable(i.get());
        }
    
        public Text evaluate(Text text) {
            return new Text(text.toString());
        }
    }
    
    /**
     * 注册为 'mygenericudf' 的普通 UDF 测试类
     */
    public class TestHiveGenericUDF extends GenericUDF {
    
        @Override
        public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
            checkArgument(arguments.length == 2);
    
            checkArgument(arguments[1] instanceof ConstantObjectInspector);
            Object constant = ((ConstantObjectInspector) arguments[1]).getWritableConstantValue();
            checkArgument(constant instanceof IntWritable);
            checkArgument(((IntWritable) constant).get() == 1);
    
            if (arguments[0] instanceof IntObjectInspector ||
                    arguments[0] instanceof StringObjectInspector) {
                return arguments[0];
            } else {
                throw new RuntimeException("Not support argument: " + arguments[0]);
            }
        }
    
        @Override
        public Object evaluate(DeferredObject[] arguments) throws HiveException {
            return arguments[0].get();
        }
    
        @Override
        public String getDisplayString(String[] children) {
            return "TestHiveGenericUDF";
        }
    }
    
    /**
     * 注册为 'mygenericudtf' 的字符串分割 UDF 测试类
     */
    public class TestHiveUDTF extends GenericUDTF {
    
        @Override
        public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
            checkArgument(argOIs.length == 2);
    
            // TEST for constant arguments
            checkArgument(argOIs[1] instanceof ConstantObjectInspector);
            Object constant = ((ConstantObjectInspector) argOIs[1]).getWritableConstantValue();
            checkArgument(constant instanceof IntWritable);
            checkArgument(((IntWritable) constant).get() == 1);
    
            return ObjectInspectorFactory.getStandardStructObjectInspector(
                Collections.singletonList("col1"),
                Collections.singletonList(PrimitiveObjectInspectorFactory.javaStringObjectInspector));
        }
    
        @Override
        public void process(Object[] args) throws HiveException {
            String str = (String) args[0];
            for (String s : str.split(",")) {
                forward(s);
                forward(s);
            }
        }
    
        @Override
        public void close() {
        }
    }

    在 Hive CLI 中,可以查询到已经注册的 UDF 函数:

    hive> show functions;
    OK
    ......
    mygenericudf
    myudf
    myudtf

    此时,用户如果想使用这些 UDF,在 SQL 中就可以这样写:

    Flink SQL> select mygenericudf(myudf(name), 1) as a, mygenericudf(myudf(age), 1) as b, s from mysourcetable, lateral table(myudtf(name, 1)) as T(s);

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/14054275.html

  • 相关阅读:
    Generate Parentheses
    Length of Last Word
    Maximum Subarray
    Count and Say
    二分搜索算法
    Search Insert Position
    Implement strStr()
    Remove Element
    Remove Duplicates from Sorted Array
    Remove Nth Node From End of List
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/14054275.html
Copyright © 2011-2022 走看看