zoukankan      html  css  js  c++  java
  • Esper学习之七:EPL语法(三)

    1.Aggregation

    和SQL一样,EPL也有Aggregation,即聚合函数。语法如下:

    [plain] view plaincopy
     
    1. aggregate_function([all|distinct] expression)  

    aggregate_function就是聚合函数的名字,比如avg,sum等。expression通常是事件流的某个属性,也可以是不同事件流的多个属性,或者是属性和常量、函数之间的运算。举例如下。

    [plain] view plaincopy
     
    1. // 查询最新5秒的Apple的平均价格  
    2. select avg(price) as aPrice from Apple.win:time(5 sec)  
    3.   
    4. // 查询最新10个Apple的价格总和的两倍  
    5. select sum(price*2) as sPrice from Apple.win:length(10)  
    6.   
    7. // 查询最新10个Apple的价格,并用函数计算后再算平均值  
    8. select avg(Compute.getResult(price)) from Apple.win:length(10)  

    函数只能是静态方法,普通方法不可用。即使是事件流里包含的静态方法,也必须用“类名.方法名”的方式进行引用。

    可以使用distinct关键字对expression加以约束,表示去掉expression产生的重复的值。默认情况下为all关键字,即所有的expression值都参与聚合运算。例如:

    [plain] view plaincopy
     
    1. // 查询最新5秒的Apple的平均价格  
    2. select avg(distinct price) as aPrice from Apple.win:time(5 sec)  
    3.   
    4. // 假如:5秒内进入了三个Apple事件,price分别为2,1,2。则针对该EPL的平均值为(2+1)/2=1.5。因为有distinct的修饰,所以第二个2不参与运算,事件总数即为2,而不是3。  

    以上就是聚合函数的使用方法,除此之外需要注意一下几点

    1.聚合函数能用于Select和Having,但是不能用于Where

    2.sum,avg,media,stddev,avedev只能计算数值,至于media,stddev和avedev代表什么意思,请自行百度。

    3.Esper会忽略expression为null不让参与聚合运算,但是count函数除外,即使是null也认为是一个事件。如果事件流集合中没有包含任何事件,或者包含的事件中用于聚合计算的expression都是null(比如收集5秒内进入的事件即为一个事件流集合),则所有聚合函数都返回null。

    2.Group by

    Group by通常配合聚合函数使用。语法和SQL基本一样,产生的效果就是以某一个或者多个字段进行分组,然后使聚合函数作用于不同组的数据。简单语法如下:

    [plain] view plaincopy
     
    1. group by aggregate_free_expression [, aggregate_free_expression] [, ...]  

    使用Group by要注意一下几点:

    1.Group by后面的内容不能包含聚合函数

    2.Group by后面的内容不能是之前select子句中聚合函数修饰的属性名

    3.通常情况要保证分组数量有限制,以防止内存溢出。但是如果分组分了很多,就需要使用@Hint加以控制。

    2.1.Group by基本用法

    针对上面的第三点,后面再说,先举几个例子说明下简单用法:

    [plain] view plaincopy
     
    1. // 根据color和size来对10个Apple事件进行分组计算平均price  
    2. select avg(price) as aPrice, color, size from Apple.win:length_batch(10) group by color,size  

    该句子遵从SQL的标准,如果某个事件的color和size和之前进入的事件的一样,则归为一组,否则新建一组,并计算平均price

    [plain] view plaincopy
     
    1. // 根据size来对10个Apple事件进行分组计算平均price和color  
    2. select avg(price) as aPrice, color, size from Apple.win:length_batch(10) group by size  

    可以发现,group by的对象只有size,而select中color不聚合,则生成的结果时,聚合函数会根据相同的size分组进行平均price的计算,但是color不是分组条件,所以color有多少个就有多少组,即使存在一样的color也不会影响分组数量(实际上就是不分组),但一定记住,聚合函数还是会根据分组条件计算其修饰的属性。

    [plain] view plaincopy
     
    1. // 根据size来对10个Apple事件进行分组计算平均price和color<pre name="code" class="plain">select avg(price) as aPrice, color from Apple.win:length_batch(10) group by size</pre>  

    这一次select子句中没有包含分组的字段size,但是效果和上一个句子一样。Esper仍然会根据相同的size进行分组计算平均price,只不过计算结果中只有平均price和color,并且有十排结果。

    [plain] view plaincopy
     
    1. // 根据size乘color来对10个Apple事件进行分组计算平均price<pre name="code" class="plain">select avg(price) as aPrice, size*color from Apple.win:length_batch(10) group by size*color</pre>  

    group by的对象只是一个值,以相同的值进行分组,所以上面和和普通的属性字段一样,计算一个值进行分组。如果group by后面的表达式值为null,则所有为null的事件都被分为一组进行计算。但是如果使用了count函数,则表达式为null的事件不会被计算在内。

    2.2.@Hint

    @Hint是Esper中注解的其中一个,如果不了解注解,可以先看看Esper学习之五:EPL语法(一)的第7节再继续阅读@Hint的内容。之前对@Hint一笔带过,那是因为它是专用于Group by的。我们平时使用Group by的时候,会遇到分组数量太多的情况。比如以时间单位进行分组,那么内存使用一定是一个大问题。因此@Hint为其设计了两个属性,用于限制Group by的生存时间,使虚拟机能及时回收内存。这两个属性分别为reclaim_group_aged和reclaim_group_freq

    reclaim_group_aged

    该属性后面跟着的是正整数,以秒为单位,表示在n秒内,若分组的数据没有进行更新,则分组数据被Esper回收。例如:

    [plain] view plaincopy
     
    1. // 根据color对10秒内进入的Apple事件进行分组计算平均price,并且对5秒内没有数据更新的分组进行回收  
    2. @Hint('reclaim_group_aged=5')select avg(price) as aPrice, color from Apple.win:time(10 sec) group by color //括号内可以使单引号也可以是双引号  

    reclaim_group_freq

    该属性后面跟着的是正整数,以秒为单位,表示每n秒清理一次分组,可清理的分组是reclaim_group_aged决定的,也就是说要使用该参数,就要配合reclaim_group_aged一起使用。可能不是很好理解,先看看例子:

    [plain] view plaincopy
     
    1. // 根据color对10秒内进入的Apple事件进行分组计算平均price。对8秒内没有数据更新的分组进行回收,每2秒回收一次  
    2. @Hint('reclaim_group_aged=8,reclaim_group_freq=2')select avg(price) as aPrice, color from Apple.win:time(10 sec) group by color  

            如果不使用reclaim_group_freq属性,则默认值和reclaim_group_aged的值一样,对上面来说就是回收的条件为8秒内没有数据更新,且每8秒回收一次。这样的话有可能出现这么一种情况,上一个8秒的某个分组在下一个8秒还没到达时就已经持续8秒没有数据更新了(这句话会不会有点绕?),但是必须等到回收的时间点到达时才能回收这个分组。在分组产生很快的情况下,这样的回收不及时很可能会造成内存溢出。reclaim_group_freq正是为这种情况做准备,回收的频率高一些,在一定程度上能提高内存的使用率。

            上面这两个属性的值除了可以使用正整数之外,也可以使用预先定义的变量或者常量

    3.Having

    Having的用法和SQL一样,后面跟的是对聚合函数的计算结果进行过滤。Where子句不能包含聚合函数,所以就由Having来完成。示例如下:

    [plain] view plaincopy
     
    1. // 根据size来对10个Apple事件进行分组计算平均price和color,并且排除平均price大于5的分组<pre name="code" class="plain"><pre name="code" class="plain">select avg(price) as aPrice, color from Apple.win:length_batch(10) group by size having avg(price) > 5</pre></pre>  

    通常Having配合Group by使用,如果没有使用Group by,那么就只有一组。例如:

    [plain] view plaincopy
     
    1. // 根据size来对10个Apple事件计算平均price和color,如果平均price大于5,则数据被排除掉<pre name="code" class="plain"><pre name="code" class="plain">select avg(price) as aPrice, color from Apple.win:length_batch(10) having avg(price) > 5</pre></pre>  

    Having后面可以跟多个判断式子,并且用and,or或者not进行连接。例如:

    [plain] view plaincopy
     
    1. // 根据size来对10个Apple事件计算平均price和color,如果平均price大于5并且平均size小于3,则数据被排除掉<pre name="code" class="plain"><pre name="code" class="plain">select avg(price) as aPrice, color from Apple.win:length_batch(10) having avg(price) > 5 and avg(size) < 3</pre></pre>  

    4.Output

    4.1.基本语法

    Output是EPL中非常有用的东西,用来控制Esper对事件流计算结果的输出时间和形式,可以以固定频率,也可以是某个时间点输出。简单语法如下:

    [plain] view plaincopy
     
    1. output [after suppression_def]  
    2. [[all | first | last | snapshot] every time_period | output_rate events]  

    after suppression_def是可选参数,表示先满足一定的条件再输出。

    all | first | last | snapshot表明输出结果的形式,默认值为all。

    every output_rate表示输出频率,即每达到规定的频率就进行输出。time_period表示时间频率,相关语法在Esper学习之五:EPL语法(一)的第2节有说到。output_rate events表示事件数量。

    举例说明如下:

    [plain] view plaincopy
     
    1. // 30分钟内,每进入一个OrderEvent,统计一次sum price,并且每60秒输出一次统计结果。  
    2. select sum(price) from OrderEvent.win:time(30 min) output snapshot every 60 seconds  

    4.2.after

    之前在讲解Context的时候,有简单说到过after。关于Context,可参看Esper学习之四:Context。after在output里的使用也很简单,语法如下:

    [plain] view plaincopy
     
    1. output after time_period | number events [...]  

    time_period表示时间段,number events表示事件数量。表示从EPL可用开始,经过一段时间或者接收到一定数量的事件再进行输出。例如:

    [plain] view plaincopy
     
    1. // 统计20个Apple事件的sum price,并且在有5个Apple事件进入后才开始输出统计结果  
    2. select sum(price) from Apple.win:length(20) output after 5 events  

    上面这个句子从第一个进入的事件进行统计,直到进入了5个事件以后才输出统计结果,之后每进入一个事件输出一次(这是win:length的特性)。但是要注意的是,after之后的时间长度和事件数量会影响之后的时间或者事件数量。什么意思?看个完整例子:

    [java] view plaincopy
     
    1. /** 
    2.  *  
    3.  * @author luonanqin 
    4.  * 
    5.  */  
    6. class Banana  
    7. {  
    8.     private int id;  
    9.     private int price;  
    10.   
    11.     public int getId()  
    12.     {  
    13.         return id;  
    14.     }  
    15.   
    16.     public void setId(int id)  
    17.     {  
    18.         this.id = id;  
    19.     }  
    20.   
    21.     public int getPrice()  
    22.     {  
    23.         return price;  
    24.     }  
    25.   
    26.     public void setPrice(int price)  
    27.     {  
    28.         this.price = price;  
    29.     }  
    30.   
    31.     public String toString()  
    32.     {  
    33.         return "id: " + id + ", price: " + price;  
    34.     }  
    35. }  
    36.   
    37. class OutputAfterListener implements UpdateListener  
    38. {  
    39.     public void update(EventBean[] newEvents, EventBean[] oldEvents)  
    40.     {  
    41.         if (newEvents != null)  
    42.         {  
    43.             int price = (Integer) newEvents[0].get("sPrice");  
    44.             System.out.println("Banana's sum price is " + price);  
    45.         }  
    46.     }  
    47. }  
    48.   
    49. public class OutputAfterTest  
    50. {  
    51.     public static void main(String[] args) throws InterruptedException  
    52.     {  
    53.         EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();  
    54.   
    55.         EPAdministrator admin = epService.getEPAdministrator();  
    56.   
    57.         String banana = Banana.class.getName();  
    58.         // 统计最新3个Banana事件的sum price,并且从EPL可用起,等待第一个事件进入后,以每两个事件进入的频率输出统计结果  
    59.         String epl = "select sum(price) as sPrice from " + banana + ".win:length(3) output after 1 events snapshot every 2 events";  
    60.   
    61.         EPStatement state = admin.createEPL(epl);  
    62.         state.addListener(new OutputAfterListener());  
    63.   
    64.         EPRuntime runtime = epService.getEPRuntime();  
    65.   
    66.         Banana b1 = new Banana();  
    67.         b1.setId(1);  
    68.         b1.setPrice(6);  
    69.         System.out.println("Send Banana Event: " + b1);  
    70.         runtime.sendEvent(b1);  
    71.   
    72.         Banana b2 = new Banana();  
    73.         b2.setId(2);  
    74.         b2.setPrice(3);  
    75.         System.out.println("Send Banana Event: " + b2);  
    76.         runtime.sendEvent(b2);  
    77.   
    78.         Banana b3 = new Banana();  
    79.         b3.setId(3);  
    80.         b3.setPrice(1);  
    81.         System.out.println("Send Banana Event: " + b3);  
    82.         runtime.sendEvent(b3);  
    83.   
    84.         Banana b4 = new Banana();  
    85.         b4.setId(4);  
    86.         b4.setPrice(2);  
    87.         System.out.println("Send Banana Event: " + b4);  
    88.         runtime.sendEvent(b4);  
    89.   
    90.         Banana b5 = new Banana();  
    91.         b5.setId(5);  
    92.         b5.setPrice(4);  
    93.         System.out.println("Send Banana Event: " + b5);  
    94.         runtime.sendEvent(b5);  
    95.     }  
    96. }  

    执行结果:

    [plain] view plaincopy
     
    1. Send Banana Event: id: 1, price: 6  
    2. Send Banana Event: id: 2, price: 3  
    3. Send Banana Event: id: 3, price: 1  
    4. Banana's sum price is 10  
    5. Send Banana Event: id: 4, price: 2  
    6. Send Banana Event: id: 5, price: 4  
    7. Banana's sum price is 7  

    由此可见,after之后的every子句要等到after后面的表达式满足后才生效。所以第一个事件进入后,every 2 events生效,即等待两个事件进入后才输出结果。对于时间也是要等到after的子句满足后才开始计时。例如:

    [plain] view plaincopy
     
    1. // 从EPL可用开始计时,经过1分钟后,每5秒输出一次当前100秒内的所有Banana的avg price(即:第一次输出在65秒时)  
    2. select avg(price) from Banana.win:time(100 sec) after 1 min snapshot every 5 sec  

    4.3.first,last,all,snapshot

    每当达到输出时间点时,可以用这四个参数来控制输出内容。下面分别介绍并举例。

    first

    表示每一批可输出的内容中的第一个事件计算结果。比如:

    [plain] view plaincopy
     
    1. select * from Fruit output first every 2 events  

    上面的句子表示每进入两个Fruit事件,输出这两个事件的第一个。
    last

    和first类似,表示每一批可输出的内容中的最后一个事件计算结果。比如:

    [plain] view plaincopy
     
    1. select * from Fruit output last every 2 events  

    上面的句子表示每进入两个Fruit事件,输出这两个事件的第二个,也就是最后一个。

    snapshot

    表示输出EPL所保持的所有事件计算结果,通常用来查看view或者window中现存的事件计算结果。比如:

    [plain] view plaincopy
     
    1. select * from Fruit.win:time(5 sec) output snapshot every 2 events  

    上面的句子表示每进入两个事件输出5 sec内的所有事件,且不会讲这些事件从5 sec范围内移除

    all

    也是默认值。和snapshot类似,也是输出所有的事件,但是不同的是,snapshot相当于对计算结果拍了一张照片,把结果复制出来并输出,而all是把计算结果直接输出,不会复制。比如:

    [plain] view plaincopy
     
    1. select * from Fruit.win:time(5 sec) output all every 2 events  

    上面的句子表示每进入两个事件输出5 sec内包含的所有事件,输出的事件不再保留于5 sec范围内。

    4.4.Crontab Output

    output的另一个语法可以建立定时输出,关键字是at。语法如下:

    [plain] view plaincopy
     
    1. output [after suppression_def]  
    2. [[all | first | last | snapshot] at  
    3. (minutes, hours, days of month, months, days of week [, seconds])]  

    minutes, hours, days of month, months, days of week [, seconds]这些都是时间单位,语法后面再细说。举个简单的例子:

    [plain] view plaincopy
     
    1. // 在8点到17点这段时间内,每15分钟输出一次  
    2. select * from Fruit output at (*/15,8:17,*,*,*)  



    4.5.when

    Output还可以使用when来实现达到某个固定条件再输出的效果,一般通过变量,用户自定义的 函数以及output内置的属性来实现。基本语法如下:

    [plain] view plaincopy
     
    1. output [after suppression_def]  
    2. [[all | first | last | snapshot] when trigger_expression  
    3. [then set variable_name = assign_expression [, variable_name = assign_expression [,...]]]  

    trigger_expression返回true或者false,表示输出或者不输出

    then set variable_name=assign_expression表示是当trigger_expression被触发时,可对变量重新赋值。完整例子如下:

    [java] view plaincopy
     
    1. /** 
    2.  *  
    3.  * @author luonanqin 
    4.  * 
    5.  */  
    6. class Pink  
    7. {  
    8.     private int id;  
    9.     private int price;  
    10.   
    11.     public int getId()  
    12.     {  
    13.         return id;  
    14.     }  
    15.   
    16.     public void setId(int id)  
    17.     {  
    18.         this.id = id;  
    19.     }  
    20.   
    21.     public int getPrice()  
    22.     {  
    23.         return price;  
    24.     }  
    25.   
    26.     public void setPrice(int price)  
    27.     {  
    28.         this.price = price;  
    29.     }  
    30.   
    31.     public String toString()  
    32.     {  
    33.         return "id: " + id + ", price: " + price;  
    34.     }  
    35. }  
    36.   
    37. class OutputWhenListener implements UpdateListener  
    38. {  
    39.     public void update(EventBean[] newEvents, EventBean[] oldEvents)  
    40.     {  
    41.         if (newEvents != null)  
    42.         {  
    43.             for (int i = 0; i < newEvents.length; i++)  
    44.             {  
    45.                 Pink pink = (Pink) newEvents[i].getUnderlying();  
    46.                 System.out.println("Output Pink: " + pink);  
    47.             }  
    48.         }  
    49.     }  
    50. }  
    51.   
    52. public class OutputWhenTest  
    53. {  
    54.     public static void main(String[] args) throws InterruptedException  
    55.     {  
    56.         EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();  
    57.   
    58.         EPAdministrator admin = epService.getEPAdministrator();  
    59.         ConfigurationOperations config = admin.getConfiguration();  
    60.         config.addVariable("exceed", boolean.class, false);  
    61.   
    62.         String pink = Pink.class.getName();  
    63.         // 当exceed为true时,输出所有进入EPL的事件,然后设置exceed为false  
    64.         String epl = "select * from " + pink + " output when exceed then set exceed=false";  
    65.   
    66.         EPStatement state = admin.createEPL(epl);  
    67.         state.addListener(new OutputWhenListener());  
    68.   
    69.         EPRuntime runtime = epService.getEPRuntime();  
    70.   
    71.         Random r = new Random(47);  
    72.         for (int i = 1; i <= 10; i++)  
    73.         {  
    74.             int price = r.nextInt(10);  
    75.             Pink p = new Pink();  
    76.             p.setId(i);  
    77.             p.setPrice(price);  
    78.             System.out.println("Send Pink Event: " + p);  
    79.             runtime.sendEvent(p);  
    80.             // 当price大于5时,exceed变量为true  
    81.             if (price > 5)  
    82.             {  
    83.                 runtime.setVariableValue("exceed", true);  
    84.                 // 因为主线程和输出线程不是同一个,所以这里休息1秒保证输出线程将事件全部输出,方便演示。  
    85.                 Thread.sleep(1000);  
    86.             }  
    87.         }  
    88.     }  
    89. }  

    执行结果:

    [plain] view plaincopy
     
    1. Send Pink Event: id: 1, price: 8  
    2. Output Pink: id: 1, price: 8  
    3. Send Pink Event: id: 2, price: 5  
    4. Send Pink Event: id: 3, price: 3  
    5. Send Pink Event: id: 4, price: 1  
    6. Send Pink Event: id: 5, price: 1  
    7. Send Pink Event: id: 6, price: 9  
    8. Output Pink: id: 2, price: 5  
    9. Output Pink: id: 3, price: 3  
    10. Output Pink: id: 4, price: 1  
    11. Output Pink: id: 5, price: 1  
    12. Output Pink: id: 6, price: 9  
    13. Send Pink Event: id: 7, price: 8  
    14. Output Pink: id: 7, price: 8  
    15. Send Pink Event: id: 8, price: 0  
    16. Send Pink Event: id: 9, price: 2  
    17. Send Pink Event: id: 10, price: 7  
    18. Output Pink: id: 8, price: 0  
    19. Output Pink: id: 9, price: 2  
    20. Output Pink: id: 10, price: 7  

            从结果可以看出来。当price大于5的时候,设置exceed变量为true,即可输出之前进入的所有事件,then set子句将exceed设置为false,等待下一次exceed=true时触发输出。由于输出线程是单独的线程,所以如果不sleep,结果可能会和这个不同。

    对于when关键字,Esper提供了一些内置的属性帮助我们实现更复杂的输出约束。如图所示:

    以上5个属性我就不多做解释了,使用方式是作为trigger_expression跟在when关键字的后面。例如:

    [plain] view plaincopy
     
    1. // 进入的Apple事件总数达到5个时才输出,且不清零count_insert_total属性,继续累加事件总数  
    2. select * from Apple output when count_insert_total=5  
    3.   
    4. // 移除的Apple事件总数达到4个时才输出,并清零count_remove属性  
    5. select * from Apple output when count_remove=4  

    另外,在使用when的时候,有两点需要注意:

    1.当trigger_expression返回true时,Esper会输出从上一次输出之后到这次输出之间所有的insert stream和remove stream。

    2.若trigger_expression不断被触发并返回true时,则Esper最短的输出间隔为100毫秒。

    3.expression不能包含事件流的属性,聚合函数以及prev函数和prior函数

    4.6.Context Terminated

    Output还针对Context专门设计了一个输出条件,即在Context终止时输出Context中的内容。关于Context,可以看看Esper学习之四:Context。具体语法如下:

    [plain] view plaincopy
     
    1. output when terminated [and termination_expression]  
    2. [then set variable_name = assign_expression [, variable_name = assign_expression [,...]]]]  

    when terminated是关键字,之前可以通过and连接其他的式子一起使用。termination_expression是一个返回true或者false的表达式,同trigger_expression一样。举例如下:

    [plain] view plaincopy
     
    1. // 在MyContext下,查询context的id并计算Apple的sum price,当Context结束且输入的事件总数大于10时,输出。然后设置FinishCompute变量为true  
    2. context MyContext select context.id, sum(price) from Apple output when terminated and count_insert_total > 10 then set FinishCompute = true  
    3.   
    4. // 在MyContext下,计算Apple的avg size,并每1分钟输出第一个进入的事件计算结果,当context结束时也输出一次计算结果  
    5. context MyContext select avg(size) from Apple output first every 1 min and when terminated  

            Output和Aggregation,Group by一起使用时,first,last,all,snapshot四个关键字产生的效果会比较特别。建议各位自己看看Esper的官方文档的Appendix A,有相当完整的例子做说明,因为篇幅较长,所以我没有放在文章里进行讲解。另外针对first,last,all,snapshot四个关键字,只有使用snapshot是不会缓存计算结果。其他的关键字会缓存事件直到触发了输出条件才会释放,所以如果输入的数据量比较大,就要注意输出条件被触发前的内存使用量。

            关于Output的内容比较多,使用起来也比较灵活。各位在使用的时候,也许会发现自己写的达不到预期的效果,本人在使用的时候也遇到过,所以还请各位耐心地多试几次。Group by和Aggregation和SQL的类似,所以使用起来很容易。

  • 相关阅读:
    hdu 4614 线段树 二分
    cf 1066d 思维 二分
    lca 最大生成树 逆向思维 2018 徐州赛区网络预赛j
    rmq学习
    hdu 5692 dfs序 线段树
    dfs序介绍
    poj 3321 dfs序 树状数组 前向星
    cf 1060d 思维贪心
    【PAT甲级】1126 Eulerian Path (25分)
    【PAT甲级】1125 Chain the Ropes (25分)
  • 原文地址:https://www.cnblogs.com/yudar/p/4872633.html
Copyright © 2011-2022 走看看