zoukankan      html  css  js  c++  java
  • SODBASE CEP学习(四)续:类SQL语言EPL与Storm或jStorm集成-使用分布式缓存

    流式计算在一些情况下会用到分布式缓存,从而实现(1)想把统计或计算结果保存在分布缓存中。供其他模块或其他系统调用。

    (2)某一滑动时间窗体上计数。比如实时统计1小时每一个Cookie的訪问量。实时统计某商品1天内的销售数量和销售额。实时统计某商家1天内的销售量。因为SODBASE CEP引擎本身集成了Redis分布式缓存接口,同一时候在Storm上能够方便地使用SODBASE EPL语句。因此,通过Storm with SQL也就能够方便地使用Redis、在Redis上做滑动窗体了。

    1.演示样例操作步骤

    功能:实时统计1小时每一个Cookie的訪问量

    1.1制作模型文件

    本小节对用到的模型文件建模和单元測试,读者假设时间有限,也能够直接进入下一小节

    (1)网上下载一个Redis, 建议使用Redis 3.0以上linux版本号,SODBASE CEP已支持与Redis 3.0以上的Redis Cluster集成。

    假设认为麻烦想高速把样例跑起来。能够用这个Windows版,解压,默认port6379启动。

    (2)下载SODBASE Studio 2.0.22(sp2)版本号以上

    下载演示样例CEP模型cookie00.sod、cookie01.sod、cookie02.sod、cookie03.sod

    (3)执行SODBASE Studio,导入cookie00.sod、cookie01.sod、cookie02.sod、cookie03.sod

    读者能够查看各个模型的EPL语句,和输入输出配置。

    cookie00:模拟Cookie实时生成数据,连接到cookie01的输入

    cookie01:有两个输出适配器(输出适配器一个重要功能是动作运行),一个加入新数据,一个过期化旧数据,从而在缓存保持10秒滑动窗体。

    cookie02:一个输出适配器,查询计数值

    cookie03:屏幕输出

    (4)将4个EPL模型所有測试执行起来

    (5)输出结果

    (6)为了在Storm中使用EPL模型,将cookie03的输出配置为storm输出。将cookie01、cookie02、cookie03转化为XML模型文件。cookie00不须要,以下步骤在Storm中写了一个Spout作数据来源。

    1.2 编写topology代码

    下载最新版演示样例Storm-EPL-Example。解压后导入到Eclipse中。不用maven,依赖的包都在lib目录下,加到build path就可以。

    开启redisserver。打开com.sodbase.integration.storm.cookie.CookieSlidingWindowCountTopology.java

    Run As->Java Application,就能够在Eclipse中看到输出效果


    打包后的jar包能够部署到stormserver上。

    注:使用jStorm的读者。配置方法和Storm一致。

    在Eclipseproject的lib/storm中使用jstorm依赖的jar包就可以。

    2. 工作原理

    CookieSlidingWindowCountTopology代码例如以下

    package com.sodbase.integration.storm.cookie;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    import backtype.storm.utils.Utils;
    
    import com.sodbase.integration.storm.PrintBolt;
    import com.sodbase.outputadaptor.storm.EPLBolt;
    
    public class CookieSlidingWindowCountTopology {
    
      public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
       
        
    	/**
    	 * EPLBolt相关配置
    	 */
        //(1)Bolt的tuple连到EPL哪个流输入上面    
        String streamname="cookie01.input";
        //(2)输出的字段。与EPL输出相应
        Fields outputFields = new Fields("CountIn10Sec","cookieid");
        //(3)EPL引擎cep home
        String cep_home="cep_home";
        //(4)EPL模型文件
        String[] cepmodelfiles=new String[]{
        		//update sliding window with redis
        		"cep_home/files/cookieexample/cookie01.xml",
        		//count
        		"cep_home/files/cookieexample/cookie02.xml",
        		//emit
        		"cep_home/files/cookieexample/cookie03.xml",
        		};
    
        builder.setSpout("event", new RandomCookieSpout(), 1);
        builder.setBolt("EPL", new EPLBolt(streamname,outputFields,cep_home,cepmodelfiles), 1).shuffleGrouping("event");
        builder.setBolt("print1", new PrintBolt(), 1).shuffleGrouping("EPL");
    
        Config conf = new Config();
        conf.setDebug(false);
    
        if (args != null && args.length > 0) {
          conf.setNumWorkers(3);
    
          StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
        else {
    
          LocalCluster cluster = new LocalCluster();
          cluster.submitTopology("test", conf, builder.createTopology());
          Utils.sleep(1000000);
          cluster.killTopology("test");
          cluster.shutdown();
        }
      }
    }
    

    參考:

    SODBASE CEP学习(四):类SQL语言EPL与Storm或jStorm集成

    SODBASE CEP学习(四)续:类SQL语言EPL与Storm或jStorm集成-滑动窗体

    SODBASE CEP学习进阶篇(七):SODBASE CEP与Spark streaming集成

    SODBASE CEP学习进阶篇(五):与分布式缓存集成

    SODBASE CEP用于轻松、高效实施数据监測、监控类、实时交易类项目微笑

    EPL语法见SODSQL写法与演示样例

    图形化建模请使用SODBASE Studio

    嵌入式方式编程參见执行第一个EPL样例。与Storm集成參见EPL与Storm集成


    开发人员社区活动,SODBASE产品的用户如今能够领礼品 

  • 相关阅读:
    浅谈数据库设计技巧
    用Sqlhelper类简化数据访问示例
    SQL数据库对象命名详细文档
    C# 中SqlParameter类的使用方法小结
    DataGridView 列宽和行高自动调整的设定
    生成8位的不重复乱码
    DataGridView 冻结列或行
    用Sqlhelper类简化数据访问示例
    .Net中DataTable的保存
    LeapFTP 出现 “426 ”错误的解决方法
  • 原文地址:https://www.cnblogs.com/clnchanpin/p/6853745.html
Copyright © 2011-2022 走看看