zoukankan      html  css  js  c++  java
  • Flink 1.10 UDF 的一个小问题

    在使用 Flink 1.10 的 SQL 的时候,遇到个小问题: 一个返回当前时间的函数返回的结果是启动的时间,并且保持不变。

    比如下面这个UDF,获取当前时间的 时分秒(HH:mm:ss 格式)

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.functions.ScalarFunction;
    
    import java.text.SimpleDateFormat;
    
    /**
     * Sysdate 返回当前时间的 HH:mm:ss 格式的字符串
     */
    public class Systime extends ScalarFunction {
    
        private static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
    
        public Systime() {
        }
    
        public String eval() throws Exception {
            // 当前时间的毫秒值,转为 HH:mm:ss 
            return sdf.format(System.currentTimeMillis());
        }
    
    
        public TypeInformation<?> getResultType(Class<?>[] signature) {
            return Types.STRING;
        }
    }

    最开始用的时候,直接写了这么个UDF,还自我感觉很溜。。。

    在测试的时候,也没有留意到返回的值都是相同的,在线上跑了几天才发现。。。

    如sql :

    INSERT INTO user_log_sink
    SELECT user_id, systime(), item_id
    FROM user_log a;

    返回值如下:

    全是一样的

    在代码里面添加 日志信息,只有在启动的时候输出了一次日志信息(在客户端),在 taskmanager.log 里没有对应日志,说明确实没有执行

    对比其他的UDF 除了没有参数,其他好像没有什么不一样,都是继承的 ScalarFunction 、也都在客户端注册。。

    在其他UDF 中也添加日志信息,测试发现:     

    都是在注册的时候,调用了对应的构造方法,但是无参的 UDF,在任务初始化阶段,还调用了一次,而需要参数的UDF 并没有;数据进入的时候,无参的UDF 并没有调用 eval 方法,有参的UDF 就正常调用了。

    直接debug 无参的UDF,从日志中发现除了在注册的时候 执行了 构造方法,还调用了有一次  eval 方法,所以在 eval 中添加断点:

     于是就找到这个类:

    package org.apache.flink.table.planner.codegen
    /**
      * Evaluates constant expressions with code generator.   计算常量表达式
      *
      * @param allowChangeNullability If the reduced expr's nullability can be changed, e.g. a null
      *                               literal is definitely nullable and the other literals are
      *                               not null.
      */
    class ExpressionReducer(
        config: TableConfig,
        allowChangeNullability: Boolean = false)
      extends RexExecutor

    从注释就可以看出,Flink 在计算常量表达式,而我的 systime() 就被认为是常量表达式了,在客户端执行一次,得到结果,之后的函数,直接使用对应常量,而不再进函数计算返回了。

     随后的获取到的SQL输出结果:

     看到这里,问题就很清楚了,一些常量的表达式,Flink 在客户端初始化的时候,直接执行一次,缓存了结果,之后就直接返回这个结果,而不是去执行表达式

     解决也很简单,直接给 UDF 添加一个参数(注:必须是SQL的字段,如果是常量也会被Flink 优化)

    public String eval(String input) throws Exception {
            // 当前时间的毫秒值,转为 HH:mm:ss
            return sdf.format(System.currentTimeMillis());
        }

    使用的SQL:

    INSERT INTO user_log_sink
    SELECT user_id, systime(), systime_param(user_id)
    FROM user_log a;

    systime_param 并不会再初始化的时候执行

    返回结果如下:

     搞定

    欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

     

  • 相关阅读:
    关于JSON可能出现的错误,待更/todo
    mongoose的安装与使用(书签记录) 2017
    HTTP的学习记录3--HTTPS和HTTP
    HTTP的学习记录(二)头部
    HTTP(一)概述
    LeetCode 455. Assign Cookies
    LeetCode 453. Minimum Moves to Equal Array Elements
    LeetCode 448. Find All Numbers Disappeared in an Array
    LeetCode 447. Number of Boomerangs
    LeetCode 416. Partition Equal Subset Sum
  • 原文地址:https://www.cnblogs.com/Springmoon-venn/p/12954824.html
Copyright © 2011-2022 走看看