zoukankan      html  css  js  c++  java
  • Siddhi cep java 集成简单使用

    Siddhi 是一个开源的cep (Complex Event Processing)类库,有一个明显的例子是uber 的事件处理,具体可以google

    几张参考cep 以及siddhi 图








    java 集成使用(使用maven)

    • 代码
    maven 
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.dalong</groupId>
        <artifactId>cepappdemo</artifactId>
        <version>1.0-SNAPSHOT</version>
        <dependencies>
    
          <dependency>
              <groupId>org.wso2.siddhi</groupId>
              <artifactId>siddhi-core</artifactId>
              <version>4.1.7</version>
          </dependency>
          <dependency>
              <groupId>org.wso2.siddhi</groupId>
              <artifactId>siddhi-query-api</artifactId>
              <version>4.1.7</version>
          </dependency>
          <dependency>
              <groupId>org.wso2.siddhi</groupId>
              <artifactId>siddhi-query-compiler</artifactId>
              <version>4.1.7</version>
          </dependency>
          <dependency>
              <groupId>org.wso2.siddhi</groupId>
              <artifactId>siddhi-annotations</artifactId>
              <version>4.1.7</version>
          </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
            </dependency>
            <dependency>
                <groupId>quartz</groupId>
                <artifactId>quartz</artifactId>
                <version>1.6.0</version>
            </dependency>
            <dependency>
                <groupId>com.googlecode.disruptor</groupId>
                <artifactId>disruptor</artifactId>
                <version>2.9</version>
            </dependency>
        </dependencies>
    
    </project>
    
    package com.dalong;
    import org.wso2.siddhi.core.SiddhiAppRuntime;
    import org.wso2.siddhi.core.SiddhiManager;
    import org.wso2.siddhi.core.event.Event;
    import org.wso2.siddhi.core.stream.input.InputHandler;
    import org.wso2.siddhi.core.stream.output.StreamCallback;
    
    public class Application {
    
        public static void main(String[] args) throws InterruptedException {
            String siddhiApp = "define stream StockEventStream (symbol string, price float, volume long); " +
                    " " +
                    "@info(name = 'query1') " +
                    "from StockEventStream#window.time(5 sec)  " +
                    "select symbol, sum(price) as price, sum(volume) as volume " +
                    "group by symbol " +
                    "insert into AggregateStockStream ;";
    
            SiddhiManager siddhiManager = new SiddhiManager();
            SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);
            siddhiAppRuntime.addCallback("AggregateStockStream", new StreamCallback() {
                @Override
                public void receive(Event[] event) {
                    for (int i = 0; i < event.length; i++) {
                        System.out.println("get data:"+event[i].toString());
                    }
                }
            });
            InputHandler inputHandler = siddhiAppRuntime.getInputHandler("StockEventStream");
            siddhiAppRuntime.start();
            inputHandler.send(new Object[]{"IBM", 100f, 100L});
            Thread.sleep(1000);
            inputHandler.send(new Object[]{"IBM", 200f, 300L});
            inputHandler.send(new Object[]{"WSO2", 60f, 200L});
            Thread.sleep(1000);
            inputHandler.send(new Object[]{"WSO2", 70f, 400L});
            inputHandler.send(new Object[]{"GOOG", 50f, 30L});
            Thread.sleep(5000);
            inputHandler.send(new Object[]{"IBM", 200f, 400L});
            Thread.sleep(1000);
            inputHandler.send(new Object[]{"WSO2", 70f, 50L});
            Thread.sleep(5000);
            inputHandler.send(new Object[]{"WSO2", 80f, 400L});
            inputHandler.send(new Object[]{"GOOG", 60f, 30L});
            Thread.sleep(3000);
            siddhiAppRuntime.shutdown();
            siddhiManager.shutdown();
        }
    }
    
    • 输出结果
    get data:Event{timestamp=1526826180972, data=[IBM, 100.0, 100], isExpired=false}
    get data:Event{timestamp=1526826181977, data=[IBM, 300.0, 400], isExpired=false}
    get data:Event{timestamp=1526826181978, data=[WSO2, 60.0, 200], isExpired=false}
    get data:Event{timestamp=1526826182983, data=[WSO2, 130.0, 600], isExpired=false}
    get data:Event{timestamp=1526826182983, data=[GOOG, 50.0, 30], isExpired=false}
    get data:Event{timestamp=1526826187988, data=[IBM, 200.0, 400], isExpired=false}
    get data:Event{timestamp=1526826188993, data=[WSO2, 70.0, 50], isExpired=false}
    get data:Event{timestamp=1526826193993, data=[WSO2, 80.0, 400], isExpired=false}
    get data:Event{timestamp=1526826193994, data=[GOOG, 60.0, 30], isExpired=false}
    
    • 代码说明
      比较简单,就是进行5s内事件数据的和,代码中的sleep 就是进行时间控制的,实际中可以进行调整,观看不同的效果

    参考资料

    https://wso2.github.io/siddhi/documentation/siddhi-quckstart-4.0/
    https://github.com/wso2/siddhi
    https://wso2.github.io/siddhi/documentation/user-guide/
    https://github.com/rongfengliang/siddhi-javademo

  • 相关阅读:
    MySQL数据类型
    Linux网络编程:客户端/服务器的简单实现
    初学JAVA
    依据函数名字符串执行函数
    Windows Server 2012学习文档
    DELPHI WEBSERVICE
    常用函数、常量、类型记录
    CAD2007_DWG转PDF
    MCU_头文件编写
    MCU_存储器
  • 原文地址:https://www.cnblogs.com/rongfengliang/p/9064984.html
Copyright © 2011-2022 走看看