zoukankan      html  css  js  c++  java
  • Esper系列(五)Order by、Limit、构建事件流、Updating an Insert Stream

    Order by

    与SQL语法类似类似,默认为升序排列;
    注意:

    1. 如果order by的子句中出现了聚合函数,那么该聚合函数必须出现在select的子句中。
    2. 出现在select中的expression或者在select中定义的expression,在order by中也有效。
    3. 如果order by所在的句子没有join或者没有group by,则排序结果幂等,否则为非幂等。

     

    Limit
    格式一:

    limit row_count [offset offset_count]

    注意:row_count和offset_count既可以是常量也可以是变量.
    例句:

    // 从结果集中第三条开始输出5个事件流,offset 2表示跳过前面两个事件流,limit 5表示出入的个数
    String epsql = "select value as result from inEvent.win:length_batch(20) limit 5 offset 2";

    格式二:

    limit offset_count[, row_count]

    例句:

    String epsql = "select value as result from inEvent.win:length_batch(20) limit 2,5";

    构建事件流

    Insert(未定义的事件流)
    功能:根据已有的事件流组合生成新的事件流.

     

    格式:

    insert [istream | irstream | rstream] into event_stream_name  [ (property_name [, property_name] ) ]

    说明:

    istream表示新事件流(New Events),rstream表示旧事件流(Old Events),irstream两者都包含event_stream_name为新建事件流名称,property_name为事件流属性。

    例句:

    // 创建inEvent事件流该事件流有两个属性字段name与salary
    String insql = "insert into inEvent select name,salary from  orderEvent";
    // 创建EPL
    epAdmin.createEPL(insql);
    // 查询inEvent事件流中属性salary大于150的name属性字段
    String epsql = "select name as result from inEvent where salary > 150 

    多事件流Insert

    例句:

    // 将事件流orderEvent的name、salary属性值分别赋值插给inEvent事件流的content、price属性
    String insql1 = "insert into inEvent(content,price) select name,salary from orderEvent";
    epAdmin.createEPL(insql1);
     
    // 将事件流orderEvent中的JavaBean,bean中的属性key、value属性值分别赋值插给inEvent事件流的content、price属性
    String insql2 = "insert into inEvent (content,price) select bean.key,bean.value from orderEvent ";
    epAdmin.createEPL(insql2);
     
    // 查询数据流inEvent中content属性值
    10  String epsql = "select content as result from inEvent ";

    注意:

    Insert新创建的事件流属性字段,可由自定义静态函数返回,但一定要返回javabean,map,或者Object数组,且不能用as来为转换后的结果设置别名;

    例子:

    String insql = "insert into msgEvent select BaseUntil.getEvent() from orderEvent";
    epAdmin.createEPL(insql);
    String epsql = "select * from msgEvent";

    其中BaseUntil.getEvent()返回orderEvent类型的javaBean;

    Insert(已定义的事件流)

    上面对事件流的构建都是新生成的,即事件流没有预定义。在事件流有预定义的情况下,Insert中引用该事件流时必须带包名。
    例如:

    文件名:msgEvent.java

    // msgEvent事件流定义
    public class msgEvent {
        private int msgId;
        private String msgInfo;
        // 注意该事件流定义中没有对应属性字段的set方法,只能通过构造函数改变属性值
        public msgEvent(int msgId, String msgInfo) {
            super();
            this.msgId = msgId;
            this.msgInfo = msgInfo;
    10      }
    11   
    12      @Override
    13      public String toString() {
    14          return "msgId:"+msgId+",msgInfo:"+msgInfo;
    15      }
    16  }

    文件名:BaseUntil.java

    public class BaseUntil {
       
        public static int Add(int n){
            return n+100;
        }
       
        public static String UpdataText(String str){
            return str+",你好!";
        }
    10     
    11      public static orderEvent getEvent(){
    12          orderEvent event = new orderEvent();
    13          event.setName("张三");
    14          event.setSalary(50000);
    15          return event;
    16      }
    17  }

    文件名:orderListener

    /**
     * 用于监听某个EPL在引擎中的运行情况,事件进入并产生结果后会回调UpdateListener
     * 必须实现 UpdateListener 接口
     */
    public class orderListener implements UpdateListener {
     
        /**
         * arg0对应newEvent,arg1对应oldEvent
         */
    10      @Override
    11      public void update(EventBean[] arg0, EventBean[] arg1) {
    12          if (null != arg0) {
    13              for (int i=0;i<arg0.length;i++){
    14                  System.out.println("orderEvent Count is "+arg0.length+",EventBean is "+arg0[i].getUnderlying());
    15              }
    16          } else {
    17              System.out.println("EventBean is Null ");
    18          }
    19      }
    20  }

    文件名:orderMainTest.java

    public class orderMainTest {
     
        public static void main(String[] args) throws InterruptedException {
     
            // 添加配置(包所在路劲),方面后面的引用自动添加包名前缀
            Configuration config = new Configuration();
            config.addEventTypeAutoName("cn.chenx.esper.insert");
            //
            EPServiceProvider epServiceProvider = EPServiceProviderManager
    10                  .getDefaultProvider(config);
    11          EPAdministrator epAdmin = epServiceProvider.getEPAdministrator();
    12         
    13          ConfigurationOperations configOper = epAdmin.getConfiguration();
    14          configOper.addVariable("ifbool", Boolean.class, false);
    15          configOper.addImport(BaseUntil.class);
    16   
    17          // 事件流名称
    18          String className = "orderEvent";// orderEvent.class.getName();
    19          System.out.println("className is " + className);
    20     
    21          String insql = "insert into msgEvent select BaseUntil.getEvent() from orderEvent";
    22          epAdmin.createEPL(insql);
    23          String epsql = "select * from msgEvent";
    24   
    25          System.out.println("epsql:" + epsql);
    26          EPStatement epstate = epAdmin.createEPL(epsql);
    27          epstate.addListener(new orderListener());
    28          EPRuntime epRuntime = epServiceProvider.getEPRuntime();
    29          //
    30          for (int i = 0; i < 5; i++) {
    31              int seed = (int) (Math.random() * 100);
    32              orderEvent event = new orderEvent("" + seed, 100 + seed);
    33              System.out.println("seed name:" + event.getName() + ",salary:"
    34                      + event.getSalary());
    35              orderBean bean = new orderBean();
    36              bean.setKey("BeanKey:" + i);
    37              bean.setValue(seed+i);
    38              event.setBean(bean);
    39   
    40              List<orderBean> list = new ArrayList<orderBean>();
    41              for (int j = 0; j < 10; j++) {
    42                  bean = new orderBean();
    43                  bean.setKey("ListKey:" + j);
    44                  bean.setValue(seed+j);
    45                  list.add(bean);
    46              }
    47              event.setOrderBeans(list);
    48   
    49              Map<Integer, orderBean> map = new HashMap<Integer, orderBean>();
    50              for (int k = 0; k < 10; k++) {
    51                  bean = new orderBean();
    52                  bean.setKey("MapKey" + k);
    53                  bean.setValue(seed+k);
    54                  map.put(k, bean);
    55              }
    56              event.setOrderMap(map);
    57   
    58              epRuntime.sendEvent(event);
    59          }
    60      }
    61  }

    Updating an Insert Stream

    功能:
    在事件即将被用于计算前,改变其自身的属性值,然后再将其用于后面的计算.


    格式:

    update istream event_type [as stream_name]
      set property_name = set_expression [, property_name = set_expression] [,...]
      [where where_expression]

    说明:

    • 因为istream的限制,所以该语法只支持新输入的事件.
    • event_type代表要更新的事件,set之后的property_name是要更新的事件属性,最后可以用where子句进行简单的过滤.
    • update句首用@Priority这个注解,使更新事件的顺序是以优先级最高的最先更新;

    注意:

    • 如果事件是POJO,那么要实现java.io.Serializable接口。因为引擎内部的update操作实际上是要先深复制原事件再更新拷贝后的事件,不会对原事件作出任何修改。
    • 设置属性的表达式不能用聚合函数.
    • 如果事件是xml,update语法则不适用.
    • update操作不可用于嵌套的事件.
  • 相关阅读:
    小结Fragment与FragmentPagerAdapter的生命周期及其关系
    Android的Touch事件分发机制简单探析
    个人项目开源之c++基于epoll实现高并发游戏盒子(服务端+客户端)源代码
    个人项目开源之Django文件中转站源代码
    个人项目开源之Django图书借阅系统源代码
    个人项目开源之实现矩形树图代码统计工具源代码
    颜色渐变算法
    echarts玩转图表之矩形树图
    解决MISCONF Redis is configured to save RDB snapshots, but it is currently not able to persist on disk.问题
    引号有几种
  • 原文地址:https://www.cnblogs.com/jianyuan/p/4980293.html
Copyright © 2011-2022 走看看