zoukankan      html  css  js  c++  java
  • storm

    理论

    Hadoop的出现虽然为大数据计算提供了一条捷径,但其仍然存在自身难以克服的缺点:实时性不足。Hadoop的一轮计算的启动需要较长时间,因此其满足不了对实时性有较高要求的场景。

    Storm由此应运而生,提供了可扩展的,可靠的,易于使用,而且是编程语言无关的实时大数据处理框架。

    使用

    Components of a storm cluster

    Storm集群类似于Hadoop集群,storm运行与topo之上。

    Storm集群中存在两类节点:master节点和worker节点。master运行在一个守护进程上,该守护进程称为Nimbus,负责集群中的代码分发,任务分配以及错误处理。

    每个worker节点运行在守护进程Supervisor上,Supervisor根据Nimbus的命令,负责worker的启停。

    每个worker都运行一个topo的子集,一个topo是由分布于多台主机间的多个worker组成的。

     

    另外,Nimbus和Supervisor是由zookeeper来协调的,Nimbus和Supervisor是无状态的,所有的状态都被zookeeper保存在本地硬盘上,因此对Nimbus和Supervisor的操作是非常安全的,在强行终止进程后,状态也能够快速从zookeeper中恢复。

    Topology

    Topology是一个计算拓扑,拓扑中的每个节点都包含着计算逻辑,而且节点中的连接代表了数据的流向。

    启动命令:

    storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

    all-my-code.jar中通过main()实现了topo,storm命令连接Nimbus并上传jar。

    Stream

    Stream是Storm的核心抽象,由无限的tuple组成,storm提供将stream转换为另外的stream的能力。

    能力的提供是由spout和bolt来实现的。Spout和bolt都有接口可以实现自己的业务逻辑。

    spout是stream的入口,spout可以从不同的数据源读入数据。

    bolt消费stream,并且做处理,可能在处理后会输出新的stream,通过多个bolt的连接,可以实现复杂的处理流程。

    spout和bolt的网络构成了topo,是storm的最高层次的抽象。

     

    topo中的所有节点都是并行运行的,在topo中,我们可以指定并行度,storm将会根据并行度生成线程并在整个集群内运行。

    topo使用运行,不会停止,除非人为干预。最为重要的是,storm保证数据不丢失,包括消息,任务等。

    Data Model

    storm使用tuple作为其数据模型,tuple是一组值的集合,tuple中的值域可以是任意类型,包括java中的基本类型或者其他自定义类型(需要实现serializer)。

    集群中的每个节点都需要声明它要输出的tuple的值域。如下这个bolt就声明了“double”和“triple”这两个域。

    public class DoubleAndTripleBolt extends BaseRichBolt {
    
        private OutputCollectorBase _collector;
    
     
    
        @Override
    
        public void prepare(Map conf, TopologyContext context, OutputCollectorBase collector) {
    
            _collector = collector;
    
        }
    
     
    
        @Override
    
        public void execute(Tuple input) {
    
            int val = input.getInteger(0);       
    
            _collector.emit(input, new Values(val*2, val*3));
    
            _collector.ack(input);
    
        }
    
     
    
        @Override
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    
            declarer.declare(new Fields("double", "triple"));
    
    }   
    
    }

    declareOutputFields声明了两个输出域“double”和“triple”。该bolt后续的bolt将会根据这个输入解析bolt。

    A simple topology

    setSpout/setBolt

    TopologyBuilder builder = new TopologyBuilder();        
    builder.setSpout("words", new TestWordSpout(), 10); //设置spout的ID及并行度       
    builder.setBolt("exclaim1", new ExclamationBolt(), 3)
            .shuffleGrouping("words");  //设置bolt的id及并行度,且在spout后
    builder.setBolt("exclaim2", new ExclamationBolt(), 2)
            .shuffleGrouping("exclaim1"); 

    使用上面的语句就构造了一个topo,该topo有一个spout和两个bolt。

    构成的topo为:

     

    parallelism

    并行度是节点上的概念,表示在整个集群执行该节点的逻辑应当启动几个线程。

    shuffleGroup方法表示消息在节点中的分发是随机的,也会有其他的分发方式,需要另文解释。

    另外,可以采用链式调用来指定多个输入。

    builder.setBolt("exclaim2", new ExclamationBolt(), 5)
                .shuffleGrouping("words")
                .shuffleGrouping("exclaim1");

    如果Bolt如此定义,则整个topo将会修改为:

     

    由此可见bolt节点可以存在多个输入。

    nextTuple

    public void nextTuple() {
        Utils.sleep(100);
        final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        _collector.emit(new Values(word));
    }

    如上面的方法所示,实现了该方法的节点将会随机的发送word。

    Bolt实现的方法类ExclamationBolt会读取输入,并在文字末尾加入“!!!”。实现如下:

    public static class ExclamationBolt implements IRichBolt {
        OutputCollector _collector;
     
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }
     
        public void execute(Tuple tuple) {
            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
            _collector.ack(tuple);
        }
     
        public void cleanup() {
        }
     
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
     
        public Map getComponentConfiguration() {
            return null;
        }
    }

    Prepare

    prepare方法想bolt节点中设置了OutputCollector,通过OutputCollector发射tuples。可以在任何时候调用outputcollector.emit()方法发射tuple。

    prepare方法仅起到初始化作用。

    Execute

    execute方法时bolt的主要逻辑,可以通过调用方法Tuple#getSourceComponent来获取tuple的来源。

    方法ACK用于向上一个节点反馈执行情况。

    Cleanup

    清理方法,不可靠调用,通常用于本地调试

    declareOutputFields

    用于声明输出域

    getComponentConfigurateion

    允许用户设置节点的一些配置参数,属于高级特性

    Example

    public static class ExclamationBolt extends BaseRichBolt {
        OutputCollector _collector;
     
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }
     
        public void execute(Tuple tuple) {
            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
            _collector.ack(tuple);
        }
     
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }    
    }

    Running Topology in local mode

    采用本地模式可以进行topo调试,此时,所有的worker将会使用线程进行模拟。

    Config conf = new Config();
    conf.setDebug(true);
    conf.setNumWorkers(2);
     
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("test", conf, builder.createTopology());
    Utils.sleep(10000);
    cluster.killTopology("test");
    cluster.shutdown();

    Stream grouping

    stream grouping表示怎样在两个component(spout和bolt都是组件)发送tuple。需要记住的是,spouts和blots在整个集群中实际上是并行执行的,如下图:

     

    此时,就需要stream groups来决定tuple是发送到Bolt A还是Bolt B。

    STORM中已有有一些成熟的实现,如shuffleGrouping和fieldGrouping。

    具体使用,可以参考文档:

    http://storm.apache.org/documentation/Concepts.html

    Guaranteeing message processing

    http://storm.apache.org/documentation/Guaranteeing-message-processing.html

    使用该机制保证消息都被正确处理

    Transactional topologies

  • 相关阅读:
    hibernate对应关系详解(转)
    mybatis genertor两种使用方式(文件+项目)
    YII2 union 不同数据结构时 解决方案
    Yii2 分表后 使用 union all 分页实现代码
    Beyond Compare 4.2.10手动破解
    Xshell 6+Xftp 6官方下载免费版
    Navicat Premium
    yii2的Console定时任务创建
    内嵌多个youtube视频并展现该频道所有视频列表
    video.js 动态获取URL 并播放youtube视频
  • 原文地址:https://www.cnblogs.com/jiyuqi/p/4665533.html
Copyright © 2011-2022 走看看