zoukankan      html  css  js  c++  java
  • Flink udf的小问题:无参数的udf函数会被优化成常量表达式

    这两天有一个使用flink sql的实时流项目,需要在每个结果中标记当前时间。因为flink没有获取当前时间的函数,于是我自己定义了一个udf:

    import org.apache.flink.table.functions.ScalarFunction;
    
    public class CurrentUnixTimeFunction extends ScalarFunction {
        public Long eval() {
            return System.currentTimeMillis() / 1000;
        }
    }

    然后注册成currentUnixTime()进行使用:

    tableEnv.registerFunction("currentUnixTime", new CurrentUnixTimeFunction());

    本来以为就是这么简单的事情,结果看实时流数据的时候发现,这个时间一直都是任务启动时间,有点摸不着头脑。于是开启了研究,以下为内容转自:https://www.cnblogs.com/Springmoon-venn/p/12954824.html

    在使用 Flink 1.10 的 SQL 的时候,遇到个小问题: 一个返回当前时间的函数返回的结果是启动的时间,并且保持不变。

    比如下面这个UDF,获取当前时间的 时分秒(HH:mm:ss 格式)

     1 import org.apache.flink.api.common.typeinfo.TypeInformation;
     2 import org.apache.flink.api.common.typeinfo.Types;
     3 import org.apache.flink.table.functions.ScalarFunction;
     4 
     5 import java.text.SimpleDateFormat;
     6 
     7 /**
     8  * Sysdate 返回当前时间的 HH:mm:ss 格式的字符串
     9  */
    10 public class Systime extends ScalarFunction {
    11 
    12     private static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
    13 
    14     public Systime() {
    15     }
    16 
    17     public String eval() throws Exception {
    18         // 当前时间的毫秒值,转为 HH:mm:ss 
    19         return sdf.format(System.currentTimeMillis());
    20     }
    21 
    22 
    23     public TypeInformation<?> getResultType(Class<?>[] signature) {
    24         return Types.STRING;
    25     }
    26 }

    最开始用的时候,直接写了这么个UDF,还自我感觉很溜。。。

    在测试的时候,也没有留意到返回的值都是相同的,在线上跑了几天才发现。。。

    如sql :

    INSERT INTO user_log_sink
    SELECT user_id, systime(), item_id
    FROM user_log a;

    返回值如下:

    全是一样的

    在代码里面添加 日志信息,只有在启动的时候输出了一次日志信息(在客户端),在 taskmanager.log 里没有对应日志,说明确实没有执行

    对比其他的UDF 除了没有参数,其他好像没有什么不一样,都是继承的 ScalarFunction 、也都在客户端注册。。

    在其他UDF 中也添加日志信息,测试发现:     

    都是在注册的时候,调用了对应的构造方法,但是无参的 UDF,在任务初始化阶段,还调用了一次,而需要参数的UDF 并没有;数据进入的时候,无参的UDF 并没有调用 eval 方法,有参的UDF 就正常调用了。

    直接debug 无参的UDF,从日志中发现除了在注册的时候 执行了 构造方法,还调用了有一次  eval 方法,所以在 eval 中添加断点:

     于是就找到这个类:

     1 package org.apache.flink.table.planner.codegen
     2 /**
     3   * Evaluates constant expressions with code generator.   计算常量表达式
     4   *
     5   * @param allowChangeNullability If the reduced expr's nullability can be changed, e.g. a null
     6   *                               literal is definitely nullable and the other literals are
     7   *                               not null.
     8   */
     9 class ExpressionReducer(
    10     config: TableConfig,
    11     allowChangeNullability: Boolean = false)
    12   extends RexExecutor

    从注释就可以看出,Flink 在计算常量表达式,而我的 systime() 就被认为是常量表达式了,在客户端执行一次,得到结果,之后的函数,直接使用对应常量,而不再进函数计算返回了。

     随后的获取到的SQL输出结果:

     看到这里,问题就很清楚了,一些常量的表达式,Flink 在客户端初始化的时候,直接执行一次,缓存了结果,之后就直接返回这个结果,而不是去执行表达式

     解决也很简单,直接给 UDF 添加一个参数(注:必须是SQL的字段,如果是常量也会被Flink 优化)

    public String eval(String input) throws Exception {
            // 当前时间的毫秒值,转为 HH:mm:ss
            return sdf.format(System.currentTimeMillis());
        }

    使用的SQL:

    INSERT INTO user_log_sink
    SELECT user_id, systime(), systime_param(user_id)
    FROM user_log a;

    systime_param 并不会再初始化的时候执行

    返回结果如下:

     搞定

  • 相关阅读:
    POJ 2528 Mayor's posters 【区间离散化+线段树区间更新&&查询变形】
    HDU 5687 Problem C 【字典树删除】
    HDU 1298 T9【字典树增加||查询】
    AIM Tech Round 5 (rated, Div. 1 + Div. 2) C. Rectangles 【矩阵交集】
    基本数据结构:栈-队列-双向队列
    Elasticsearch 环境准备
    django项目部署
    linux基础
    购物车
    Python内置函数
  • 原文地址:https://www.cnblogs.com/fnlingnzb-learner/p/13560644.html
Copyright © 2011-2022 走看看