zoukankan      html  css  js  c++  java
  • Apache Storm技术实战之1 -- WordCountTopology

    欢迎转载,转载请注意出处,徽沪一郎。

    “源码走读系列”从代码层面分析了storm的具体实现,接下来通过具体的实例来说明storm的使用。因为目前storm已经正式迁移到Apache,文章系列也由twitter storm转为apache storm.

    WordCountTopology 使用storm来统计文件中的每个单词的出现次数。

    通过该例子来说明tuple发送时的几个要素

    1. source component   发送源
    2. destination component 接收者
    3. stream 消息通道
    4. tuple    消息本身

    本文涉及到的开发环境搭建可以参考前面的两篇博文。

    1. arch linux简明安装指南
    2. 在archlinux上搭建storm cluster

    awk实现

    其实对文件中的单词进行统计是Linux下一个很常见的任务,用awk就可以轻松的解决(如果文件不是太大的话),下面是进行word counting的awk脚本,将其保存为名为wordcount.awk文件。

    wordcount.awk

    {
      for (i = 1; i<=NF; i++)
        freq[$i]++
    }
    END{
      for (word in freq)
        printf "%s	%d
    ",word,freq[word]
    }

    运行该脚本,对文件中的单词进行统计

    gawk -f wordcount.awk filename

    原始版本

    从github上复制内容

    git clone https://github.com/nathanmarz/storm-starter.git

    编译运行

    lein deps
    lein compile
    java -cp $(lein classpath) WordCountTopology

    main函数

    main函数的主要内容

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 5);
        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

    注意:grouping操作的时候,如果没有显示指定stream id,则使用的是default stream. 如shuffleGrouping("spout")表示从名为spout的component中接收从default stream发送过来的tuple.

    改进版本

    在原始版本中,spout不停的向split bolt随机发送句子,Count bolt统计每个单词出现的次数。

    那么能不能让Spout在读取完文件之后,通知下游的bolt显示最柊的统计结果呢?

    要想达到上述的改进目标,采用如上图所示的结构即可。改变的地方如下,

    1. 在Spout中添加一个SUCCESS_STREAM
    2. 添加只有一个运行实例的statistics bolt
    3. 当spout读取完文件内容之后,通过SUCCESS_STREAM告诉statistics bolt,文件已经处理完毕,可以打印当前的统计结果

    RandomSentenceSpout.java

    declareOutputFields

    添加SUCCESS_STREAM

    @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
        declarer.declareStream("SUCCESS_STREAM",new Fields("word"));
      }

    nextTuple

    使用SUCCESS_STREAM通知下游,文件处理完毕

    @Override
      public void nextTuple() {
        Utils.sleep(100);
        String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",
            "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
        if ( count == sentences.length ) 
        {
          System.out.println(count+" try to emit tuple by success_stream");
          _collector.emit("SUCCESS_STREAM",new Values(sentences[0]));
          count++;
        }else if ( count < sentences.length ){
          _collector.emit(new Values(sentences[count]));
          count++;
        }
      }

    WordCountTopology.java

    添加静态类WordCount2

    public static class WordCount2 extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();
    
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
          if ( tuple.getSourceStreamId() == "SUCCESS_STREAM" ) {
        System.out.println("prepare to print the statistics");
        for (String key : counts.keySet()) {
          System.out.println(key+"	"+counts.get(key));
        }
        System.out.println("finish printing");
          }else {
    
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null)
          count = 0;
        count++;
        counts.put(word, count);
          }
        }

    main函数

    将spout的并行数由5改为1

     builder.setSpout("spout", new RandomSentenceSpout(), 1);

    在原有的Topology中添加WordCount2 Bolt

     builder.setBolt("count2", new WordCount2(), 1).globalGrouping("count").globalGrouping("spout","SUCCESS_STREAM");

    WordCount2 Bolt会接收从Count Bolt通过default stream发送的tuple,同时接收Spout通过SUCCESS_STREAM发送的tuple,也就是说wordcount2会接收从两个stream来的数据。

    编译

    编译修改后的源文件

    cd $STROM_STARTER
    lein compile storm.starter

    可能会出现以下异常信息,该异常可以忽略。

    Exception in thread "main" java.io.FileNotFoundException: Could not locate storm/starter/WordCountTopology__init.class or storm/starter/WordCountTopology.clj on classpath:

    运行

    在local模式下运行修改后的WordCountTopology

    java -cp $(lein classpath) storm.starter.WordCountTopology

    如果一切正常,日志如下所示,线程的名字可能会有所不同。

    moon    1
    score    1
    cow    1
    doctor    1
    over    1
    nature    1
    snow    1
    four    1
    keeps    1
    with    1
    a    1
    white    1
    dwarfs    1
    at    1
    the    4
    and    2
    i    1
    two    1
    away    1
    seven    2
    apple    1
    am    1
    an    1
    jumped    1
    day    1
    years    1
    ago    1

     结果验证

    可以将WordCountTopology的运行结果和awk脚本的运行结果相比对,结果应该是一致的。

    小技巧

    1. awk脚本的执行结果存为一个文件result1.log, WordCountTopology的输出中单词统计部分存为result2.log
    2. 用vim打开result1.log,进行sorting,保存结果;用vim打开result2.log,进行sorting,保存。
    3. 然后用vimdiff来进行比较 vimdiff result1.log result2.log
  • 相关阅读:
    设计模式- 结构型模式,装饰器模式(5)
    设计模式- 结构型模式,装饰器模式(5)
    jar/war/ear包的区别
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
  • 原文地址:https://www.cnblogs.com/hseagle/p/3505938.html
Copyright © 2011-2022 走看看