zoukankan      html  css  js  c++  java
  • Storm概念学习系列之Tuple元组(数据载体)

      不多说,直接上干货!

    Tuple元组

      Tuple 是 Storm 的主要数据结构,并且是 Storm 中使用的最基本单元、数据模型和元组。

          

    Tuple 描述

      Tuple 就是一个值列表, Tuple 中的值可以是任何类型的,动态类型的Tuple的fields可以不用声明;默认情况下,Storm中的Tuple支持私有类型、字符串、字节数组等作为它的字段值,如果使用其他类型,就需要序列化该类型

      Tuple的字段默认类型有 :  integer、 float、 double、 long、short、 string、 byte、 binary(byte[])

       

      Tuple元组,是消息传递的基本单元,是一个命名的值列表,元组中的字段可以是任何类型的对象。Storm使用元组作为其数据模型,元组支持所有的基本类型、字符串和字节数组作为字段值,只要实现类型的序列化接口就可以使用该类型的对象。

      元组本来应该是一个key-value的Map,但是由于各个组件间传递的元组的字段名称已经事先定义好,所以只要按序把元组填入各个value即可,所以元组是一个vlue的List

      Tuple是Storm采用的数据表示模型,所有的数据都以Tuple的形式在各个组件之间流动。Tuple是一组字段列表,每个字段由一个字段名和字段值组成,每个Tuple类似于数据库中的一行记录。在默认的情况下,Tuple的字段类型可以是integer、long、short、byte、string、double、float、boolean和byte array。当然,你也可以通过实现序列化器自定义类型。


      Tuple 数据结构如图 1 所示。

                            

                                 图 1 Tuple 数据结构

       Tuple 可以理解成键值对。例如,创建一个Bolt 要发送两个字段(命名为 double 和 triple),其中键就是定义在declareOutputFields 方法中的 Fields 对象,值就是在 emit 方法中发送的 Values 对象。

      以下是一个简单例子

    public class DoubleAndTripleBolt extends BaseRichBolt {
    OutputCollectorBase _collector;
    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) { _collector = collector; }
    @Override
    public void execute(Tuple input) { int val = input.getInteger(0); _collector.emit(input, new Values(val*2, val*3)); _collector.ack(input); }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("double", "triple")); }}

      此外,在使用的 Storm Java 包中, backtype.storm.tuple 主要有以下几个类:

    Fileds.class
    MessageId.class
    Tuple.class
    TupleImpl.class
    Values.class

      列出以上内容是为了更好地理解 Tuple,这样能够从本质上理解 Tuple,在使用时更加得心应手。

    Tuple 的生命周期

      了解一个 Tuple 的生命周期就需要查看源码,如下的 Java 代码展示了 Spout(消息源)接口发出 Tuple(消息)的整个过程。

    public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
    void close();
    }

      首 先, Storm 调 用 Spout(消息源)的nextTuple 方法来获取下一个Tuple, Spout通过Open 方法的参数提供的SpoutOutputCollector将新Tuple发射到其中一个输出消息流

        注意:发射Tuple 时, Spout提供一个message-id,通过这个ID 来追踪该Tuple。

       接下来, Storm跟踪该Tuple的树形结构是否成功创建,并根据 messageid调用Spout中的ack函数,以确认Tuple是否被完全处理。如果Tuple超时,则调用 Spout 的 fail 方法。

      由此看出,同一个Tuple不管是acked,还是failed都是由创建它的Spout发出并维护的,所以,即使Spout 在集群环境中同时执行很多的任务,该Tuple 也不会被其他任务调用或生成 acked或 failed 状态。总之, Storm会利用内部的 Acker 机制保证每个Tuple 被可靠地处理。最后,在任务完成后,Spout调用Close方法结束 Tuple 的使命。

      比如

  • 相关阅读:
    springboot整合springmvc原理
    springboot Thymeleaf
    springboot 首页处理
    springboot整合Druid
    springboot 整合JDBC
    CentOS安装Mysql
    springboot 多环境切换
    springboot JSR303数据校验
    【转载】WEB架构师成长之路
    一些想法
  • 原文地址:https://www.cnblogs.com/zlslch/p/5989281.html
Copyright © 2011-2022 走看看