zoukankan      html  css  js  c++  java
  • Storm—Storm常用API

    4.1 API简介

    4.1.1 Component组件

    1)基本接口

           (1)IComponent接口

           (2)ISpout接口

           (3)IRichSpout接口

           (4)IStateSpout接口

           (5)IRichStateSpout接口

           (6)IBolt接口

           (7)IRichBolt接口

           (8)IBasicBolt接口

    2)基本抽象类

           (1)BaseComponent抽象类

           (2)BaseRichSpout抽象类

           (3)BaseRichBolt抽象类

    (4)BaseTransactionalBolt抽象类

           (5)BaseBasicBolt抽象类

    4.1.2 spout水龙头

    Spout的最顶层抽象是ISpout接口

    (1)Open()

    是初始化方法

    (2)close()

    在该spout关闭前执行,但是并不能得到保证其一定被执行,kill -9时不执行,Storm kill {topoName} 时执行

    (3)activate()

           当Spout已经从失效模式中激活时被调用。该Spout的nextTuple()方法很快就会被调用。

    (4)deactivate ()

           当Spout已经失效时被调用。在Spout失效期间,nextTuple不会被调用。Spout将来可能会也可能不会被重新激活。

    (5)nextTuple()

           当调用nextTuple()方法时,Storm要求Spout发射元组到输出收集器(OutputCollecctor)。NextTuple方法应该是非阻塞的,所以,如果Spout没有元组可以发射,该方法应该返回。nextTuple()、ack()和fail()方法都在Spout任务的单一线程内紧密循环被调用。当没有元组可以发射时,可以让nextTuple去sleep很短的时间,例如1毫秒,这样就不会浪费太多的CPU资源。

    (6)ack()

    成功处理tuple回调方法

    (7)fail()

    处理失败tuple回调方法

    原则:通常情况下(Shell和事务型的除外),实现一个Spout,可以直接实现接口IRichSpout,如果不想写多余的代码,可以直接继承BaseRichSpout。

    4.1.3 bolt转接头

    bolt的最顶层抽象是IBolt接口

    (1)prepare()

           prepare ()方法在集群的工作进程内被初始化时被调用,提供了Bolt执行所需要的环境。

    (2)execute()

    接受一个tuple进行处理,也可emit数据到下一级组件。

    (3)cleanup()

           Cleanup方法当一个IBolt即将关闭时被调用。不能保证cleanup()方法一定会被调用,因为Supervisor可以对集群的工作进程使用kill -9命令强制杀死进程命令。

           如果在本地模式下运行Storm,当拓扑被杀死的时候,可以保证cleanup()方法一定会被调用。

    实现一个Bolt,可以实现IRichBolt接口或继承BaseRichBolt,如果不想自己处理结果反馈,可以实现 IBasicBolt接口或继承BaseBasicBolt,它实际上相当于自动做了prepare方法和collector.emit.ack(inputTuple)。

    4.1.4 spout的tail特性

    Storm可以实时监测文件数据,当文件数据变化时,Storm自动读取。

    4.2 网站日志处理案例

    4.2.1 实操环境准备

    1)打开eclipse,创建一个java工程

    2)在工程目录中创建lib文件夹

    3)解压apache-storm-1.1.0,并把解压后lib包下的文件复制到java工程的lib文件夹中。然后执行build path。

    4.2.2 需求1:将接收到日志的会话id打印在控制台

    1)需求:

           (1)模拟访问网站的日志信息,包括:网站名称、会话id、访问网站时间等

           (2)将接收到日志的会话id打印到控制台

    2)分析

           (1)创建网站访问日志工具类

           (2)在spout中读取日志文件,并一行一行发射出去

           (3)在bolt中将获取到的一行一行数据的会话id获取到,并打印到控制台。

           (4)main方法负责拼接spout和bolt的拓扑。

    3)案例实操

           (1)创建网站访问日志

    package com.atguigu.storm.weblog;

    import java.io.File;

    import java.io.FileOutputStream;

    import java.io.IOException;

    import java.util.Random;

    // 生成数据

    public class GenerateData {

           public static void main(String[] args) {

                  File logFile = new File("e:/website.log");

                  Random random = new Random();

                  // 1 网站名称

                  String[] hosts = { "www.atguigu.com" };

                  // 2 会话id

                  String[] session_id = { "ABYH6Y4V4SCVXTG6DPB4VH9U123", "XXYH6YCGFJYERTT834R52FDXV9U34",

                                "BBYH61456FGHHJ7JL89RG5VV9UYU7", "CYYH6Y2345GHI899OFG4V9U567", "VVVYH6Y4V4SFXZ56JIPDPB4V678" };

                  // 3 访问网站时间

                  String[] time = { "2017-08-07 08:40:50", "2017-08-07 08:40:51", "2017-08-07 08:40:52", "2017-08-07 08:40:53",

                                "2017-08-07 09:40:49", "2017-08-07 10:40:49", "2017-08-07 11:40:49", "2017-08-07 12:40:49" };

                  // 4 拼接网站访问日志

                  StringBuffer sbBuffer = new StringBuffer();

                  for (int i = 0; i < 40; i++) {

                         sbBuffer.append(hosts[0] + " " + session_id[random.nextInt(5)] + " " + time[random.nextInt(8)] + " ");

                  }

                  // 5 判断log日志是否存在,不存在要创建

                  if (!logFile.exists()) {

                         try {

                                logFile.createNewFile();

                         } catch (IOException e) {

                                System.out.println("Create logFile fail !");

                         }

                  }

                  byte[] b = (sbBuffer.toString()).getBytes();

                  // 6 将拼接的日志信息写到日志文件中

                  FileOutputStream fs;

                  try {

                         fs = new FileOutputStream(logFile);

                         fs.write(b);

                         fs.close();

                         System.out.println("generate data over");

                  } catch (Exception e) {

                         e.printStackTrace();

                  }

           }

    }

           (2)创建spout

    package com.atguigu.storm.weblog;

    import java.io.BufferedReader;

    import java.io.FileInputStream;

    import java.io.InputStreamReader;

    import java.util.Map;

    import org.apache.storm.spout.SpoutOutputCollector;

    import org.apache.storm.task.TopologyContext;

    import org.apache.storm.topology.IRichSpout;

    import org.apache.storm.topology.OutputFieldsDeclarer;

    import org.apache.storm.tuple.Fields;

    import org.apache.storm.tuple.Values;

    public class WebLogSpout implements IRichSpout{

           private static final long serialVersionUID = 1L;

          

           private BufferedReader br;                 

           private SpoutOutputCollector collector = null;

           private String str = null;

          

           @Override

           public void nextTuple() {

                  // 循环调用的方法

                  try {

                         while ((str = this.br.readLine()) != null) {

                                // 发射出去

                                collector.emit(new Values(str));

                               

    //                          Thread.sleep(3000);

                         }

                  } catch (Exception e) {

                        

                  }

           }

          

           @SuppressWarnings("rawtypes")

           @Override

           public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

                  // 打开输入的文件

                  try {

                         this.collector = collector;

                         this.br = new BufferedReader(new InputStreamReader(new FileInputStream("e:/website.log"), "UTF-8"));

                  } catch (Exception e) {

                         e.printStackTrace();

                  }

           }

          

           @Override

           public void declareOutputFields(OutputFieldsDeclarer declarer) {

                  // 声明输出字段类型

                  declarer.declare(new Fields("log"));

           }

           @Override

           public void ack(Object arg0) {

                 

           }

           @Override

           public void activate() {

                 

           }

           @Override

           public void close() {

                 

           }

           @Override

           public void deactivate() {

                 

           }

           @Override

           public void fail(Object arg0) {

                 

           }

           @Override

           public Map<String, Object> getComponentConfiguration() {

                  return null;

           }

    }

           (3)创建bolt

    package com.atguigu.storm.weblog;

    import java.util.Map;

    import org.apache.storm.task.OutputCollector;

    import org.apache.storm.task.TopologyContext;

    import org.apache.storm.topology.IRichBolt;

    import org.apache.storm.topology.OutputFieldsDeclarer;

    import org.apache.storm.tuple.Fields;

    import org.apache.storm.tuple.Tuple;

    public class WebLogBolt implements IRichBolt {

           private static final long serialVersionUID = 1L;

           private OutputCollector collector = null;

           private int num = 0;

           private String valueString = null;

           @Override

           public void execute(Tuple input) {

                  try {

                         // 1 获取传递过来的数据

                         valueString = input.getStringByField("log");

                         // 2 如果输入的数据不为空,行数++

                         if (valueString != null) {

                                num++;

                                System.err.println(Thread.currentThread().getName() + "lines  :" + num + "   session_id:" + valueString.split(" ")[1]);

                         }

                         // 3 应答Spout接收成功

                         collector.ack(input);

                         Thread.sleep(2000);

                  } catch (Exception e) {

                         // 4 应答Spout接收失败

                         collector.fail(input);

                         e.printStackTrace();

                  }

           }

           @SuppressWarnings("rawtypes")

           @Override

           public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

                  this.collector = collector;

           }

           @Override

           public void declareOutputFields(OutputFieldsDeclarer declarer) {

                  // 声明输出字段类型

                  declarer.declare(new Fields(""));

           }

           @Override

           public void cleanup() {

                 

           }

           @Override

           public Map<String, Object> getComponentConfiguration() {

                  return null;

           }

    }

           (4)创建main

    package com.atguigu.storm.weblog;

    import org.apache.storm.Config;

    import org.apache.storm.LocalCluster;

    import org.apache.storm.StormSubmitter;

    import org.apache.storm.topology.TopologyBuilder;

    public class WebLogMain {

           public static void main(String[] args) {

                  // 1 创建拓扑对象

                  TopologyBuilder builder = new TopologyBuilder();

                  // 2 设置Spout和bolt

                  builder.setSpout("weblogspout", new WebLogSpout(), 1);

                  builder.setBolt("weblogbolt", new WebLogBolt(), 1).shuffleGrouping("weblogspout");

                  // 3 配置Worker开启个数

               Config conf =  new Config();

               conf.setNumWorkers(4);

              

                  if (args.length > 0) {

                         try {

                                // 4 分布式提交

                                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());

                         } catch (Exception e) {

                                e.printStackTrace();

                         }

                  }else {

                         // 5 本地模式提交

                         LocalCluster localCluster = new LocalCluster();

                         localCluster.submitTopology("weblogtopology", conf, builder.createTopology());

                  }

           }

    }

    4.2.3 需求2:动态增加日志,查看控制台打印信息(tail特性)

    1)在需求1基础上,运行程序。

    2)打开website.log日志文件,增加日志调试并保存。

    3)观察控制台打印的信息。

    结论:Storm可以动态实时监测文件的增加信息,并把信息读取到再处理。

     

  • 相关阅读:
    Community Server 2.0 学习笔记:如何实现在线人数?
    CommunityServer2.0改造的一些心得[粗糙版]
    DotLucene源码浅读笔记(2) : Lucene.Net.Documents
    有意思.在线版的photoshop
    电子商务教程[资源]
    Lucene 1.9 多目录搜索的的一个bug
    小总结:DotLucene如何才能快速生成索引?
    DotLucene源码浅读笔记(1)补遗:编写简单中文分词器ChineseAnalyzer
    Windows下傻瓜式快速搭建Discuz论坛(也可以参考用于搭建其他php论坛)
    Lucene.net常见功能实现知识汇总
  • 原文地址:https://www.cnblogs.com/tesla-turing/p/13699003.html
Copyright © 2011-2022 走看看