zoukankan      html  css  js  c++  java
  • Esper简介

    1. CEP(Complex Event Processing, 复杂事件处理)

    事件(Event)一般情况下指的是一个系统中正在发生的事,事件可能发生在系统的各个层面上,它可以是某个动作,例如客户下单,发送消息,提交报告等,也可以是某种状态的改变,例如温度的变化,超时等等。通过对这些事件进行分析,可以提取出其中有效的信息。 根据维基百科的定义,事件处理(Event processing)指的是跟踪系统中发生的事件,分析事件中的信息并从中得到某种结论。而复杂事件处理,则是结合多个事件源中的事件,从中推断出更加复杂的情况下的事件。

    由此可见,CEP的目的包括:(1)识别所需要的事件;(2)快速地对这些事件进行处理。 通常情况下,我们想要利用CEP达到的目的是掌握当前的某种情况或者说状态,因此CEP感兴趣的不是事件本身给出的信息,而是通过这些信息所能推导出的某种结论,通过CEP,我们能够让这些事件变得有意义。

    要实现一个CEP引擎,需要考虑的事情包括: (1)吞吐量; (2)低延迟,从事件到达到事件被处理,不能有太大的延迟; (3)复杂的逻辑处理,CEP需要能够对事件进行较为复杂的操作,例如,检测事件之间的相关性,过滤,加窗,连接等。

    2. Esper

    Esper是一个开源的复杂事件处理引擎,它的目的是让用户能够通过它提供的接口,构建一个用于处理复杂事件的应用程序。 这里写图片描述 

    从Esper的架构图中,可以看出,Esper主要包括了三个部分:Input adapter,Esper engine,Output adapter。

    2.1 Input adapter & Output adapter

    输入适配器和输出适配器的主要目的是接收来自不同事件源的事件,并向不同的目的地输出事件。 目前,Esper提供的适配器包括File Input and Output adpter, Spring JMS Input and Output Adapter, AMQP Input and Output Adapter, Kafka Adapter等等。这些适配器提供了一系列接口,可以让用户从不同的数据源读取数据,并将数据发送给不同的目的数据源,用户可以不用自己单独编写客户端代码来连接这些数据源,感觉相当于对这些数据源提供了一层封装。

    2.2 Esper engine

    Esper引擎是处理事件的核心,它允许用户定义需要接收的事件以及对这些事件的处理方式。

    2.2.1 Esper支持的事件表现形式

    Esper支持多种事件表现形式,包括遵循JavaBean方式的含有getter方法的Java POJO(普通Java对象),实现了Map接口的对象,对象数组,XML文档对象,以及Apache Avro(一个支持JSON和Schema的数据序列化系统,可以将数据结构或对象转化成便于存储和传输的格式)。 这些事件表现形式的共同之处在于,它们都提供了事件类型的元数据,也就是说能够表示事件的一系列属性,例如,一个Java对象可以通过其成员变量来表示其事件属性,一个Map对象能够通过键值对来表示属性。由此可见,本质上事件是一系列属性值的集合,对事件的操作即对事件中的部分或全部属性的操作。

    2.2.2 Esper事件处理模型

    Esper的事件处理模型主要包括两个部分:Statement和Listener。 (1)Statement 利用Esper的事件处理语言EPL声明对事件进行的操作,Esper中提供了多种类型的事件操作,包括过滤、加窗、事件聚合等等。EPL是一种类似于SQL的语言,从这一点上来看,Esper恰好与数据库相反,数据库时保存数据,并在数据上运行查询语句,而Esper是保存查询语句,在这些查询上运行数据,只要事件与查询条件匹配,Esper就会实时进行处理,而不是只有在查询提交的时候才处理。 假设现在要处理的事件是用户注册事件,注册时用户需要提供用户名和年龄,那么事件中将包含用户名和年龄两个属性,而我们要做的事是计算用户的平均年龄,那么,首先应该定义一个事件类PersonEvent,并加上getter方法:

    public class PersonEvent extends Event {
        
        private String name;
        private int age;
        
        public PersonEvent(String name, int age) {
            
            this.name = name;
            this.age = age;
            
        }
    
        public String getName() {
            return name;
        }
    
        public int getAge() {
            return age;
        }    
    
    }

    然后,通过EPL语言声明对事件的操作,此处为取平均值:

    import com.espertech.esper.client.EPServiceProvider;
    import com.espertech.esper.client.EPServiceProviderManager;
    import com.espertech.esper.client.EPStatement;
    import com.events.Event;
    import com.events.OrderEvent;
    import com.events.PersonEvent;
    import com.listener.OrderEventListener;
    import com.listener.PersonEventListener;
    
    public class EsperClient {
        
        private EPServiceProvider engine;
        
        public EsperClient() {
            //obtain an engine instance
            this.engine = EPServiceProviderManager.getDefaultProvider();
            //System.out.println(engine.getURI());
        }
        
        public void personEventProcess() {
            
            //tell the engine about the event type
            engine.getEPAdministrator().getConfiguration().addEventType(PersonEvent.class);
            //create an epl statement
            String epl = "select name, age from PersonEvent";
            EPStatement statement = engine.getEPAdministrator().createEPL(epl);
            
        }
        
        public void send(Event event) {
            
            engine.getEPRuntime().sendEvent(event);
            
        }
    
    }

    其中,send方法用于向Esper引擎发送一个事件,当引擎接收到这个事件后,便可根据事件的类型进行相应的处理。 (2)Listener Listener用于监听事件的处理情况,接收事件处理的结果,通过UpdateListener接口来实现,它相当于一个回调函数,当事件处理完成之后,可以通过该回调函数向结果发送到目的地。此处将处理结果打印到控制台:

    import com.espertech.esper.client.EventBean;
    import com.espertech.esper.client.UpdateListener;
    
    public class PersonEventListener implements UpdateListener {
    
    
        @Override
        public void update(EventBean[] newEvents, EventBean[] oldEvents) {
            // TODO Auto-generated method stub
            EventBean event = newEvents[0];
            System.out.println(String.format("Name: %s, Age: %d", event.get("name"), event.get("age")));
        }
    
    
    }

    然后将对事件的操作声明和监听器关联起来:

    public void personEventProcess() {
            
            //tell the engine about the event type
            engine.getEPAdministrator().getConfiguration().addEventType(PersonEvent.class);
            //create an epl statement
            String epl = "select avg(age) from PersonEvent";
            EPStatement statement = engine.getEPAdministrator().createEPL(epl);
            //attach a callback to receive the results
            statement.addListener(new PersonEventListener());
        }

    测试:

    import com.esper.client.EsperClient;
    import com.events.PersonEvent;
    
    public class Test {
    
        @SuppressWarnings("static-access")
        public static void main(String[] args) throws InterruptedException {
            // TODO Auto-generated method stub
            
            EsperClient ec = new EsperClient();
            ec.personEventProcess();
            ec.send(new PersonEvent("name", 10));    
            ec.send(new PersonEvent("name", 20));    
    }
    }
  • 相关阅读:
    Linux的JVM可以从SUN网站上下载
    实践是最好的老师
    SCAU 8624 多项式系数累加和
    SCAU 8617 阶乘数字和 (水题)
    SCAU 8614 素数
    SCAU 8619 公约公倍
    HDU ACM 1106 排序
    Uva 465 Overflow
    SCAU 8611 大牛之路I
    SCAU 9501 ACMer不得不知道的事儿
  • 原文地址:https://www.cnblogs.com/yitudake/p/6747990.html
Copyright © 2011-2022 走看看