zoukankan      html  css  js  c++  java
  • Esper学习之七:EPL语法(三)

    1.Aggregation

    和SQL一样,EPL也有Aggregation,即聚合函数。语法如下:

    [plain] view plaincopy
     
    1. aggregate_function([all|distinct] expression)  

    aggregate_function就是聚合函数的名字,比如avg,sum等。expression通常是事件流的某个属性,也可以是不同事件流的多个属性,或者是属性和常量、函数之间的运算。举例如下。

    [plain] view plaincopy
     
    1. // 查询最新5秒的Apple的平均价格  
    2. select avg(price) as aPrice from Apple.win:time(5 sec)  
    3.   
    4. // 查询最新10个Apple的价格总和的两倍  
    5. select sum(price*2) as sPrice from Apple.win:length(10)  
    6.   
    7. // 查询最新10个Apple的价格,并用函数计算后再算平均值  
    8. select avg(Compute.getResult(price)) from Apple.win:length(10)  

    函数只能是静态方法,普通方法不可用。即使是事件流里包含的静态方法,也必须用“类名.方法名”的方式进行引用。

    可以使用distinct关键字对expression加以约束,表示去掉expression产生的重复的值。默认情况下为all关键字,即所有的expression值都参与聚合运算。例如:

    [plain] view plaincopy
     
    1. // 查询最新5秒的Apple的平均价格  
    2. select avg(distinct price) as aPrice from Apple.win:time(5 sec)  
    3.   
    4. // 假如:5秒内进入了三个Apple事件,price分别为2,1,2。则针对该EPL的平均值为(2+1)/2=1.5。因为有distinct的修饰,所以第二个2不参与运算,事件总数即为2,而不是3。  

    以上就是聚合函数的使用方法,除此之外需要注意一下几点

    1.聚合函数能用于Select和Having,但是不能用于Where

    2.sum,avg,media,stddev,avedev只能计算数值,至于media,stddev和avedev代表什么意思,请自行百度。

    3.Esper会忽略expression为null不让参与聚合运算,但是count函数除外,即使是null也认为是一个事件。如果事件流集合中没有包含任何事件,或者包含的事件中用于聚合计算的expression都是null(比如收集5秒内进入的事件即为一个事件流集合),则所有聚合函数都返回null。

    2.Group by

    Group by通常配合聚合函数使用。语法和SQL基本一样,产生的效果就是以某一个或者多个字段进行分组,然后使聚合函数作用于不同组的数据。简单语法如下:

    [plain] view plaincopy
     
    1. group by aggregate_free_expression [, aggregate_free_expression] [, ...]  

    使用Group by要注意一下几点:

    1.Group by后面的内容不能包含聚合函数

    2.Group by后面的内容不能是之前select子句中聚合函数修饰的属性名

    3.通常情况要保证分组数量有限制,以防止内存溢出。但是如果分组分了很多,就需要使用@Hint加以控制。

    2.1.Group by基本用法

    针对上面的第三点,后面再说,先举几个例子说明下简单用法:

    [plain] view plaincopy
     
    1. // 根据color和size来对10个Apple事件进行分组计算平均price  
    2. select avg(price) as aPrice, color, size from Apple.win:length_batch(10) group by color,size  

    该句子遵从SQL的标准,如果某个事件的color和size和之前进入的事件的一样,则归为一组,否则新建一组,并计算平均price

    [plain] view plaincopy
     
    1. // 根据size来对10个Apple事件进行分组计算平均price和color  
    2. select avg(price) as aPrice, color, size from Apple.win:length_batch(10) group by size  

    可以发现,group by的对象只有size,而select中color不聚合,则生成的结果时,聚合函数会根据相同的size分组进行平均price的计算,但是color不是分组条件,所以color有多少个就有多少组,即使存在一样的color也不会影响分组数量(实际上就是不分组),但一定记住,聚合函数还是会根据分组条件计算其修饰的属性。

    [plain] view plaincopy
     
    1. // 根据size来对10个Apple事件进行分组计算平均price和color<pre name="code" class="plain">select avg(price) as aPrice, color from Apple.win:length_batch(10) group by size</pre>  

    这一次select子句中没有包含分组的字段size,但是效果和上一个句子一样。Esper仍然会根据相同的size进行分组计算平均price,只不过计算结果中只有平均price和color,并且有十排结果。

    [plain] view plaincopy
     
    1. // 根据size乘color来对10个Apple事件进行分组计算平均price<pre name="code" class="plain">select avg(price) as aPrice, size*color from Apple.win:length_batch(10) group by size*color</pre>  

    group by的对象只是一个值,以相同的值进行分组,所以上面和和普通的属性字段一样,计算一个值进行分组。如果group by后面的表达式值为null,则所有为null的事件都被分为一组进行计算。但是如果使用了count函数,则表达式为null的事件不会被计算在内。

    2.2.@Hint

    @Hint是Esper中注解的其中一个,如果不了解注解,可以先看看Esper学习之五:EPL语法(一)的第7节再继续阅读@Hint的内容。之前对@Hint一笔带过,那是因为它是专用于Group by的。我们平时使用Group by的时候,会遇到分组数量太多的情况。比如以时间单位进行分组,那么内存使用一定是一个大问题。因此@Hint为其设计了两个属性,用于限制Group by的生存时间,使虚拟机能及时回收内存。这两个属性分别为reclaim_group_aged和reclaim_group_freq

    reclaim_group_aged

    该属性后面跟着的是正整数,以秒为单位,表示在n秒内,若分组的数据没有进行更新,则分组数据被Esper回收。例如:

    [plain] view plaincopy
     
    1. // 根据color对10秒内进入的Apple事件进行分组计算平均price,并且对5秒内没有数据更新的分组进行回收  
    2. @Hint('reclaim_group_aged=5')select avg(price) as aPrice, color from Apple.win:time(10 sec) group by color //括号内可以使单引号也可以是双引号  

    reclaim_group_freq

    该属性后面跟着的是正整数,以秒为单位,表示每n秒清理一次分组,可清理的分组是reclaim_group_aged决定的,也就是说要使用该参数,就要配合reclaim_group_aged一起使用。可能不是很好理解,先看看例子:

    [plain] view plaincopy
     
    1. // 根据color对10秒内进入的Apple事件进行分组计算平均price。对8秒内没有数据更新的分组进行回收,每2秒回收一次  
    2. @Hint('reclaim_group_aged=8,reclaim_group_freq=2')select avg(price) as aPrice, color from Apple.win:time(10 sec) group by color  

            如果不使用reclaim_group_freq属性,则默认值和reclaim_group_aged的值一样,对上面来说就是回收的条件为8秒内没有数据更新,且每8秒回收一次。这样的话有可能出现这么一种情况,上一个8秒的某个分组在下一个8秒还没到达时就已经持续8秒没有数据更新了(这句话会不会有点绕?),但是必须等到回收的时间点到达时才能回收这个分组。在分组产生很快的情况下,这样的回收不及时很可能会造成内存溢出。reclaim_group_freq正是为这种情况做准备,回收的频率高一些,在一定程度上能提高内存的使用率。

            上面这两个属性的值除了可以使用正整数之外,也可以使用预先定义的变量或者常量

    3.Having

    Having的用法和SQL一样,后面跟的是对聚合函数的计算结果进行过滤。Where子句不能包含聚合函数,所以就由Having来完成。示例如下:

    [plain] view plaincopy
     
    1. // 根据size来对10个Apple事件进行分组计算平均price和color,并且排除平均price大于5的分组<pre name="code" class="plain"><pre name="code" class="plain">select avg(price) as aPrice, color from Apple.win:length_batch(10) group by size having avg(price) > 5</pre></pre>  

    通常Having配合Group by使用,如果没有使用Group by,那么就只有一组。例如:

    [plain] view plaincopy
     
    1. // 根据size来对10个Apple事件计算平均price和color,如果平均price大于5,则数据被排除掉<pre name="code" class="plain"><pre name="code" class="plain">select avg(price) as aPrice, color from Apple.win:length_batch(10) having avg(price) > 5</pre></pre>  

    Having后面可以跟多个判断式子,并且用and,or或者not进行连接。例如:

    [plain] view plaincopy
     
    1. // 根据size来对10个Apple事件计算平均price和color,如果平均price大于5并且平均size小于3,则数据被排除掉<pre name="code" class="plain"><pre name="code" class="plain">select avg(price) as aPrice, color from Apple.win:length_batch(10) having avg(price) > 5 and avg(size) < 3</pre></pre>  

    4.Output

    4.1.基本语法

    Output是EPL中非常有用的东西,用来控制Esper对事件流计算结果的输出时间和形式,可以以固定频率,也可以是某个时间点输出。简单语法如下:

    [plain] view plaincopy
     
    1. output [after suppression_def]  
    2. [[all | first | last | snapshot] every time_period | output_rate events]  

    after suppression_def是可选参数,表示先满足一定的条件再输出。

    all | first | last | snapshot表明输出结果的形式,默认值为all。

    every output_rate表示输出频率,即每达到规定的频率就进行输出。time_period表示时间频率,相关语法在Esper学习之五:EPL语法(一)的第2节有说到。output_rate events表示事件数量。

    举例说明如下:

    [plain] view plaincopy
     
    1. // 30分钟内,每进入一个OrderEvent,统计一次sum price,并且每60秒输出一次统计结果。  
    2. select sum(price) from OrderEvent.win:time(30 min) output snapshot every 60 seconds  

    4.2.after

    之前在讲解Context的时候,有简单说到过after。关于Context,可参看Esper学习之四:Context。after在output里的使用也很简单,语法如下:

    [plain] view plaincopy
     
    1. output after time_period | number events [...]  

    time_period表示时间段,number events表示事件数量。表示从EPL可用开始,经过一段时间或者接收到一定数量的事件再进行输出。例如:

    [plain] view plaincopy
     
    1. // 统计20个Apple事件的sum price,并且在有5个Apple事件进入后才开始输出统计结果  
    2. select sum(price) from Apple.win:length(20) output after 5 events  

    上面这个句子从第一个进入的事件进行统计,直到进入了5个事件以后才输出统计结果,之后每进入一个事件输出一次(这是win:length的特性)。但是要注意的是,after之后的时间长度和事件数量会影响之后的时间或者事件数量。什么意思?看个完整例子:

    [java] view plaincopy
     
    1. /** 
    2.  *  
    3.  * @author luonanqin 
    4.  * 
    5.  */  
    6. class Banana  
    7. {  
    8.     private int id;  
    9.     private int price;  
    10.   
    11.     public int getId()  
    12.     {  
    13.         return id;  
    14.     }  
    15.   
    16.     public void setId(int id)  
    17.     {  
    18.         this.id = id;  
    19.     }  
    20.   
    21.     public int getPrice()  
    22.     {  
    23.         return price;  
    24.     }  
    25.   
    26.     public void setPrice(int price)  
    27.     {  
    28.         this.price = price;  
    29.     }  
    30.   
    31.     public String toString()  
    32.     {  
    33.         return "id: " + id + ", price: " + price;  
    34.     }  
    35. }  
    36.   
    37. class OutputAfterListener implements UpdateListener  
    38. {  
    39.     public void update(EventBean[] newEvents, EventBean[] oldEvents)  
    40.     {  
    41.         if (newEvents != null)  
    42.         {  
    43.             int price = (Integer) newEvents[0].get("sPrice");  
    44.             System.out.println("Banana's sum price is " + price);  
    45.         }  
    46.     }  
    47. }  
    48.   
    49. public class OutputAfterTest  
    50. {  
    51.     public static void main(String[] args) throws InterruptedException  
    52.     {  
    53.         EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();  
    54.   
    55.         EPAdministrator admin = epService.getEPAdministrator();  
    56.   
    57.         String banana = Banana.class.getName();  
    58.         // 统计最新3个Banana事件的sum price,并且从EPL可用起,等待第一个事件进入后,以每两个事件进入的频率输出统计结果  
    59.         String epl = "select sum(price) as sPrice from " + banana + ".win:length(3) output after 1 events snapshot every 2 events";  
    60.   
    61.         EPStatement state = admin.createEPL(epl);  
    62.         state.addListener(new OutputAfterListener());  
    63.   
    64.         EPRuntime runtime = epService.getEPRuntime();  
    65.   
    66.         Banana b1 = new Banana();  
    67.         b1.setId(1);  
    68.         b1.setPrice(6);  
    69.         System.out.println("Send Banana Event: " + b1);  
    70.         runtime.sendEvent(b1);  
    71.   
    72.         Banana b2 = new Banana();  
    73.         b2.setId(2);  
    74.         b2.setPrice(3);  
    75.         System.out.println("Send Banana Event: " + b2);  
    76.         runtime.sendEvent(b2);  
    77.   
    78.         Banana b3 = new Banana();  
    79.         b3.setId(3);  
    80.         b3.setPrice(1);  
    81.         System.out.println("Send Banana Event: " + b3);  
    82.         runtime.sendEvent(b3);  
    83.   
    84.         Banana b4 = new Banana();  
    85.         b4.setId(4);  
    86.         b4.setPrice(2);  
    87.         System.out.println("Send Banana Event: " + b4);  
    88.         runtime.sendEvent(b4);  
    89.   
    90.         Banana b5 = new Banana();  
    91.         b5.setId(5);  
    92.         b5.setPrice(4);  
    93.         System.out.println("Send Banana Event: " + b5);  
    94.         runtime.sendEvent(b5);  
    95.     }  
    96. }  

    执行结果:

    [plain] view plaincopy
     
    1. Send Banana Event: id: 1, price: 6  
    2. Send Banana Event: id: 2, price: 3  
    3. Send Banana Event: id: 3, price: 1  
    4. Banana's sum price is 10  
    5. Send Banana Event: id: 4, price: 2  
    6. Send Banana Event: id: 5, price: 4  
    7. Banana's sum price is 7  

    由此可见,after之后的every子句要等到after后面的表达式满足后才生效。所以第一个事件进入后,every 2 events生效,即等待两个事件进入后才输出结果。对于时间也是要等到after的子句满足后才开始计时。例如:

    [plain] view plaincopy
     
    1. // 从EPL可用开始计时,经过1分钟后,每5秒输出一次当前100秒内的所有Banana的avg price(即:第一次输出在65秒时)  
    2. select avg(price) from Banana.win:time(100 sec) after 1 min snapshot every 5 sec  

    4.3.first,last,all,snapshot

    每当达到输出时间点时,可以用这四个参数来控制输出内容。下面分别介绍并举例。

    first

    表示每一批可输出的内容中的第一个事件计算结果。比如:

    [plain] view plaincopy
     
    1. select * from Fruit output first every 2 events  

    上面的句子表示每进入两个Fruit事件,输出这两个事件的第一个。
    last

    和first类似,表示每一批可输出的内容中的最后一个事件计算结果。比如:

    [plain] view plaincopy
     
    1. select * from Fruit output last every 2 events  

    上面的句子表示每进入两个Fruit事件,输出这两个事件的第二个,也就是最后一个。

    snapshot

    表示输出EPL所保持的所有事件计算结果,通常用来查看view或者window中现存的事件计算结果。比如:

    [plain] view plaincopy
     
    1. select * from Fruit.win:time(5 sec) output snapshot every 2 events  

    上面的句子表示每进入两个事件输出5 sec内的所有事件,且不会讲这些事件从5 sec范围内移除

    all

    也是默认值。和snapshot类似,也是输出所有的事件,但是不同的是,snapshot相当于对计算结果拍了一张照片,把结果复制出来并输出,而all是把计算结果直接输出,不会复制。比如:

    [plain] view plaincopy
     
    1. select * from Fruit.win:time(5 sec) output all every 2 events  

    上面的句子表示每进入两个事件输出5 sec内包含的所有事件,输出的事件不再保留于5 sec范围内。

    4.4.Crontab Output

    output的另一个语法可以建立定时输出,关键字是at。语法如下:

    [plain] view plaincopy
     
    1. output [after suppression_def]  
    2. [[all | first | last | snapshot] at  
    3. (minutes, hours, days of month, months, days of week [, seconds])]  

    minutes, hours, days of month, months, days of week [, seconds]这些都是时间单位,语法后面再细说。举个简单的例子:

    [plain] view plaincopy
     
    1. // 在8点到17点这段时间内,每15分钟输出一次  
    2. select * from Fruit output at (*/15,8:17,*,*,*)  



    4.5.when

    Output还可以使用when来实现达到某个固定条件再输出的效果,一般通过变量,用户自定义的 函数以及output内置的属性来实现。基本语法如下:

    [plain] view plaincopy
     
    1. output [after suppression_def]  
    2. [[all | first | last | snapshot] when trigger_expression  
    3. [then set variable_name = assign_expression [, variable_name = assign_expression [,...]]]  

    trigger_expression返回true或者false,表示输出或者不输出

    then set variable_name=assign_expression表示是当trigger_expression被触发时,可对变量重新赋值。完整例子如下:

    [java] view plaincopy
     
    1. /** 
    2.  *  
    3.  * @author luonanqin 
    4.  * 
    5.  */  
    6. class Pink  
    7. {  
    8.     private int id;  
    9.     private int price;  
    10.   
    11.     public int getId()  
    12.     {  
    13.         return id;  
    14.     }  
    15.   
    16.     public void setId(int id)  
    17.     {  
    18.         this.id = id;  
    19.     }  
    20.   
    21.     public int getPrice()  
    22.     {  
    23.         return price;  
    24.     }  
    25.   
    26.     public void setPrice(int price)  
    27.     {  
    28.         this.price = price;  
    29.     }  
    30.   
    31.     public String toString()  
    32.     {  
    33.         return "id: " + id + ", price: " + price;  
    34.     }  
    35. }  
    36.   
    37. class OutputWhenListener implements UpdateListener  
    38. {  
    39.     public void update(EventBean[] newEvents, EventBean[] oldEvents)  
    40.     {  
    41.         if (newEvents != null)  
    42.         {  
    43.             for (int i = 0; i < newEvents.length; i++)  
    44.             {  
    45.                 Pink pink = (Pink) newEvents[i].getUnderlying();  
    46.                 System.out.println("Output Pink: " + pink);  
    47.             }  
    48.         }  
    49.     }  
    50. }  
    51.   
    52. public class OutputWhenTest  
    53. {  
    54.     public static void main(String[] args) throws InterruptedException  
    55.     {  
    56.         EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();  
    57.   
    58.         EPAdministrator admin = epService.getEPAdministrator();  
    59.         ConfigurationOperations config = admin.getConfiguration();  
    60.         config.addVariable("exceed", boolean.class, false);  
    61.   
    62.         String pink = Pink.class.getName();  
    63.         // 当exceed为true时,输出所有进入EPL的事件,然后设置exceed为false  
    64.         String epl = "select * from " + pink + " output when exceed then set exceed=false";  
    65.   
    66.         EPStatement state = admin.createEPL(epl);  
    67.         state.addListener(new OutputWhenListener());  
    68.   
    69.         EPRuntime runtime = epService.getEPRuntime();  
    70.   
    71.         Random r = new Random(47);  
    72.         for (int i = 1; i <= 10; i++)  
    73.         {  
    74.             int price = r.nextInt(10);  
    75.             Pink p = new Pink();  
    76.             p.setId(i);  
    77.             p.setPrice(price);  
    78.             System.out.println("Send Pink Event: " + p);  
    79.             runtime.sendEvent(p);  
    80.             // 当price大于5时,exceed变量为true  
    81.             if (price > 5)  
    82.             {  
    83.                 runtime.setVariableValue("exceed", true);  
    84.                 // 因为主线程和输出线程不是同一个,所以这里休息1秒保证输出线程将事件全部输出,方便演示。  
    85.                 Thread.sleep(1000);  
    86.             }  
    87.         }  
    88.     }  
    89. }  

    执行结果:

    [plain] view plaincopy
     
    1. Send Pink Event: id: 1, price: 8  
    2. Output Pink: id: 1, price: 8  
    3. Send Pink Event: id: 2, price: 5  
    4. Send Pink Event: id: 3, price: 3  
    5. Send Pink Event: id: 4, price: 1  
    6. Send Pink Event: id: 5, price: 1  
    7. Send Pink Event: id: 6, price: 9  
    8. Output Pink: id: 2, price: 5  
    9. Output Pink: id: 3, price: 3  
    10. Output Pink: id: 4, price: 1  
    11. Output Pink: id: 5, price: 1  
    12. Output Pink: id: 6, price: 9  
    13. Send Pink Event: id: 7, price: 8  
    14. Output Pink: id: 7, price: 8  
    15. Send Pink Event: id: 8, price: 0  
    16. Send Pink Event: id: 9, price: 2  
    17. Send Pink Event: id: 10, price: 7  
    18. Output Pink: id: 8, price: 0  
    19. Output Pink: id: 9, price: 2  
    20. Output Pink: id: 10, price: 7  

            从结果可以看出来。当price大于5的时候,设置exceed变量为true,即可输出之前进入的所有事件,then set子句将exceed设置为false,等待下一次exceed=true时触发输出。由于输出线程是单独的线程,所以如果不sleep,结果可能会和这个不同。

    对于when关键字,Esper提供了一些内置的属性帮助我们实现更复杂的输出约束。如图所示:

    以上5个属性我就不多做解释了,使用方式是作为trigger_expression跟在when关键字的后面。例如:

    [plain] view plaincopy
     
    1. // 进入的Apple事件总数达到5个时才输出,且不清零count_insert_total属性,继续累加事件总数  
    2. select * from Apple output when count_insert_total=5  
    3.   
    4. // 移除的Apple事件总数达到4个时才输出,并清零count_remove属性  
    5. select * from Apple output when count_remove=4  

    另外,在使用when的时候,有两点需要注意:

    1.当trigger_expression返回true时,Esper会输出从上一次输出之后到这次输出之间所有的insert stream和remove stream。

    2.若trigger_expression不断被触发并返回true时,则Esper最短的输出间隔为100毫秒。

    3.expression不能包含事件流的属性,聚合函数以及prev函数和prior函数

    4.6.Context Terminated

    Output还针对Context专门设计了一个输出条件,即在Context终止时输出Context中的内容。关于Context,可以看看Esper学习之四:Context。具体语法如下:

    [plain] view plaincopy
     
    1. output when terminated [and termination_expression]  
    2. [then set variable_name = assign_expression [, variable_name = assign_expression [,...]]]]  

    when terminated是关键字,之前可以通过and连接其他的式子一起使用。termination_expression是一个返回true或者false的表达式,同trigger_expression一样。举例如下:

    [plain] view plaincopy
     
    1. // 在MyContext下,查询context的id并计算Apple的sum price,当Context结束且输入的事件总数大于10时,输出。然后设置FinishCompute变量为true  
    2. context MyContext select context.id, sum(price) from Apple output when terminated and count_insert_total > 10 then set FinishCompute = true  
    3.   
    4. // 在MyContext下,计算Apple的avg size,并每1分钟输出第一个进入的事件计算结果,当context结束时也输出一次计算结果  
    5. context MyContext select avg(size) from Apple output first every 1 min and when terminated  

            Output和Aggregation,Group by一起使用时,first,last,all,snapshot四个关键字产生的效果会比较特别。建议各位自己看看Esper的官方文档的Appendix A,有相当完整的例子做说明,因为篇幅较长,所以我没有放在文章里进行讲解。另外针对first,last,all,snapshot四个关键字,只有使用snapshot是不会缓存计算结果。其他的关键字会缓存事件直到触发了输出条件才会释放,所以如果输入的数据量比较大,就要注意输出条件被触发前的内存使用量。

            关于Output的内容比较多,使用起来也比较灵活。各位在使用的时候,也许会发现自己写的达不到预期的效果,本人在使用的时候也遇到过,所以还请各位耐心地多试几次。Group by和Aggregation和SQL的类似,所以使用起来很容易。

  • 相关阅读:
    docker-compose 命令详解
    Ubuntu 安装 rabbitmq
    scrapy.cmdline.execute
    queue.Queue()
    多线程通信
    多线程(thread+queue 售票)
    协程
    线程
    利用Nginx实现反向代理web服务器
    利用Nginx实现反向代理web服务器
  • 原文地址:https://www.cnblogs.com/yudar/p/4872633.html
Copyright © 2011-2022 走看看