zoukankan      html  css  js  c++  java
  • 图解Storm

    问题导读:
    1.你认为什么图形可以显示hadoop与storm的区别?(电梯)
    2.本文是如何形象讲解hadoop与storm的?(离线批量处理、实时流式处理)
    3.hadoop map/reduce对应storm那两个概念?(spout/bolt)
    4.storm流由谁来组成?(Tuples)
    5.tuple具体是什么形式?




    什么是Storm?
    Storm是:

    • 快速且可扩展伸缩
    • 容错
    • 确保消息能够被处理
    • 易于设置和操作
    • 开源的分布式实时计算系统
    • 最初由Nathan Marz开发
    • 使用Java 和 Clojure 编写

    区别:
    我们知道hadoop是批处理,storm是流式处理,那么是什么是批处理,什么流式处理?
    Storm和Hadoop主要区别是实时和批处理的区别:
     
    Storm概念组成:Spout和Bolt组成Topology。
     

    Tuple是Storm的数据模型,如['jdon',12346]
    多个Tuple组成事件流:


    Spout是读取需要分析处理的数据源,然后转为Tuples,这些数据源可以是Web日志、 API调用、数据库等等。Spout相当于事件流的生产者
    Bolt 处理Tuples然后再创建新的Tuples流,Bolt相当于事件流的消费者

    Bolt 作为真正业务处理者,主要实现大数据处理的核心功能,比如转换数据,应用相应过滤器,计算和聚合数据(比如统计总和等等) 。
    以Twitter的某个Tweet为案例,看看Storm如何处理:

    这些tweett贴内容是:“No Small Cell Lung #Cancer(没有小细胞肺癌#癌症)” "An #OnCology Consult...."
    这些贴被Spout读取以后,产生Tuple,字段名是tweet,内容是"No Small Cell Lung #Cancer",格式类似:['No Small Cell Lung #Cancer',133221]。
    然后进入被流 消费者Bolt进行处理,第一个Bolt是SplitSentence,将tuple内容进行分离,结果成为:一个个单词:"No" "Small" "Cell" "Lung" "#Cancer" ;然后经过第二个Bolt进行过滤HashTagFilter处理,Hash标签是单词中用#标注的,也就是Cancer;再经过HasTagCount计数,可以本地内存缓存这个计数结果,最后通过PrinterBolt打印出标签单词统计结果 。

    我们使用Stom所要做的就是编制Spout和Bolt代码:

     1 public class RandomSentenceSpout extends BaseRichSpout {
     2   SpoutOutputCollector collector;
     3   Random random;
     4   //读入外部数据
     5   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
     6     this.collector = collector;
     7     random = new Random();
     8   }
     9   //产生Tuple
    10    public void nextTuple() {
    11     String[] sentences = new String[] {
    12       "No Small Cell Lung #Cancer",
    13       "An #OnCology Consultant apple a day keeps the doctor away",
    14       "four score and seven years ago",
    15       "snow white and the seven dwarfs",
    16       "i am at two with nature"
    17     };
    18     String tweet = sentences[random.nextInt(sentences.length)];
    19     //定义字段名"tweet" 的值 
    20     collector.emit(new Values(tweet));
    21   }
    22   // 定义字段名"tweet"
    23   public void declareOutputFields(OutputFieldsDeclarer declarer) {
    24     declarer.declare(new Fields("tweet"));
    25   }
    26   @Override
    27   public void ack(Object msgId) {}
    28   @Override
    29   public void fail(Object msgId) {}
    30 }

    下面是Bolt的代码编写:

     1 public class SplitSentenceBolt extends BaseRichBolt {
     2   OutputCollector collector;
     3   @Override
     4   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
     5     this.collector = collector;
     6   }
     7   @Override 消费者激活主要方法:分离成单个单词
     8   public void execute(Tuple input) {
     9     for (String s : input.getString(0).split("\s")) {
    10       collector.emit(new Values(s));
    11     }
    12   }
    13   @Override 定义新的字段名
    14   public void declareOutputFields(OutputFieldsDeclarer declarer) {
    15     declarer.declare(new Fields("word"));
    16   }

    最后是装配运行Spout和Bolt的客户端调用代码:

     1 public class WordCountTopology {
     2   public static void main(String[] args) throws Exception {
     3     TopologyBuilder builder = new TopologyBuilder();
     4     builder.setSpout("tweet", new RandomSentenceSpout(), 2);
     5     builder.setBolt("split", new SplitSentenceBolt(), 4)
     6       .shuffleGrouping("tweet")
     7       .setNumTasks(8);
     8     builder.setBolt("count", new WordCountBolt(), 6)
     9       .fieldsGrouping("split", new Fields("word"));
    10     ..设置多个Bolt
    11     Config config = new Config();
    12     config.setNumWorkers(4);
    13     
    14     StormSubmitter.submitTopology("wordcount", config, builder.createTopology());
    15 //Local testing
    16 //LocalCluster cluster = new LocalCluster();
    17 //cluster.submitTopology("wordcount", config, builder.createTopology());
    18 //Thread.sleep(10000);
    19 //cluster.shutdown();
    20 }
    21 }


    在这个代码中定义了一些参数比如Works的数目是4,其含义在后面详细分析。

    下面我们要将上面这段代码发布部署到Storm中,首先了解Storm物理架构图

    Nimbus是一个主后台处理器,主要负责:
      1.发布分发代码
      2.分配任务
      3.监控失败。
    Supervisor是负责当前这个节点的后台工作处理器的监听。
    Work类似Java的线程,采取JDK的Executor 。

    下面开始将我们的代码部署到这个网络拓扑中:
    将代码Jar包上传到Nimbus的inbox,包括所有的依赖包,然后提交。
    Nimbus将保存在本地文件系统,然后开始配置网络拓扑,分配开始拓扑。
    见下图:
    Nimbus服务器将拓扑Jar 配置和结构下载到 Supervisor,负载平衡ZooKeeper分配某个特定的Supervisor服务器,而Supervisor开始基于配置分配Work,Work调用JDK的Executor启动线程,开始任务处理。
    下面是我们代码对拓扑分配的参数示意图:
     

    Executor启动的线程数目是12个,组件的实例是16个,那么如何在实际服务器中分配呢?如下图:
    图中RsSpout代表我们的代码中RandomSentenceSpout;SplitSentenceBolt简写为SSbolt。

  • 相关阅读:
    To select the file to upload we can use the standard HTML input control of type
    Cascading Menu Script using Javascript Explained
    网站首页head区代码规范
    轻松掌握 Java 泛型
    JDK 5.0 中的泛型类型学习
    如何在firefox下获取下列框选中option的text
    是同步方法还是 synchronized 代码? 详解多线程同步规则
    javascript select option对象总结
    Select的动态取值(Text,value),添加,删除。兼容IE,FireFox
    javascript在ie和firefox下的一些差异
  • 原文地址:https://www.cnblogs.com/xymqx/p/4498669.html
Copyright © 2011-2022 走看看