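Someone asked this question in a group chat, and since I hadn't written any window SQL in a while, I decided to play with it.
I don't have a real environment at hand, so everything is kept as simple as possible.
Data is generated with the datagen connector; id is random, with the minimum and maximum set in the DDL (1 and 2,000,000).
Output goes straight to the console.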
-- flink window tvf calc pv&uv
create table if not exists datagen_source (
    id int
    ,name string
    ,sex string
    ,age int
    ,birthday string
    ,proc_time as proctime()
) with (
    'connector' = 'datagen'
    ,'rows-per-second' = '10000'
    ,'fields.id.kind' = 'random'
    ,'fields.id.min' = '1'
    ,'fields.id.max' = '2000000'
);

create table if not exists print_sink (
    start_time string
    ,end_time string
    ,pv bigint
    ,uv bigint
) with (
    'connector' = 'print'
);

insert into print_sink
-- 10-second tumbling window
select date_format(window_start, 'HH:mm:ss')
    , date_format(window_end, 'HH:mm:ss')
    , count(id)
    , count(distinct id)
FROM TABLE(
    TUMBLE(TABLE datagen_source, DESCRIPTOR(proc_time), INTERVAL '10' SECOND))
GROUP BY window_start, window_end
union all
-- 20-second tumbling window
select date_format(window_start, 'HH:mm:ss')
    , date_format(window_end, 'HH:mm:ss')
    , count(id)
    , count(distinct id)
FROM TABLE(
    TUMBLE(TABLE datagen_source, DESCRIPTOR(proc_time), INTERVAL '20' SECOND))
GROUP BY window_start, window_end
union all
-- 30-second tumbling window
select date_format(window_start, 'HH:mm:ss')
    , date_format(window_end, 'HH:mm:ss')
    , count(id)
    , count(distinct id)
FROM TABLE(
    TUMBLE(TABLE datagen_source, DESCRIPTOR(proc_time), INTERVAL '30' SECOND))
GROUP BY window_start, window_end
;
Results:
+I[10:45:00, 10:45:20, 20000, 19900]     # 20 s
+I[10:45:10, 10:45:20, 20000, 19913]     # 10 s
+I[10:45:00, 10:45:30, 120000, 116420]   # 30 s
+I[10:45:20, 10:45:30, 100000, 97497]
+I[10:45:30, 10:45:40, 100000, 97558]
+I[10:45:20, 10:45:40, 200000, 190314]
[Figure: job graph]
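It does work, but the annoying part is that the SQL API currently has no trigger, so a window cannot emit intermediate results. That is acceptable for windows of a few minutes, which emit once when the window closes. For hour-long or day-long windows, though, getting a single result only when the window ends is no better than running a batch job.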
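One thing that may partly ease this, assuming a Flink version that ships the CUMULATE window TVF (1.13 or later, to my knowledge): a cumulating window emits a running result at every step until the full window size is reached. This is not a trigger, just a different window shape, and I have not run it in this setup. A rough sketch against the same datagen_source and print_sink tables, with the 1-minute step and 1-hour size picked purely for illustration:

```sql
-- Sketch only: cumulative pv/uv within each hour, emitted once per minute.
-- CUMULATE(TABLE t, DESCRIPTOR(time_col), step, size); size must be a multiple of step.
insert into print_sink
select date_format(window_start, 'HH:mm:ss')
    , date_format(window_end, 'HH:mm:ss')
    , count(id)
    , count(distinct id)
FROM TABLE(
    CUMULATE(TABLE datagen_source, DESCRIPTOR(proc_time), INTERVAL '1' MINUTE, INTERVAL '1' HOUR))
GROUP BY window_start, window_end
;
```

Every output row keeps the same window_start for the hour while window_end advances by one step, so each row is the pv/uv accumulated so far in that hour.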
Note: Window TVF aggregation supports GROUPING SETS, ROLLUP, and CUBE.
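As a quick illustration of the note above, here is a minimal sketch of GROUPING SETS on top of the same 10-second tumbling window. Grouping by sex is my own choice for the example; with datagen defaults that column holds random strings, so the per-value rows are not meaningful here and only show the syntax. window_start and window_end stay in the GROUP BY clause, outside the GROUPING SETS list.

```sql
-- Sketch only: pv/uv per sex plus an overall total row for each 10-second window.
select date_format(window_start, 'HH:mm:ss') as start_time
    , date_format(window_end, 'HH:mm:ss') as end_time
    , sex            -- null on the overall-total row produced by the empty set ()
    , count(id) as pv
    , count(distinct id) as uv
FROM TABLE(
    TUMBLE(TABLE datagen_source, DESCRIPTOR(proc_time), INTERVAL '10' SECOND))
GROUP BY window_start, window_end, GROUPING SETS ((sex), ())
;
```

This query returns one more column than print_sink declares, so writing it out would need a sink table with an extra string column.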