zoukankan      html  css  js  c++  java
  • Storm概念学习系列之Spout数据源

      不多说,直接上干货!

    Spout 数据源

      消息源Spout是Storm的Topology中的消息生产者(即Tuple的创造者)

    Spout 介绍

      1. Spout 的结构
      Spout 是 Storm 的核心组件之一,最源头的接口是 IComponent,如图 1所示,几个Spout接口都继承自IComponent。
              

                            图 1    Spout 类图

      2. Spout 发出的消息

       Spout从外部获取数据后,向Topology中发出的Tuple可以是可靠的,也可以是不可靠的。

      注意:一个可靠的消息源可以重新发射一个Tuple(如果该 Tuple 没有被 Storm 成功处理),但是一个不可靠的消息源Spout 一旦发出,一个Tuple 就把它彻底“遗忘”,也就不可能再发了。



      3.Spout 发射的流
      Spout 可以发射多个流。要达到这样的效果,使用 OutputFieldsDeclarer.declareStream 来定义多个流(即定义多个 Stream),然后使用 SpoutOutputCollector 的emit来发射指定的流。

      4.Spout 的重要方法

      Spout 的重要方法是 nextTuple()。 nextTuple 方法发射一个新的元组到 Topology,如果没有新元组发射,则直接返回。注意任务 Spout 的 nextTuple 方法都不要实现成阻塞的,因为Storm 是在相同的线程中调用 Spout 的方法。 Spout 的另外两个重要方法是 ack ()和 fail() 方法。当 Spout 发射的元组被拓扑成功处理时,调用 ack 方法;当处理失败时,调用 fail 方法。 ack和 fail 方法仅被可靠的 Spout 调用。



      5.Spout 的组件
     Spout的最顶层抽象是ISpout接口。在通常情况下(Shell和事务型的除外),实现一个Spout,可以直接实现接口IRichSpout,如果不想写多余的代码,可以直接继承BaseRichSpout。

          


     Spout 实例

      下面通过创建一个实例RandomSpout来介绍Spout, 图1为RandomSpout继承自BasicRichSpout及其实现的原理图。
        图2 列出了实例 RandomSpout 继承自 BaseRichSpout 中的几个重要方法。

      下面对图2 中的方法进行详细介绍。
      (1) open 方法
      当一个 Task 被初始化时会调用此 open 方法。一般都会在此方法中初始化发送 Tuple 的对象 SpoutOutputCollector 和配置对象 TopologyContext。
      代码示例如下:

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.collector = collector;
      random = new Random();
    }

                

                            图2  RandomSpout 类图

                    

                          图 3-4 RandomSpout 类的主要方法

      (2) declareOutputFields 方法
      此方法用于声明当前 Spout 的 Tuple 发送流。流的定义是通过 OutputFieldsDeclare.declareStream方法完成的,其中的参数包括了发送的域 Fields。
      示例代码如下:

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("randomInt"));
    }

      (3) nextTuple 方法
      这是 Spout 类中最重要的一个方法。发射一个 Tuple 到 Topology 都是通过该方法来实现的。

       示例代码如下:

    public void nextTuple() {
      while(true){
        Values val = new Values(random.nextInt(100));
        collector.emit(val);
      try {
        Thread.sleep(500);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      }
    }

      以上代码从 100 以内的整数中随机生成一个数作为 Tuple 的值,然后通过_collector 发送到 Topology。
      另外,除了上述几个方法之外,还有 getComponentConf iguration、ack、fail 和 close 方法等。 getComponentConfiguration 方法用于配置当前组件的参数, Storm 监测到一个 Tuple 被成功处理时调用 ack 方法,处理失败时调用 fail 方法,这两个方法在 BaseRichSpout 类中已经被隐式实现了。

  • 相关阅读:
    基于SAR对Linux资源的监控shell脚本
    Python3+RobotFramewok 用户自定义库的开发(四)
    Python3+RobotFramewok 循环判断以及Evaluate用法(三)
    Python3+RobotFramewok 快速入门(二)
    Python3+RobotFramewok RIDE环境搭建(一)
    MySQL主从双向同步
    笔记:网络协议
    Jmeter组成结构及运行原理
    Selenium WebDriver的实现及工作原理
    Jenkins+maven环境部署
  • 原文地址:https://www.cnblogs.com/zlslch/p/5989285.html
Copyright © 2011-2022 走看看