zoukankan      html  css  js  c++  java
  • 【翻译】Flink Table Api & SQL — Hive —— Hive 函数

    本文翻译自官网:Hive Functions  https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/hive/hive_functions.html

    Flink Table Api & SQL 翻译目录

    用户可以在Flink中使用 Hive 现有的自定义函数。

    支持的UDF类型包括: 

    • UDF
    • GenericUDF
    • GenericUDTF
    • UDAF
    • GenericUDAFResolver2

    根据查询的计划和执行,Hive的UDF和GenericUDF会自动转换为Flink的ScalarFunction,Hive的GenericUDTF会自动转换为Flink的TableFunction,Hive的UDAF和GenericUDAFResolver2会转换为Flink的AggregateFunction。

    要使用Hive用户定义的函数,用户必须

    • 设置由Hive Metastore支持的HiveCatalog,其中包含该函数作为会话的当前 catalog
    • 在Flink的classpath中包含该函数的 jar
    • 使用 Blink planner

    使用Hive 自定义的函数

    假设我们在Hive Metastore中注册了以下Hive函数:

     

    /**
     * Test simple udf. Registered under name 'myudf'
     */
    public class TestHiveSimpleUDF extends UDF {
    
        public IntWritable evaluate(IntWritable i) {
            return new IntWritable(i.get());
        }
    
        public Text evaluate(Text text) {
            return new Text(text.toString());
        }
    }
    
    /**
     * Test generic udf. Registered under name 'mygenericudf'
     */
    public class TestHiveGenericUDF extends GenericUDF {
    
        @Override
        public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
            checkArgument(arguments.length == 2);
    
            checkArgument(arguments[1] instanceof ConstantObjectInspector);
            Object constant = ((ConstantObjectInspector) arguments[1]).getWritableConstantValue();
            checkArgument(constant instanceof IntWritable);
            checkArgument(((IntWritable) constant).get() == 1);
    
            if (arguments[0] instanceof IntObjectInspector ||
                    arguments[0] instanceof StringObjectInspector) {
                return arguments[0];
            } else {
                throw new RuntimeException("Not support argument: " + arguments[0]);
            }
        }
    
        @Override
        public Object evaluate(DeferredObject[] arguments) throws HiveException {
            return arguments[0].get();
        }
    
        @Override
        public String getDisplayString(String[] children) {
            return "TestHiveGenericUDF";
        }
    }
    
    /**
     * Test split udtf. Registered under name 'mygenericudtf'
     */
    public class TestHiveUDTF extends GenericUDTF {
    
        @Override
        public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
            checkArgument(argOIs.length == 2);
    
            // TEST for constant arguments
            checkArgument(argOIs[1] instanceof ConstantObjectInspector);
            Object constant = ((ConstantObjectInspector) argOIs[1]).getWritableConstantValue();
            checkArgument(constant instanceof IntWritable);
            checkArgument(((IntWritable) constant).get() == 1);
    
            return ObjectInspectorFactory.getStandardStructObjectInspector(
                Collections.singletonList("col1"),
                Collections.singletonList(PrimitiveObjectInspectorFactory.javaStringObjectInspector));
        }
    
        @Override
        public void process(Object[] args) throws HiveException {
            String str = (String) args[0];
            for (String s : str.split(",")) {
                forward(s);
                forward(s);
            }
        }
    
        @Override
        public void close() {
        }
    }

    从Hive CLI中,我们可以看到它们已注册: 

    hive> show functions;
    OK
    ......
    mygenericudf
    myudf
    myudtf

    然后,用户可以在SQL中以如下方式使用它们: 

    Flink SQL> select mygenericudf(myudf(name), 1) as a, mygenericudf(myudf(age), 1) as b, s from mysourcetable, lateral table(myudtf(name, 1)) as T(s);

    局限性

    Flink中现时不支持Hive内置内置。要使用Hive内置函数,用户必须首先在Hive Metastore中手动注册它们。

    仅在Blink planner中测试了Flink 批处理对Hive功能的支持。

    Hive函数当前不能在Flink中的各个 catalog 之间使用。

    有关数据类型限制,请参考Hive 

    欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

  • 相关阅读:
    Java SE 8 Archive Downloads (JDK 8u202 and earlier)
    MySql.Data.MySqlClient.MySqlProtocolException:“Packet received out-of-order. Expected 1; got 2.”
    docker image 镜像导入导出
    Windows设置国内源阿里云镜像加速与离线安装pip包的方法
    Python time strptime() 函数根据指定的格式把一个时间字符串解析为时间元组
    centos7下vs code编辑器字体与windows版本同步设置-安装中文字体,美化vscode
    windows10 pip install MySQL-python mysqlclient
    Failed to start docker.service: Unit not found.
    tcping测试端口是否畅通
    the type initializer for 'system.drawingcore.gdiplus' threw an exception
  • 原文地址:https://www.cnblogs.com/Springmoon-venn/p/11982298.html
Copyright © 2011-2022 走看看