zoukankan      html  css  js  c++  java
  • Storm和Hadoop 区别

    Storm - 大数据Big Data实时处理架构

     

    什么是Storm?

    Storm是:
    • 快速且可扩展伸缩
    • 容错
    • 确保消息能够被处理
    • 易于设置和操作
    • 开源的分布式实时计算系统
    - 最初由Nathan Marz开发
    - 使用Java 和 Clojure 编写

    Storm和Hadoop主要区别是实时和批处理的区别:

    storm hadoop

    Storm概念 组成:Spout 和Bolt组成Topology。

    storm

    Tuple是Storm的数据模型,如['jdon',12346]

    多个Tuple组成事件流:

    tuple

    Spout是读取需要分析处理的数据源,然后转为Tuples,这些数据源可以是Web日志、 API调用、数据库等等。Spout相当于事件流的生产者。

    Bolt 处理Tuples然后再创建新的Tuples流,Bolt相当于事件流的消费者。

    Bolt 作为真正业务处理者,主要实现大数据处理的核心功能,比如转换数据,应用相应过滤器,计算和聚合数据(比如统计总和等等) 。

    以Twitter的某个Tweet为案例,看看Storm如何处理:

    top

    这些tweett贴内容是:“No Small Cell Lung #Cancer(没有小细胞肺癌#癌症)” "An #OnCology Consult...."

    这些贴被Spout读取以后,产生Tuple,字段名是tweet,内容是"No Small Cell Lung #Cancer",格式类似:['No Small Cell Lung #Cancer',133221]。

    然后进入被流 消费者Bolt进行处理,第一个Bolt是SplitSentence,将tuple内容进行分离,结果成为:一个个单词:"No" "Small" "Cell" "Lung" "#Cancer" ;然后经过第二个Bolt进行过滤HashTagFilter处理,Hash标签是单词中用#标注的,也就是Cancer;再经过HasTagCount计数,可以本地内存缓存这个计数结果,最后通过PrinterBolt打印出标签单词统计结果 。

    我们使用Stom所要做的就是编制Spout和Bolt代码:

    public class RandomSentenceSpout extends BaseRichSpout {
      SpoutOutputCollector collector;
      Random random;

      //读入外部数据
      public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        random = new Random();
      }
      //产生Tuple
       public void nextTuple() {
        String[] sentences = new String[] {
          "No Small Cell Lung #Cancer",
          "An #OnCology Consultant apple a day keeps the doctor away",
          "four score and seven years ago",
          "snow white and the seven dwarfs",
          "i am at two with nature"
        };
        String tweet = sentences[random.nextInt(sentences.length)];
        //定义字段名"tweet" 的值 
        collector.emit(new Values(tweet));

      }

      // 定义字段名"tweet"

      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tweet"));
      }
      @Override
      public void ack(Object msgId) {}
      @Override
      public void fail(Object msgId) {}
    }

    下面是Bolt的代码编写:

    public class SplitSentenceBolt extends BaseRichBolt {
      OutputCollector collector;

      @Override
      public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
      }
      @Override 消费者激活主要方法:分离成单个单词
      public void execute(Tuple input) {
        for (String s : input.getString(0).split("\s")) {
          collector.emit(new Values(s));
        }
      }
      @Override 定义新的字段名
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
      }

    最后是装配运行Spout和Bolt的客户端调用代码:

    public class WordCountTopology {
      public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("tweet", new RandomSentenceSpout(), 2);
        builder.setBolt("split", new SplitSentenceBolt(), 4)
          .shuffleGrouping("tweet")
          .setNumTasks(8);
        builder.setBolt("count", new WordCountBolt(), 6)
          .fieldsGrouping("split", new Fields("word"));
        ..设置多个Bolt


        Config config = new Config();
        config.setNumWorkers(4);
        
        StormSubmitter.submitTopology("wordcount", config, builder.createTopology());

    // Local testing
    //LocalCluster cluster = new LocalCluster();
    // cluster.submitTopology("wordcount", config, builder.createTopology());
    //Thread.sleep(10000);
    //cluster.shutdown();
    }
    }

    在这个代码中定义了一些参数比如Works的数目是4,其含义在后面详细分析。

    下面我们要将上面这段代码发布部署到Storm中,首先了解Storm物理架构图:

    Nimbus是一个主后台处理器,主要负责:
    1.发布分发代码
    2.分配任务
    3.监控失败。

    Supervisor是负责当前这个节点的后台工作处理器的监听。

    Work类似Java的线程,采取JDK的Executor 。

    下面开始将我们的代码部署到这个网络拓扑中:

    startstorm

    将代码Jar包上传到Nimbus的inbox,包括所有的依赖包,然后提交。

    Nimbus将保存在本地文件系统,然后开始配置网络拓扑,分配开始拓扑。

    见下图:

    storm

    Nimbus服务器将拓扑Jar 配置和结构下载到 Supervisor,负载平衡ZooKeeper分配某个特定的Supervisor服务器,而Supervisor开始基于配置分配Work,Work调用JDK的Executor启动线程,开始任务处理。

    下面是我们代码对拓扑分配的参数示意图:

    storm bolt

    Executor启动的线程数目是12个,组件的实例是16个,那么如何在实际服务器中分配呢?如下图:

    wc

    图中RsSpout代表我们的代码中RandomSentenceSpout;SplitSentenceBolt简写为SSbolt;

    http://www.jdon.com/bigdata/storm.html

  • 相关阅读:
    DP 免费馅饼 HDU1176
    知了课堂 Python Flask零基础 笔记整理
    Splay入门
    字典树
    榨取kkksc03 多维dp
    种族并查集总结
    倍增总结
    求最大公因数(辗转相除法&更相减损术)
    Bzoj 3036: 绿豆蛙的归宿(期望)
    Bzoj 1497: [NOI2006]最大获利(最小割)
  • 原文地址:https://www.cnblogs.com/tianciliangen/p/8308668.html
Copyright © 2011-2022 走看看