官网示例:
-- use the existing TIMESTAMP(3) field in schema as the rowtime attribute CREATE TABLE MyTable ( ts_field TIMESTAMP(3), WATERMARK FOR ts_field AS ... ) WITH ( ... ) -- use system functions or UDFs or expressions to extract the expected TIMESTAMP(3) rowtime field CREATE TABLE MyTable ( log_ts STRING, ts_field AS TO_TIMESTAMP(log_ts), WATERMARK FOR ts_field AS ... ) WITH ( ... )
使用内置函数进行转换
TO_TIMESTAMP(log_ts) :此处的log_ts格式为:'yyyy-MM-dd HH:mm:ss' ,如果是秒级时间戳bigint格式则需要 t as TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss')) 进行转换为 TIMESTAMP(3) 类型
参考:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/connectors/
Flink 1.10.0 SQL DDL中如何定义watermark和计算列