功能
控制Esper事件流计算结果的输入形式、时间点及频率;
格式
1 | output [after suppression_def] |
2 | [[all | first | last | snapshot] every output_rate [seconds | events]] |
3 | [and when terminated] |
说明:
after suppression_def:前置输出条件,满足条件才能输出(该参数可省);
all | first | last | snapshot:输出形式(默认为all)
output_rate:值的意义一后面紧跟的单位有关,seconds时表示事件,events时表示事件个数;
when:后置输出条件。
after
格式:
1 | output after time_period | number events [...] |
例句:
1 | // 统计最近2分针内orderEvent事件salary属性值的总和,第一次输出在事件个数达到6个时输出(注意输出条件是after 5 events也就是必须要超过5),之后是每有一个事件进入都将输出一次; |
2 | String epsql ="select sum(salary) as result from orderEvent.win:time(2 min) output after 5 events"; |
3 |
first
功能:
输出所处理的一批事件中第一个被处理的事件。
1 | 例句: |
2 | // 该句需要注意不会等到进入5个事件后就会输出,而是每进入一批事件(5个)的第一个时就输出,sum(salary)的值随着进入的事件值不断增加 |
3 | String epsql ="select sum(salary) as result from orderEvent output first every 5 events"; |
1 | // 在输出第一次执行的事件后,需进入五个事件才能执行输出,而且sum(salary)的值为刚进入的第五个事件的值,win:length(1)起到的作用 |
2 | String epsql ="select sum(salary) as result from orderEvent.win:length(1) output first every 5 events"; |
last
功能:
输出所处理的一批事件中最后一个被处理的事件.
例句:
1 | // 该句表示每有五个事件进入后才会执行,不到五个事件不会执行,sum(salary)的值随着进入的事件值不断增加 |
2 | String epsql ="select sum(salary) as result from orderEvent output last every 5 events"; |
snapshot
功能:
输出EPL保存的所有事件执行结果快照(不会将事件移除)
例句:
1 | // 每有5个事件进入执行一次快照输出 |
2 | String epsql ="select sum(salary) as result from orderEvent.win:length(1) output snapshot every 5 events"; |
at
功能:
建立定时输出。
1 | output [after suppression_def] |
2 | [[all | first | last | snapshot] at |
3 | (minutes, hours, days of month, months, days of week [, seconds])] |
4 | [and when terminated] |
例句:
1 | // 每两秒钟输出一次 |
2 | String epsql ="select sum(salary) as result from orderEvent output at (*,*,*,*,*,*/2)"; |
when
功能:
通常与变量一起使用,当变量达到某个值时控制执行输出。
格式:
1 | output [after suppression_def] |
2 | [[all | first | last | snapshot] when trigger_expression |
3 | [then set variable_name = assign_expression [, variable_name = assign_expression [,...]]] |
4 | [and when terminated |
5 | [and termination_expression] |
6 | [then set variable_name = assign_expression [, variable_name = assign_expression [,...]]] |
7 | ] |
说明:
trigger_expression的值为true或false,分别表示输出和不输出,当trigger_expression条件成立便触发variable_name = assign_expression给variable_name重新赋值;
例句:
1 | // 配置条件变量 |
2 | ConfigurationOperations configOper = epAdmin.getConfiguration(); |
3 | configOper.addVariable("ifbool", Boolean.class, false); |
4 | // 配置查询任务 |
5 | String epsql = "select name as result from orderEvent output when ifbool then set ifbool = false"; |
6 | |
7 | // 发送事件 |
8 | for (int i = 0; i < 20; i++) { |
9 | ...... |
10 | epRuntime.sendEvent(event); |
11 | Thread.sleep(1000); |
12 | if (i>0 && i%5 == 0){ |
13 | // 改变事件变量的值,并触发事件流的输出 |
14 | epRuntime.setVariableValue("ifbool", true); |
15 | } |
16 | } |
注意:
1、当termination_expression返回true时,Esper会输出从上一次输出之后到这次输出之间所有的insert stream和remove stream。
2、若termination_expression不断被触发并返回true时,则Esper最短的输出间隔为100毫秒。
3、termination_expression不能包含事件流的属性,聚合函数以及prev函数和prior函数。
为实现更复杂的输出,Esper还提供了如下内置属性:
例句:
1 | // 进入的事件个数达到5个就输出,且count_insert的值在事件个数达到5个时清0 |
2 | String epsql = "select name as result from orderEvent output when count_insert = 5"; |
context terminated
格式:
1 | output [after suppression_def] |
2 | [[all | first | last | snapshot] when terminated |
3 | [and termination_expression] |
4 | [then set variable_name = assign_expression [, variable_name = assign_expression [,...]]] |
5 | ] |
例句:
1 | // 创建context |
2 | String ctsql = "create context ctEvent partition by name from orderEvent"; |
3 | // 根据事件流属性字段分组,查询事件流salary属性值,当context结束且进入的事件流格式为5的倍数时输出 |
4 | String epsql = "context ctEvent select salary as result from orderEvent output when terminated and count_insert = 5"; |