距离上一篇博客已经有非常多个月的时间了,因为工作的原因。没怎么腾出手来写博客。再加上自己已计划算法学习为第一要务。更新博客的事情临时就放缓了脚步。
所以各位童鞋大可不必等我的博客。先把文档看起来,有什么不懂的先记下来,能够私信问我也能够等文章出来后再理解一遍,这样就不耽误大家的学习了。
上一篇我们说到了named window,这在Esper中是一个核心内容,基本上真实的业务场景下都会用到这东西,所以篇幅也较长。希望大家能消化好。今天这篇主要讨论的内容不复杂,都是几个小知识点。各位在之后的应用中也会用得到。
1. Splitting and Duplicating Streams
在之前我们学过的内容中有一个insert into的语法,作用是将某个epl生成的事件流输入到另外的事件中。以方便再次对数据进行处理。
当然我也能够将事件流不进行处理就输出到下一个事件流中,使同一个事件流能够交给不同的epl进行处理。可是你会发现这样写起来比較麻烦,每个insert into是一个句子,引擎要分别对他们进行注冊,这样管理起来并不太easy。这一节的内容讲的就是一种简便的方法来达到将同一个事件流分发到不同的事件流中。且他是以一个句子就能够完毕的。语法例如以下:
[context context_name] on event_type[(filter_criteria)] [as stream_name] insert into insert_into_def select select_list [where condition] [insert into insert_into_def select select_list [where condition]] [insert into...] [output first | all]
第一行就不多说了,第二行为触发以下一系列操作的基础事件。也是基础事件流。event_type之后能够跟着过滤条件以及对事件流添加别名。第三行到第五行都是详细的insert句子。多个句子不须要用符号隔开,空格分离就好了。这里面的insert句子和之前学习的没差别,主要就是包括输入到什么事件流要写清楚,以及select的详细内容,where条件为可选。
最后一行也是可选内容。output first表示从第一个insert句子開始到最后一个,基础事件依次匹配其where条件,仅仅要满足了某一个,那么插入到相应的事件流以后,这个基础事件就不再有机会和之后的where进行匹配,即不会再进入之后的事件流中。而output all则表示基础事件和全部的insert句子的where进行匹配,能满足的就进入相应的事件流,即不会像first那样限制基础事件仅仅能被输入到某一个流中。注意:select子句和where子句不能用聚合函数(这一点我认为有点不爽,本来这个语法就是方便做事件分流的。不能用聚合函数的话就显得高不成低不就了)。
以下我给大家一个完整的样例进行说明:
package example; import com.espertech.esper.client.EPAdministrator; import com.espertech.esper.client.EPOnDemandQueryResult; import com.espertech.esper.client.EPRuntime; import com.espertech.esper.client.EPServiceProvider; import com.espertech.esper.client.EPServiceProviderManager; import com.espertech.esper.client.EventBean; import java.io.Serializable; /** * Created by Luonanqin on 4/26/14. */ class SplitEvent implements Serializable { private int size; private int price; public SplitEvent(int price, int size) { this.price = price; this.size = size; } public int getPrice() { return price; } public void setPrice(int price) { this.price = price; } public int getSize() { return size; } public void setSize(int size) { this.size = size; } public String toString() { return "SplitEvent{" + "size=" + size + ", price=" + price + '}'; } } public class SplitDuplicateTest { public static void main(String[] args) { EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(); EPAdministrator admin = epService.getEPAdministrator(); EPRuntime runtime = epService.getEPRuntime(); String splitEvent = SplitEvent.class.getName(); String epl1 = "create window LargeOrder.win:keepall() (price int) "; String epl2 = "create window MediumOrder.win:keepall() (price int)"; String epl3 = "create window SmallOrder.win:keepall() (price int)"; String epl4 = "create window LargeSize.win:keepall() (size int)"; String epl5 = "create window MediumSize.win:keepall() (size int)"; String epl6 = "create window SmallSize.win:keepall() (size int)"; // output first String insert1 = "insert into LargeOrder select price where price > 8"; String insert2 = "insert into MediumOrder select price where price between 3 and 10"; String insert3 = "insert into SmallOrder select price where price <= 3"; String epl7 = "on " + splitEvent + " " + insert1 + " " + insert2 + " " + insert3; // output all String insert4 = "insert into LargeSize select size where size > 5"; String insert5 = "insert into MediumSize select size where size between 2 and 8"; String insert6 = "insert into SmallSize select size where size <= 2"; String epl8 = "on " + splitEvent + " " + insert4 + " " + insert5 + " " + insert6 + " output all"; System.out.println("Output first(default): "); System.out.println(insert1); System.out.println(insert2); System.out.println(insert3); System.out.println(); System.out.println("Output all: "); System.out.println(insert4); System.out.println(insert5); System.out.println(insert6); String selectLargeOrder = "select * from LargeOrder"; String selectMediumOrder = "select * from MediumOrder"; String selectSmallOrder = "select * from SmallOrder"; String selectLargeSize = "select * from LargeSize"; String selectMediumSize = "select * from MediumSize"; String selectSmallSize = "select * from SmallSize"; System.out.println(); admin.createEPL(epl1); admin.createEPL(epl2); admin.createEPL(epl3); admin.createEPL(epl4); admin.createEPL(epl5); admin.createEPL(epl6); admin.createEPL(epl7); admin.createEPL(epl8); SplitEvent se1 = new SplitEvent(1, 2); SplitEvent se2 = new SplitEvent(9, 4); SplitEvent se3 = new SplitEvent(3, 1); SplitEvent se4 = new SplitEvent(5, 6); SplitEvent se5 = new SplitEvent(7, 9); System.out.println(se1); runtime.sendEvent(se1); System.out.println(se2); runtime.sendEvent(se2); System.out.println(se3); runtime.sendEvent(se3); System.out.println(se4); runtime.sendEvent(se4); System.out.println(se5); runtime.sendEvent(se5); EPOnDemandQueryResult selectLOrder = runtime.executeQuery(selectLargeOrder); EPOnDemandQueryResult selectMOrder = runtime.executeQuery(selectMediumOrder); EPOnDemandQueryResult selectSOrder = runtime.executeQuery(selectSmallOrder); EPOnDemandQueryResult selectLSize = runtime.executeQuery(selectLargeSize); EPOnDemandQueryResult selectMSize = runtime.executeQuery(selectMediumSize); EPOnDemandQueryResult selectSSize = runtime.executeQuery(selectSmallSize); outputResult(selectLargeOrder, selectLOrder); outputResult(selectMediumOrder, selectMOrder); outputResult(selectSmallOrder, selectSOrder); outputResult(selectLargeSize, selectLSize); outputResult(selectMediumSize, selectMSize); outputResult(selectSmallSize, selectSSize); } private static void outputResult(String selectSentence, EPOnDemandQueryResult result) { System.out.println(" " + selectSentence); EventBean[] events = result.getArray(); for (int i = 0; i < events.length; i++) { EventBean event = events[i]; System.out.println(event.getUnderlying()); } } }运行结果:
Output first(default): insert into LargeOrder select price where price > 8 insert into MediumOrder select price where price between 3 and 10 insert into SmallOrder select price where price <= 3 Output all: insert into LargeSize select size where size > 5 insert into MediumSize select size where size between 2 and 8 insert into SmallSize select size where size <= 2 SplitEvent{size=6, price=5} SplitEvent{size=2, price=1} SplitEvent{size=4, price=9} SplitEvent{size=1, price=3} SplitEvent{size=9, price=7} select * from LargeOrder {price=9} select * from MediumOrder {price=3} {price=5} {price=7} select * from SmallOrder {price=1} select * from LargeSize {size=6} {size=9} select * from MediumSize {size=2} {size=4} {size=6} select * from SmallSize {size=2} {size=1}
从上面的样例中能够看出,在默认情况下,也就是output first的时候,price=9进入LargeOrder后就不再进入MediumOrder,即使9是大于3小于10。
2. Variables and Constants
变量和常量在各类计算机语言中我相信大家都不陌生,在Esper相同有这个定义,而且用途广泛。比方说:时间、过滤条件等等。能够用于各种计算表达式。变量和常量都能够通过三种方式建立:API、EPL、配置文件。各有各的长处,这一节仅仅讲前两种创建方式,配置文件后面会有专门的章节进行解说。
调用API创建变量和常量:
EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(); EPAdministrator admin = epService.getEPAdministrator(); ConfigurationOperations conf = admin.getConfiguration(); conf.addVariable("abc", String.class, "initVariable"); conf.addVariable("constabc", int.class.getName(), 123, true); ... equals ... Configuration conf = new Configuration(); conf.addVariable("abc", String.class, "initVariable"); conf.addVariable("constabc", int.class.getName(), 123, true); EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(conf);能够看到,调用API也有两种方式。一种是建立了EPAdministrator之后再设置变量及常量。一种是先配置好变量及常量,再生成epSerivce对象。两种各有优点。大家自己斟酌。
用EPL创建变量和常量:
create [constant] variable variable_type [[]]variable_name [ = assignment_expression ]constant为可选keyword,显式声明则表示声明的是常量,否则声明的是变量。
variable_type和variable_name分别表示变量的数据类型和变量名。变量名必须唯一。variable_type之后的[]表示这是一个数组类型的变量。之后的assignment_expression为该变量的初始值,假设不声明则表示没有初始值。
以下的列表是可声明的变量类型:
stringcharcharacteroolooleanyteshortintintegerlongdoublefloatobjectenum_classclass_nameevent_type_name
以下举几个样例来看看:
// 创建一个long类型的变量,没有初始值 create variable long var_threshold // 创建一个integer类型的变量,初始值为10 create variable integer var_output_rate = 10 // 创建一个object类型的变量,没有初始值(使用时须要转换成相应的对象类型) create variable object varobj // 创建一个PageHitEvent事件类型的变量,没有初始值(在此之前须要导入这个事件类型到引擎) create variable PageHitEvent varPageHitZero // 创建一个OrderEvent事件类型的变量。没有初始值(能够直接使用类全名作为事件类型) create variable com.abc.OrderEvent orderEvent // 创建一个string类型的常量,初始值为'GE' create constant variable string const_filter_symbol = 'GE' // 创建一个string数组类型的常量。初始值为{'GE', 'MSFT'} create constant variable string[] const_filters = {'GE', 'MSFT'} // 创建一个enumeration数组类型的常量,初始值为{Color.RED, Color.BLUE} create constant variable Color[] const_colors = {Color.RED, Color.BLUE}
常量创建后自然是不能改动的,可是变量肯定是能够改变的。语法例如以下:
on event_type[(filter_criteria)] [as stream_name] set variable_name = expression [, variable_name = expression [,...]]
简单来说就是通过接收某类事件并加上一定的过滤条件,就行将变量又一次赋值。而且可以同一时候为多个变量赋值。
举比例如以下:
on NewOutputRateEvent set var_output_rate = rate on ThresholdUpdateEvent as t set var_threshold_lower = t.lower, var_threshold_higher = t.higher
3. Contained-Event Selection
这一节解说的查询语法比較特殊,该语法主要是针对某个事件的属性仍是事件的一种查询方式。非常多人都有把它翻译成中文的习惯,比方:容器事件查询,又或者查询容器事件。啥叫容器事件?反正我是翻译不出来,并且我也建议的大家不要翻译。以免在学习的过程中对不上。仅仅要理解这个词的概念即可了。以下我们先用最简单的select-from子句进行解说。语法例如以下:
[select select_expressions from] contained_expression [@type(eventtype_name)] [as alias_name] [where filter_expression]方括号中的东西是可选的,而且大部分大家应该都非常熟悉,除了那个@type,这个我们之后再讲。
该语法的重点在于contained_expression。这个表达式须要返回事件的属性,该属性能够是还有一个事件也能够是普通数据类型,也能够是一组查询结果,比方一个数组。一个集合等等。官方文档的5.20节有一个xml的样例。以下的演示样例我将以此作为基础。而且因为在之前我没有解说xml作为事件是怎样处理的,所以特地将xml转换成了相应的POJO进行測试。方便各位理解。
基础事件定义:
public class Item { private int itemId; private int productId; private int amount; private double price; // getter and setter method... public String toString() { return "Item{" + "itemId=" + itemId + ", productId=" + productId + ", amount=" + amount + ", price=" + price + '}'; } } public class Items { private List<Item> item; // getter and setter method... } public class Review { private int reviewId; private String comment; // getter and setter method... public String toString() { return "Review{" + "reviewId=" + reviewId + ", comment='" + comment + ''' + '}'; } } public class Book { private int bookId; private String author; private Review review; // getter and setter method... public String toString() { return "Book{" + "bookId=" + bookId + ", author='" + author + ''' + ", review=" + review + '}'; } } public class Books { private List<Book> book; // getter and setter method... } public class MediaOrder { private int orderId; private Items items; private Books books; // getter and setter method... public String toString() { return "MediaOrder{" + "orderId=" + orderId + ", items=" + items + ", books=" + books + '}'; } }
主类:
class SelectContainedListener implements UpdateListener { public void update(EventBean[] newEvents, EventBean[] oldEvents) { if (newEvents != null) { for (int i = 0; i < newEvents.length; i++) { if (newEvents[i] == null) { continue; } System.out.println(newEvents[i].getUnderlying()); } } } } public class SelectContainedEventTest { public static void main(String[] args) { EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(); EPAdministrator admin = epService.getEPAdministrator(); EPRuntime runtime = epService.getEPRuntime(); Review r1 = new Review(); r1.setReviewId(1); r1.setComment("r1"); Book b1 = new Book(); b1.setAuthor("b1"); b1.setBookId(1); b1.setReview(r1); Book b2 = new Book(); b2.setAuthor("b2"); b2.setBookId(2); Item i1 = new Item(); i1.setItemId(1); i1.setProductId(1); i1.setPrice(1.11); i1.setAmount(2); MediaOrder mo1 = new MediaOrder(); Books bs = new Books(); Items is = new Items(); List<Item> items = new ArrayList<Item>(); List<Book> books = new ArrayList<Book>(); items.add(i1); books.add(b1); books.add(b2); mo1.setOrderId(1); bs.setBook(books); is.setItem(items); mo1.setItems(is); mo1.setBooks(bs); String mediaOrder = MediaOrder.class.getName(); String epl = "select * from " + mediaOrder + "[books.book]"; EPStatement stat1 = admin.createEPL(epl); stat1.addListener(new SelectContainedListener()); runtime.sendEvent(mo1); } }输出结果:
Book{bookId=1, author='b1', review=Review{reviewId=1, comment='r1'}} Book{bookId=2, author='b2', review=null}
样例非常easy。就是将Books中的Book筛选出来。再进一步,假设仅仅想把Review筛选出来,句子应该这么写:
select * from MediaOrder[books.book][review] // not valid select * from MediaOrder[books.book.review]另外一种写法错误是由于books.book返回的是一个Book对象的数组,数组自然是不会包括Review属性的。
所以改成以下这样就能够了:
select * from MediaOrder[books.book[1].review] // 取Book数组中下标为1的对象的review属性
3.1 Select Clause in a Contained-Event Selection
上面的几个样例都是查出某个对象的全部属性,比方Book和Review。假设要查部分属性。则须要select子句的帮助。也就是之前说的可选部分[select select_expressions from]
还是上面的数据,我们写几个句子看看:
// 查询bookId,orderId 1)select * from MediaOrder[select bookId, orderId from books.book][review] ... equals ... 2)select * from MediaOrder[books.book][select bookId, orderId from review] // not valid 3)select * from MediaOrder[select bookId, orderId, reviewId from books.book][review] // valid 查询bookId。orderId。reviewId 4)select * from MediaOrder[books.book][select bookId, orderId, reviewId from review]大家可能会有疑问,为什么1)中orderId是来自books.book?又为什么3)的这样的写法不正确?既然orderId能够,reviewId不能够?文档中对这点没有不论什么的说明。相当于是一种固定语法了。可是细想一下,orderId和book都是MediaOrder的属性而且是同级的。所以我个人觉得(纯属猜想),从语法分析的角度考虑。同一级别的属性,不会仅仅能解析出book而不会找不到orderId。
因此3)中,因为reviewId是book的review属性的属性。所以仅仅能来自review,没法跳过review,直接从book中获得。
1)和2)中的select子句仅仅有bookId和orderId,所以1)最后的[review]实际上没有什么用。除非你写成这样:select * from MediaOrder[select bookId, orderId from books.book][select * from review]。得出的就是bookId,orderId以及Review对象的全部属性。
在使用上面的语法时,要注意子查询,聚合函数等不能用在contained-event的select子句中。
3.2 Where Clause in a Contained-Event Selection
除了能够在Contained-Event上用select子句,相同也能够用where子句。比方:
// 查询author是luonanqin的book select * from MediaOrder[books.book where author = 'luonanqin'] // 查询comment中包括good字符串的review select * from MediaOrder[books.book][review where comment like 'good']
3.3 Contained-Event Selection and Joins
Contained-Event除了一些普通的使用方法之外。还能够使用在join查询中。
包含内联、左联、右联以及全外连接。因为使用方法简单。所以我直接给了一个完整的样例。POJO类仍然利用上面的定义不变。
package example; import com.espertech.esper.client.EPAdministrator; import com.espertech.esper.client.EPRuntime; import com.espertech.esper.client.EPServiceProvider; import com.espertech.esper.client.EPServiceProviderManager; import com.espertech.esper.client.EPStatement; import com.espertech.esper.client.EventBean; import com.espertech.esper.client.UpdateListener; import example.model.contained.Book; import example.model.contained.Books; import example.model.contained.Item; import example.model.contained.Items; import example.model.contained.MediaOrder; import example.model.contained.Review; import java.util.ArrayList; import java.util.List; /** * Created by Luonanqin on 7/30/14. */ class JoinContainedListener implements UpdateListener { public void update(EventBean[] newEvents, EventBean[] oldEvents) { if (newEvents != null) { for (int i = 0; i < newEvents.length; i++) { if (newEvents[i] == null) { continue; } System.out.println(newEvents[i].getUnderlying()); } } } } public class JoinContainedEventTest { public static void main(String[] args) { EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(); EPAdministrator admin = epService.getEPAdministrator(); EPRuntime runtime = epService.getEPRuntime(); Review r1 = new Review(); r1.setReviewId(1); r1.setComment("r1"); Book b1 = new Book(); b1.setAuthor("b1"); b1.setBookId(1); b1.setReview(r1); Book b2 = new Book(); b2.setAuthor("b2"); b2.setBookId(2); b2.setReview(new Review()); Item i1 = new Item(); i1.setItemId(1); i1.setProductId(1); i1.setPrice(1.11); i1.setAmount(2); Item i2 = new Item(); i2.setItemId(3); i2.setProductId(3); i2.setPrice(3.11); i2.setAmount(5); MediaOrder mo1 = new MediaOrder(); Books bs = new Books(); Items is = new Items(); List<Item> items = new ArrayList<Item>(); List<Book> books = new ArrayList<Book>(); items.add(i1); items.add(i2); books.add(b1); books.add(b2); mo1.setOrderId(1); bs.setBook(books); is.setItem(items); mo1.setItems(is); mo1.setBooks(bs); String mediaOrder = MediaOrder.class.getName(); String join1 = "select book.bookId, item.itemId from " + mediaOrder + "[books.book] as book, " + mediaOrder + "[items.item] as item where productId = bookId"; EPStatement stat1 = admin.createEPL(join1); stat1.addListener(new JoinContainedListener()); System.out.println("EPL1: " + join1); runtime.sendEvent(mo1); stat1.destroy(); System.out.println(); String join2 = "select book.bookId, item.itemId from " + mediaOrder + "[books.book] as book left outer join " + mediaOrder + "[items.item] as item on productId = bookId"; EPStatement stat2 = admin.createEPL(join2); stat2.addListener(new JoinContainedListener()); System.out.println("EPL2: " + join2); runtime.sendEvent(mo1); stat2.destroy(); System.out.println(); String join3 = "select book.bookId, item.itemId from " + mediaOrder + "[books.book] as book full outer join " + mediaOrder + "[items.item] as item on productId = bookId"; EPStatement stat3 = admin.createEPL(join3); stat3.addListener(new JoinContainedListener()); System.out.println("EPL3: " + join3); runtime.sendEvent(mo1); stat3.destroy(); System.out.println(); String join4 = "select count(*) from " + mediaOrder + "[books.book] as book, " + mediaOrder + "[items.item] as item where productId = bookId"; EPStatement stat4 = admin.createEPL(join4); stat4.addListener(new JoinContainedListener()); System.out.println("EPL4: " + join4); runtime.sendEvent(mo1); runtime.sendEvent(mo1); stat4.destroy(); System.out.println(); String join5 = "select count(*) from " + mediaOrder + "[books.book] as book unidirectional, " + mediaOrder + "[items.item] as item where productId = bookId"; EPStatement stat5 = admin.createEPL(join5); stat5.addListener(new JoinContainedListener()); System.out.println("EPL5: " + join5); runtime.sendEvent(mo1); runtime.sendEvent(mo1); stat5.destroy(); System.out.println(); } }运行结果:
EPL1: select book.bookId, item.itemId from example.model.contained.MediaOrder[books.book] as book, example.model.contained.MediaOrder[items.item] as item where productId = bookId {item.itemId=1, book.bookId=1} EPL2: select book.bookId, item.itemId from example.model.contained.MediaOrder[books.book] as book left outer join example.model.contained.MediaOrder[items.item] as item on productId = bookId {item.itemId=1, book.bookId=1} {item.itemId=null, book.bookId=2} EPL3: select book.bookId, item.itemId from example.model.contained.MediaOrder[books.book] as book full outer join example.model.contained.MediaOrder[items.item] as item on productId = bookId {item.itemId=1, book.bookId=1} {item.itemId=null, book.bookId=2} {item.itemId=3, book.bookId=null} EPL4: select count(*) from example.model.contained.MediaOrder[books.book] as book, example.model.contained.MediaOrder[items.item] as item where productId = bookId {count(*)=1} {count(*)=2} EPL5: select count(*) from example.model.contained.MediaOrder[books.book] as book unidirectional, example.model.contained.MediaOrder[items.item] as item where productId = bookId {count(*)=1} {count(*)=1}前三个样例应该很好理解,和普通连接操作没太大差别。4和5的差别在于unidirectional。4中不包括这个keyword时。发送两个相同的mediaOrder对象。聚合函数count是要累加的,而5有了这个keyword后,就将每次连接操作的结果都独立起来了。
可是有一点要记住:假设Contained-Event来自于named window,那么连接操作就必须加上unidirectionalkeyword
上面的三小节基本上就是Contained-Event的所有内容,官方文档上还有别的样例能够看看。只是说实话本人事实上也没用过,可是存在即合理,所以咱们暂且先学习着。以后肯定能用到。
4. Updating an Insert Stream
在之前解说基础语法的时候。大家都接触了select、insert into、on delete、on update等操作,而且后两个是针对named window的。
那有没有单独的update语法呢?有!为什么放到这一节才讲呢?我事实上也是跟着文档来进行总结的,简直就是坑货嘛。
今天先教大家,改日有空了我再移到前面和基础语法放到一起。
多说了几句废话。。。update语法简单,意义也简单,就是在事件即将被用于计算前,改变其自身的属性值,然后再将其用于后面的计算。语法例如以下:
update istream event_type [as stream_name] set property_name = set_expression [, property_name = set_expression] [,...] [where where_expression]istream表明更新的是新输入的事件(可是没有rstream,不知道这里为啥要指明这个keyword。。)。event_type代表要更新的事件,set之后的property_name是要更新的事件属性,最后能够用where子句进行简单的过滤。
由于事件会在计算前被更新,所以在事件还未到来之前,两类句子的创建顺序无论如何都会使得update生效。可是假设有部分事件已经參与计算了,之后才创建update句子,那已经參与计算的事件就不会被更新了,之后的进入引擎的事件才会被更新。
上面说的是不同于update句子的创建顺序导致的不同结果。假设有多个update句子作用于同一类事件,那么先创建的会先更新事件。可是假设在update句首用@Priority这个注解。那么更新事件的顺序是以优先级最高的最先更新。所以我建议大家在同一时候使用多个update句子的时候,最好使用优先级,由于你非常可能搞不清楚全部update句子的创建顺序。
针对以上两点。我提供了一个简单的样例供大家參考。
package example; import com.espertech.esper.client.Configuration; import com.espertech.esper.client.EPAdministrator; import com.espertech.esper.client.EPRuntime; import com.espertech.esper.client.EPServiceProvider; import com.espertech.esper.client.EPServiceProviderManager; import com.espertech.esper.client.EPStatement; import com.espertech.esper.client.EventBean; import com.espertech.esper.client.UpdateListener; import java.io.Serializable; /** * Created by Luonanqin on 7/31/14. */ class UpdateEvent implements Serializable { private int id; private String name; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String toString() { return "UpdateEvent{" + "id=" + id + ", name='" + name + ''' + '}'; } } class UpdateEventListener implements UpdateListener { public void update(EventBean[] newEvents, EventBean[] oldEvents) { if (newEvents != null) { for (int i = 0; i < newEvents.length; i++) { System.out.println("newEvent: " + newEvents[i].getUnderlying()); } } if (oldEvents != null) { for (int i = 0; i < oldEvents.length; i++) { System.out.println("oldEvent: " + oldEvents[i].getUnderlying()); } } } } public class UpdateEventTest { public static void main(String[] args) { /** * config: * * <engine-settings> * <defaults> * <execution prioritized="true"/> * </defaults> * </engine-settings> */ Configuration config = new Configuration(); config.configure("esper.examples.cfg.xml"); EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config); EPAdministrator admin = epService.getEPAdministrator(); EPRuntime runtime = epService.getEPRuntime(); UpdateEventListener listener = new UpdateEventListener(); UpdateEvent ue = new UpdateEvent(); ue.setId(1); ue.setName("luonanqin"); UpdateEvent ue2 = new UpdateEvent(); ue2.setId(2); ue2.setName("qinnanluo"); String updateEvent = UpdateEvent.class.getName(); String select = "select * from " + updateEvent; String update1 = "@Priority(2)update istream " + updateEvent + " set name='qnoul' where id = 1"; String update2 = "@Priority(1)update istream " + updateEvent + " set name='luonq' where id = 1"; EPStatement stat1 = admin.createEPL(select); stat1.addListener(listener); System.out.println("select EPL: " + select); runtime.sendEvent(ue); runtime.sendEvent(ue2); System.out.println(); EPStatement stat2 = admin.createEPL(update1); stat2.addListener(listener); System.out.println("update1 EPL: " + update1); runtime.sendEvent(ue); runtime.sendEvent(ue2); System.out.println(); EPStatement stat3 = admin.createEPL(update2); stat3.addListener(listener); System.out.println("update2 EPL: " + update2); runtime.sendEvent(ue); runtime.sendEvent(ue2); } }运行结果(凝视是我自己加入的。不属于运行结果):
select EPL: select * from example.UpdateEvent newEvent: UpdateEvent{id=1, name='luonanqin'} newEvent: UpdateEvent{id=2, name='qinnanluo'} update1 EPL: @Priority(2)update istream example.UpdateEvent set name='qnoul' where id = 1 newEvent: UpdateEvent{id=1, name='qnoul'} // update1 result oldEvent: UpdateEvent{id=1, name='luonanqin'} // update1 result newEvent: UpdateEvent{id=1, name='qnoul'} // select result newEvent: UpdateEvent{id=2, name='qinnanluo'} // select result update2 EPL: @Priority(1)update istream example.UpdateEvent set name='luonq' where id = 1 newEvent: UpdateEvent{id=1, name='luonq'} // update2 result oldEvent: UpdateEvent{id=1, name='luonanqin'} // update2 result newEvent: UpdateEvent{id=1, name='qnoul'} // update1 result oldEvent: UpdateEvent{id=1, name='luonq'} // update1 result newEvent: UpdateEvent{id=1, name='qnoul'} // select result newEvent: UpdateEvent{id=2, name='qinnanluo'} // select resultupdateListener方法中,newEvents表示更新后的属性值,oldEvents表示更新前的属性值。
使用update语法有几点要注意:
1). 假设事件是POJO。那么要实现java.io.Serializable接口。
由于引擎内部的update操作实际上是要先深复制原事件再更新拷贝后的事件,不会对原事件作出不论什么改动。
2). 设置属性的表达式不能用聚合函数
3). 假设事件是xml。update语法则不适用
4). update操作不可用于嵌套的事件
5. Controlling Event Delivery : The For Clause
大家应该都注意到在此之前我们的UpdateListener在达到触发条件时,都是仅仅触发一次,而且把满足这次触发的全部事件传给update方法。Esper对此提供了一个比較另类的触发方式——可设置条件对Listener进行多次触发,也就是这节所讲的For子句。各位可别把这个For子句想成for循环了,根本就是两码事。
For子句可应用在全部的Select子句之后,而且分为两类触发。一个是依据分组条件进行多次触发。即每一组触发一次。还有一个是一个事件触发一次。相应的语法例如以下:
// 分组触发 ... for grouped_delivery (group_expression [, group_expression] [,...]) // 每一个事件都触发 ... for discrete_delivery
针对上面两类触发。我提供了一个简单的演示样例。
package example; import com.espertech.esper.client.EPAdministrator; import com.espertech.esper.client.EPRuntime; import com.espertech.esper.client.EPServiceProvider; import com.espertech.esper.client.EPServiceProviderManager; import com.espertech.esper.client.EPStatement; import com.espertech.esper.client.EventBean; import com.espertech.esper.client.UpdateListener; /** * Created by Luonanqin on 7/31/14. */ class ForEvent { private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public String toString() { return "ForEvent{" + "name='" + name + ''' + ", age=" + age + '}'; } } class ForListener implements UpdateListener { // 用于记录调用次数 private int num = 1; public void update(EventBean[] newEvents, EventBean[] oldEvents) { System.out.println("invocation: " + num++); if (newEvents != null) { for (int i = 0; i < newEvents.length; i++) { System.out.println(newEvents[i].getUnderlying()); } } } } public class ForTest { public static void main(String[] args) { EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(); EPAdministrator admin = epService.getEPAdministrator(); EPRuntime runtime = epService.getEPRuntime(); String forEvent = ForEvent.class.getName(); String select = "select * from " + forEvent + ".win:length_batch(3)"; String for1 = "select * from " + forEvent + ".win:length_batch(3) for grouped_delivery (age)"; String for2 = "select * from " + forEvent + ".win:length_batch(3) for discrete_delivery"; ForEvent fe1 = new ForEvent(); fe1.setName("luo"); fe1.setAge(1); ForEvent fe2 = new ForEvent(); fe2.setName("nan"); fe2.setAge(2); ForEvent fe3 = new ForEvent(); fe3.setName("qin"); fe3.setAge(1); EPStatement stat1 = admin.createEPL(select); stat1.addListener(new ForListener()); System.out.println("select EPL1: " + select); System.out.println(); runtime.sendEvent(fe1); runtime.sendEvent(fe2); runtime.sendEvent(fe3); stat1.destroy(); System.out.println(); EPStatement stat2 = admin.createEPL(for1); stat2.addListener(new ForListener()); System.out.println("for EPL2: " + for1); System.out.println(); runtime.sendEvent(fe1); runtime.sendEvent(fe2); runtime.sendEvent(fe3); stat2.destroy(); System.out.println(); EPStatement stat3 = admin.createEPL(for2); stat3.addListener(new ForListener()); System.out.println("for EPL3: " + for2); System.out.println(); runtime.sendEvent(fe1); runtime.sendEvent(fe2); runtime.sendEvent(fe3); } }运行结果:
select EPL1: select * from example.ForEvent.win:length_batch(3) invocation: 1 ForEvent{name='luo', age=1} ForEvent{name='nan', age=2} ForEvent{name='qin', age=1} for EPL2: select * from example.ForEvent.win:length_batch(3) for grouped_delivery (age) invocation: 1 ForEvent{name='luo', age=1} ForEvent{name='qin', age=1} invocation: 2 ForEvent{name='nan', age=2} for EPL3: select * from example.ForEvent.win:length_batch(3) for discrete_delivery invocation: 1 ForEvent{name='luo', age=1} invocation: 2 ForEvent{name='nan', age=2} invocation: 3 ForEvent{name='qin', age=1}
能够看到第二个For子句中的分组条件是age,所以仅仅触发了两次。
既然这样,以下这个句子有问题吗:
select name from example.ForEvent.win:length_batch(3) for grouped_delivery (age)select的内容是name,分组条件是age,Esper不同意这样做。
这也是使用For子句时唯一要注意的地方,大家别忽略了。
EPL的基本的语法到这篇为止算是告一段落了,包含前面的八篇文章。都是要熟练掌握的。之后的内容是各位一直期待的Pattern篇,初步预计用3~4篇说完,太长了也怕你们看累了。^_^