zoukankan      html  css  js  c++  java
  • Flink UDF 解析复杂 SQL

    2021-06-07 修改

    白干了,flink 1.13 json format 可以直接解析复杂的sql,以如下格式

    CREATE TABLE user_log (
      user_id STRING
      ,item_id STRING
      ,category_id STRING
      ,sub_json ROW(sub_name STRING, password STRING, sub_json ROW(sub_name STRING, sub_pass STRING))
    ) WITH (
       'connector' = 'kafka'
      ,'topic' = 'user_b'
      ,'properties.bootstrap.servers' = '10.201.1.132:9092'
      ,'properties.group.id' = 'user_log_1'
      ,'scan.startup.mode' = 'latest-offset'
      ,'format' = 'json'
      ,'json.ignore-parse-errors' = 'false'
    );
    
    insert into mysql_table_venn_user_log_sink
    SELECT user_id, item_id, category_id, sub_json.sub_name, sub_json.password, sub_json.sub_json.sub_name, sub_json.sub_json.sub_pass
    FROM user_log;

    -------------分割线-------------------

    最近用 Flink  处理一些 json 格式数据的时候,突然发现 1.13 的 json format 没有解析复杂 SQL 的属性了

    在 Flink 1.10 的时候,还写了一篇博客来介绍 自定义 json 格式的写法:

    https://www.cnblogs.com/Springmoon-venn/p/12664547.html

    这就尴尬了

    没发,只能写个自定义的 udf 先凑合着用了,这两天突然有点想法,写个通用的 udf,来解析复杂 json

    处理json 的udf 的需求是输入多个字段,返回多个字段,但是只有一行,只能使用 UDTF(flink 也就是 table functions)


    官网地址: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/udfs/#table-functions

    类型推导 
    Table(类似于 SQL 标准)是一种强类型的 API。因此,函数的参数和返回类型都必须映射到[数据类型]({%link dev/table/types.zh.md %})。
    
    从逻辑角度看,Planner 需要知道数据类型、精度和小数位数;从 JVM 角度来看,Planner 在调用自定义函数时需要知道如何将内部数据结构表示为 JVM 对象。
    
    术语 类型推导 概括了意在验证输入值、派生出参数/返回值数据类型的逻辑。
    
    Flink 自定义函数实现了自动的类型推导提取,通过反射从函数的类及其求值方法中派生数据类型。如果这种隐式的反射提取方法不成功,则可以通过使用 @DataTypeHint 和 @FunctionHint 注解相关参数、类或方法来支持提取过程,下面展示了有关如何注解函数的例子。
    
    如果需要更高级的类型推导逻辑,实现者可以在每个自定义函数中显式重写 getTypeInference() 方法。但是,建议使用注解方式,因为它可使自定义类型推导逻辑保持在受影响位置附近,而在其他位置则保持默认状态。
    
    自动类型推导 
    自动类型推导会检查函数的类和求值方法,派生出函数参数和结果的数据类型, @DataTypeHint 和 @FunctionHint 注解支持自动类型推导。
    
    有关可以隐式映射到数据类型的类的完整列表,请参阅[数据类型]({%link dev/table/types.zh.md %}#数据类型注解)。

    官网示例:

    @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
    public static class SplitFunction extends TableFunction<Row> {
    
      public void eval(String str) {
        for (String s : str.split(" ")) {
          // use collect(...) to emit a row
          collect(Row.of(s, s.length()));
        }
      }
    }

    发现scala 的支持有点尴尬,因为想做一个解析复杂 json 的通用UDF,json 里面的字段是不固定的,如果将返回值放到数组中,如: collect(Row.of(arr(0), arr(1))) 这样返回会报错: Scala collections are not supported. See the documentation for supported classes or treat them as RAW types.

    改成 java 略好一点

    改完的代码,写成这样:

    public class ParseJson extends TableFunction<Row> {
    
        public void eval(String... json) {
            if (json == null || json.length == 0) {
                return;
            }
            // get column from json
            String[] arr = getStrings(json);
            
            RowKind rowKind = RowKind.fromByteValue((byte) 0);
            Row row = new Row(rowKind, json.length - 1);
            for (int i = 0; i < arr.length; ++i) {
                row.setField(json[i + 1], arr[i]);
            }
    
            collect(row);
        }
    
        /**
         * parse user columns from json and provider column name
         */
        private String[] getStrings(String[] json) {
            JsonObject jsonObject = new JsonParser().parse(json[0]).getAsJsonObject();
            int len = json.length - 1;
            String[] arr = new String[len];
            for (int i = 0; i < len - 1; ++i) {
                JsonElement tm = jsonObject.get(json[i + 1]);
                if (tm != null) {
                    arr[i] = tm.getAsString();
                } else {
                    arr[i] = null;
                }
            }
            return arr;
        }
    }

    代码比较简单,输入参数是数组的,第一个字段是需要处理的 json 字符串,后面的字段是需要解析的字段
    解析出来的结果放在 String 字符数组中,再转为 Row

    可是不能推导出 Row 的类型和名称: Cannot extract a data type from a pure 'org.apache.flink.types.Row' class. Please use annotations to define field names and field types.

    吐血啊,如果这样不能识别,需要在 annotations 上写配置对应的列名和列类型,就不通用了

    退而求其次,先配置个 annotations 看下情况再说

    @FunctionHint(output = @DataTypeHint("ROW<col1 STRING, col2 STRING, col3 STRING>"))
    public void eval(String json, String col1, String col2, String col3) {
        if (json == null || json.trim().length() == 0) {
            return;
        }
        String[] inputArr = new String[3];
        inputArr[0] = col1;
        inputArr[1] = col2;
        inputArr[2] = col3;
        String[] arr = getStrings(json, inputArr);
        collect(Row.of(arr[0], arr[1], arr[2]));
    }
    
    @FunctionHint(output = @DataTypeHint("ROW<col1 STRING, col2 STRING, col3 STRING, col4 STRING>"))
    public void eval(String json, String col1, String col2, String col3, String col4) {
        if (json == null || json.trim().length() == 0) {
            return;
        }
        String[] inputArr = new String[4];
        inputArr[0] = col1;
        inputArr[1] = col2;
        inputArr[2] = col3;
        inputArr[3] = col4;
        String[] arr = getStrings(json, inputArr);
        collect(Row.of(arr[0], arr[1], arr[2], arr[3]));
    }

    测试 json 如下:

    {"subJson":"{"password":"pass_4","doub":"3.1250","username":"venn_4"}","category_id":"category_id_4","user_id":"user_id_702","item_id":"item_id_4","behavior":"4","sort_col":"25","sales":"35","ts":"2021-05-31 14:29:53.680"}

    测试 SQL 如下:

    insert into t_json_sink
    select col1, col2, item_id, username, password, cast(doub as double) doub
    from t_json a
        LEFT JOIN LATERAL TABLE(udf_parse_json(json, 'category_id', 'user_id', 'item_id', 'subJson')) AS T(col1, col2, item_id,subJson) ON TRUE
        LEFT JOIN LATERAL TABLE(udf_parse_json(subJson, 'username', 'password', 'doub')) AS T1(username, password, doub) ON TRUE
        ;

    这样是可以解析复杂的嵌套 json 的,但是需要穷举字段数量,因为是通用的 UDF,需要支持所有的字段

    测试了一下三层的 json,是可支持的

    测试 SQL 如下:

    insert into t_json_sink
    select category_id, user_id, item_id, cast(sort_col as int) sort_col, username, password, cast(doub as double) doub,sub_name,sub_pass
    from t_json a
        LEFT JOIN LATERAL TABLE(udf_parse_json(json, 'category_id', 'user_id', 'item_id', 'sort_col', 'sub_json')) AS T(category_id, user_id, item_id, sort_col, sub_json) ON TRUE
        LEFT JOIN LATERAL TABLE(udf_parse_json(sub_json, 'username', 'password', 'doub', 'sub_json_1')) AS T1(username, password, doub, sub_json_1) ON TRUE
        LEFT JOIN LATERAL TABLE(udf_parse_json(sub_json_1, 'sub_name', 'sub_pass')) AS T2(sub_name, sub_pass) ON TRUE
        ;

    看起来,多嵌套几层也是可以执行的,唯一的问题就是需要穷举字段数量

    直接代码生成100个以内的所有字段的 eval 重载方法,发现程序不能启动了,一直卡在 sql 检查上

    减少 eval 的重载方法数量,发现随着重载方法增多,启动越慢,10 个以上重载方法的启动时间(15秒)就不能接受了

    挑战失败,继续翻官网,在数据类型章节最前面看到:

    ROW<myField ARRAY<BOOLEAN>, myOtherField TIMESTAMP(3)>

    ROW 类型,里面的字段有数组,如果把 FunctionHint 写成这样:

    @FunctionHint(output = @DataTypeHint("ROW<arr ARRAY<STRING>>"))

    直接返回一个数组,刚好处理过程都是用的数组作为流转的类型,不需要构造结果的格式了

    最后的代码写成这样

    @FunctionHint(output = @DataTypeHint("ROW<arr ARRAY<STRING>>"))
    public void eval(String... json) {
        if (json == null || json.length == 0) {
            return;
        }
        String[] arr = getStrings(json);
        RowKind rowKind = RowKind.fromByteValue((byte) 0);
        Row row = new Row(rowKind, json.length - 1);
        row.setField(0, arr);
        collect(row);
    }

    对应的SQL 如下:

    insert into t_json_sink
    select arr, arr[1],arr[2]
    from t_json a
        LEFT JOIN LATERAL TABLE(udf_parse_json(json, 'category_id', 'user_id')) AS T(arr) ON TRUE

    这样也能很好的实现通用的复杂 json 解析

    测试了下嵌套6层的 json:

    {
        "sub_json": "{
            "sub_json":"    {
                "sub_json":"{
                    "sub_json":"{
                        "sub_json":"{"sub_name":"sub_5_venn_3","sub_pass":"sub_5_pass_3"}",
                        "sub_name":"sub_4_venn_3","sub_pass":"sub_4_pass_3"}",
                    "sub_name":"sub_3_venn_3","sub_pass":"sub_3_pass_3"}",
                "sub_name":"sub_2_sub_venn_3","sub_pass":"sub_2_sub_pass_3"}",
            "password":"sub_1_pass_3","sub_name":"sub_1_venn_3","doub":"3.1250"}",
        "category_id": "category_id_3",
        "user_id": "user_id_73",
        "item_id": "item_id_3",
        "behavior": "3",
        "sort_col": "45",
        "sales": "45",
        "ts": "2021-06-01 13:52:44.708"
    }

    对应 SQL 如下:

    insert into t_json_sink
    select T.arr[1], T1.arr[1], T2.arr[1], T3.arr[1], T4.arr[1], T5.arr[1]
    from t_json a
        LEFT JOIN LATERAL TABLE(udf_parse_json(json, 'user_id', 'sub_json')) AS T(arr) ON TRUE
        LEFT JOIN LATERAL TABLE(udf_parse_json(T.arr[2], 'sub_name', 'sub_json')) AS T1(arr) ON TRUE
        LEFT JOIN LATERAL TABLE(udf_parse_json(T1.arr[2], 'sub_name', 'sub_json')) AS T2(arr) ON TRUE
        LEFT JOIN LATERAL TABLE(udf_parse_json(T2.arr[2], 'sub_name', 'sub_json')) AS T3(arr) ON TRUE
        LEFT JOIN LATERAL TABLE(udf_parse_json(T3.arr[2], 'sub_name', 'sub_json')) AS T4(arr) ON TRUE
        LEFT JOIN LATERAL TABLE(udf_parse_json(T4.arr[2], 'sub_name', 'sub_json')) AS T5(arr) ON TRUE
        ;

    解析的结果如下:

    +I[user_id_261, sub_1_venn_6, sub_2_sub_venn_6, sub_3_venn_6, sub_4_venn_6, sub_5_venn_6]
    +I[user_id_262, sub_1_venn_3, sub_2_sub_venn_3, sub_3_venn_3, sub_4_venn_3, sub_5_venn_3]
    +I[user_id_263, sub_1_venn_7, sub_2_sub_venn_7, sub_3_venn_7, sub_4_venn_7, sub_5_venn_7]
    +I[user_id_264, sub_1_venn_5, sub_2_sub_venn_5, sub_3_venn_5, sub_4_venn_5, sub_5_venn_5]
    +I[user_id_265, sub_1_venn_8, sub_2_sub_venn_8, sub_3_venn_8, sub_4_venn_8, sub_5_venn_8]
    +I[user_id_266, sub_1_venn_0, sub_2_sub_venn_0, sub_3_venn_0, sub_4_venn_0, sub_5_venn_0]

    执行没有任何问题

    完整代码参见: https://github.com/springMoon/sqlSubmit ,ParseJson.java 和 parse_complex_json.sql

    注:突然好久没写博客,水一篇凑个数

    欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

     

  • 相关阅读:
    iOS开发数据库篇—FMDB简单介绍
    iOS开发数据库篇—SQLite常用的函数
    IOS开发数据库篇—SQLite模糊查询
    iOS开发数据库篇—SQLite的应用
    iOS开发数据库篇—SQL代码应用示例
    iOS开发数据库篇—SQL
    iOS开发数据库篇—SQLite简单介绍
    江苏建工信息化继续规划
    武汉分公司年终检查
    分公司资金查询
  • 原文地址:https://www.cnblogs.com/Springmoon-venn/p/14824389.html
Copyright © 2011-2022 走看看