zoukankan      html  css  js  c++  java
  • esper(4-1)-简单context

    1、创建context语法

    create context context_name 
    partition [by] event_property [and event_property [and ...]] 
    from stream_def [, event_property [...] from stream_def] [, ...]

    context_name为context的名字,并且唯一。如果重复,会说明已存在。

    event_property为事件的属性名,多个属性名之间用and连接,也可以用逗号连接。

    stream_def就是view。

    例如:

    //一个流:id和name是User的属性
    create context NewUser partition by id and name from User
    
    //多个流:sid是Student的属性,tid是Teacher的属性
    create context Person partition by sid from Student, tid from Teacher

    多个流一定要注意,每个流的中用于context的属性的数量要一样,数据类型也要一致。比如下面这几个就是错误的:

    // 错误:sid是int,tname是String,数据类型不一致
    create context Person partition by sid from Student, tname from Teacher
    
    // 错误:Student有一个属性,Teacher有两个属性,属性数量不一致
    create context Person partition by sid from Student, tid,tname from Teacher
    
    // 错误:sid对应tname,sname对应tid,并且sname和tname是String,sid和tid是int,属性数量一样,但是对应的数据类型不一致
    create context Person partition by sid,sname from Student, tname,tid from Teacher

    可以对进入context的事件增加过滤条件,不符合条件的就被过滤掉,就像下面这样:

    // age大于20的Student事件才能建立或者进入context
    create context Person partition by sid from Student(age > 20)

    例子:

    import cn.hutool.core.lang.Console;
    import com.espertech.esper.client.*;
    import com.espertech.esper.client.context.ContextPartitionSelectorSegmented;
    import lombok.*;
    
    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    /**
     * @author yaoyuan2
     * @date 2019/3/26
     */
    public class SimpleContext {
        static EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
        static EPAdministrator admin = epService.getEPAdministrator();
        static String view = User.class.getName();
        public static void main(String[] args) {
            String epl1 = "create context esbtest partition by id from "+view;//必须首先创建,否则报错:没有declare context
            String epl2 = "context esbtest select avg(amount) as avgAmount,id from " + view;
            Console.log(epl1);
            Console.log(epl2);
            EPStatement context = admin.createEPL(epl1);
            EPStatement state = admin.createEPL(epl2);
            state.addListener(new UpdateListener() {
                @Override
                public void update(EventBean[] newEvents, EventBean[] oldEvents) {
                    if (newEvents != null) {
                        Double avgAmount = (Double) (newEvents[0].get("avgAmount"));
                        Integer id = (Integer) (newEvents[0].get("id"));
                        Console.log("id={},avgAmount={}",id,avgAmount);
                    }
                }
            });
            EPRuntime runtime = epService.getEPRuntime();
            //查看id为1的平均金额
            ContextPartitionSelectorSegmented selectCtx = new ContextPartitionSelectorSegmented() {
    
                // 该方法的实现方式与context定义的properties有关,如果有两个property id和time,则Object数组长度为2,obj[0]为id值,
                // obj[1]为time值,然后再添加到list中并返回
                @Override
                public List<Object[]> getPartitionKeys() {
                    Object[] o = new Object[1];
                    o[0] = 1;
                    List<Object[]> list = new ArrayList<Object[]>();
                    list.add(o);
                    return list;
                }
            };
            User u1 = new User(1,20);
            Console.log("sendEvent: id={},amount={}",u1.getId(),u1.getAmount());
            runtime.sendEvent(u1);
            User u2 = new User(2,30);
            Console.log("sendEvent: id={},amount={}",u2.getId(),u2.getAmount());
            runtime.sendEvent(u2);
            User u3 = new User(1,30);
            Console.log("sendEvent: id={},amount={}",u3.getId(),u3.getAmount());
            runtime.sendEvent(u3);
    
            Iterator<EventBean> it = state.iterator(selectCtx);
            EventBean event = it.hasNext() ? it.next() : null;
            Console.log("Iterator context:id=1,avgAmount={}",event.get("avgAmount"));
    
            User u4 = new User(2,40);
            Console.log("sendEvent: id={},amount={}",u4.getId(),u4.getAmount());
            runtime.sendEvent(u4);
    
        }
    }
    @Setter
    @Getter
    @ToString
    @NoArgsConstructor
    @AllArgsConstructor
    class User implements Serializable {
        private int id;
        private int amount;
    }

    输出

    create context esbtest partition by id from com.ebc.User
    context esbtest select avg(amount) as avgAmount,id from com.ebc.User
    
    sendEvent: id=1,amount=20
    id=1,avgAmount=20.0
    sendEvent: id=2,amount=30
    id=2,avgAmount=30.0
    sendEvent: id=1,amount=30
    id=1,avgAmount=25.0
    Iterator context:id=1,avgAmount=25.0
    sendEvent: id=2,amount=40
    id=2,avgAmount=35.0

    通过例子发现,partition by id会根据id分组,每组进入各自的context。上例中,会建2个context,一个id=1的,另外1个id=2的。

    如果partition by后边有同一个流的多个属性值,如:partition by id,name。A事件id=1,amount=20,B事件id=1,amount=21,C事件id=1,amount=20,那么A和C在一个context,B在另外1个context。

  • 相关阅读:
    [数据结构与算法 01] 什么是数据结构?什么是算法?联系是什么?常用的数据结构/算法有?
    程序员面试金典-面试题 16.05. 阶乘尾数
    程序员面试金典-面试题 16.04. 井字游戏
    程序员面试金典-面试题 16.02. 单词频率
    程序员面试金典-面试题 16.01. 交换数字
    程序员面试金典-面试题 10.11. 峰与谷
    程序员面试金典-面试题 10.10. 数字流的秩
    程序员面试金典-面试题 10.09. 排序矩阵查找
    程序员面试金典-面试题 10.05. 稀疏数组搜索
    程序员面试金典-面试题 10.03. 搜索旋转数组
  • 原文地址:https://www.cnblogs.com/yaoyuan2/p/10601240.html
Copyright © 2011-2022 走看看