zoukankan      html  css  js  c++  java
  • Storm入门2-单词计数案例学习

        【本篇文章主要是通过一个单词计数的案例学习,来加深对storm的基本概念的理解以及基本的开发流程和如何提交并运行一个拓扑】

     

      单词计数拓扑WordCountTopology实现的基本功能就是不停地读入一个个句子,最后输出每个单词和数目并在终端不断的更新结果,拓扑的数据流如下:

      

    • 语句输入Spout:  从数据源不停地读入数据,并生成一个个句子,输出的tuple格式:{"sentence":"hello world"}
    • 语句分割Bolt: 将一个句子分割成一个个单词,输出的tuple格式:{"word":"hello"}  {"word":"world"}
    • 单词计数Bolt: 保存每个单词出现的次数,每接到上游一个tuple后,将对应的单词加1,并将该单词和次数发送到下游去,输出的tuple格式:{"hello":"1"}  {"world":"3"}
    • 结果上报Bolt: 维护一份所有单词计数表,每接到上游一个tuple后,更新表中的计数数据,并在终端将结果打印出来。

      开发步骤:

        1.环境

    • 操作系统:mac os 10.10.3
    • JDK: jdk1.8.0_40
    • IDE: intellij idea 15.0.3
    • Maven: apache-maven-3.0.3

      2.项目搭建

    • 在idea新建一个maven项目工程:storm-learning
    • 修改pom.xml文件,加入strom核心的依赖,配置slf4j依赖,方便Log输出
    <dependencies>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.6.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-core</artifactId>
                <version>1.0.2</version>
            </dependency>
    </dependencies>

     3. Spout和Bolt组件的开发

    • SentenceSpout
    • SplitSentenceBolt
    • WordCountBolt
    • ReportBolt

    SentenceSpout.java

     1 public class SentenceSpout extends BaseRichSpout{
     2 
     3     private SpoutOutputCollector spoutOutputCollector;
     4 
     5     //为了简单,定义一个静态数据模拟不断的数据流产生
     6     private static final String[] sentences={
     7             "The logic for a realtime application is packaged into a Storm topology",
     8             "A Storm topology is analogous to a MapReduce job",
     9             "One key difference is that a MapReduce job eventually finishes whereas a topology runs forever",
    10             " A topology is a graph of spouts and bolts that are connected with stream groupings"
    11     };
    12 
    13     private int index=0;
    14 
    15     //初始化操作
    16     public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
    17         this.spoutOutputCollector = spoutOutputCollector;
    18     }
    19 
    20     //核心逻辑
    21     public void nextTuple() {
    22         spoutOutputCollector.emit(new Values(sentences[index]));
    23         ++index;
    24         if(index>=sentences.length){
    25             index=0;
    26         }
    27     }
    28 
    29     //向下游输出
    30     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    31         outputFieldsDeclarer.declare(new Fields("sentences"));
    32     }
    33 }
    View Code

    SplitSentenceBolt.java

     1 public class SplitSentenceBolt extends BaseRichBolt{
     2 
     3     private OutputCollector outputCollector;
     4 
     5     public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
     6         this.outputCollector = outputCollector;
     7     }
     8 
     9     public void execute(Tuple tuple) {
    10         String sentence = tuple.getStringByField("sentences");
    11         String[] words = sentence.split(" ");
    12         for(String word : words){
    13             outputCollector.emit(new Values(word));
    14         }
    15     }
    16 
    17     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    18         outputFieldsDeclarer.declare(new Fields("word"));
    19     }
    20 }
    View Code

    WordCountBolt.java

     1 public class WordCountBolt extends BaseRichBolt{
     2 
     3     //保存单词计数
     4     private Map<String,Long> wordCount = null;
     5 
     6     private OutputCollector outputCollector;
     7 
     8     public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
     9         this.outputCollector = outputCollector;
    10         wordCount = new HashMap<String, Long>();
    11     }
    12 
    13     public void execute(Tuple tuple) {
    14         String word = tuple.getStringByField("word");
    15         Long count = wordCount.get(word);
    16         if(count == null){
    17             count = 0L;
    18         }
    19         ++count;
    20         wordCount.put(word,count);
    21         outputCollector.emit(new Values(word,count));
    22     }
    23 
    24 
    25     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    26         outputFieldsDeclarer.declare(new Fields("word","count"));
    27     }
    28 }
    View Code

    ReportBolt.java

     1 public class ReportBolt extends BaseRichBolt {
     2     
     3     private static final Logger log = LoggerFactory.getLogger(ReportBolt.class);
     4 
     5     private Map<String, Long> counts = null;
     6 
     7     public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
     8         counts = new HashMap<String, Long>();
     9     }
    10 
    11     public void execute(Tuple tuple) {
    12         String word = tuple.getStringByField("word");
    13         Long count = tuple.getLongByField("count");
    14         counts.put(word, count);
    15         //打印更新后的结果
    16         printReport();
    17     }
    18 
    19     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    20         //无下游输出,不需要代码
    21     }
    22 
    23     //主要用于将结果打印出来,便于观察
    24     private void printReport(){
    25         log.info("--------------------------begin-------------------");
    26         Set<String> words = counts.keySet();
    27         for(String word : words){
    28             log.info("@report-bolt@: " + word + " ---> " + counts.get(word));
    29         }
    30         log.info("--------------------------end---------------------");
    31     }
    32 }
    View Code

     4.拓扑配置

    • WordCountTopology
     1 public class WordCountTopology {
     2 
     3     private static final Logger log = LoggerFactory.getLogger(WordCountTopology.class);
     4 
     5     //各个组件名字的唯一标识
     6     private final static String SENTENCE_SPOUT_ID = "sentence-spout";
     7     private final static String SPLIT_SENTENCE_BOLT_ID = "split-bolt";
     8     private final static String WORD_COUNT_BOLT_ID = "count-bolt";
     9     private final static String REPORT_BOLT_ID = "report-bolt";
    10 
    11     //拓扑名称
    12     private final static String TOPOLOGY_NAME = "word-count-topology";
    13 
    14     public static void main(String[] args) {
    15 
    16         log.info(".........begining.......");
    17         //各个组件的实例
    18         SentenceSpout sentenceSpout = new SentenceSpout();
    19         SplitSentenceBolt splitSentenceBolt = new SplitSentenceBolt();
    20         WordCountBolt wordCountBolt = new WordCountBolt();
    21         ReportBolt reportBolt = new ReportBolt();
    22 
    23         //构建一个拓扑Builder
    24         TopologyBuilder topologyBuilder = new TopologyBuilder();
    25 
    26         //配置第一个组件sentenceSpout
    27         topologyBuilder.setSpout(SENTENCE_SPOUT_ID, sentenceSpout, 2);
    28 
    29         //配置第二个组件splitSentenceBolt,上游为sentenceSpout,tuple分组方式为随机分组shuffleGrouping
    30         topologyBuilder.setBolt(SPLIT_SENTENCE_BOLT_ID, splitSentenceBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
    31 
    32         //配置第三个组件wordCountBolt,上游为splitSentenceBolt,tuple分组方式为fieldsGrouping,同一个单词将进入同一个task中(bolt实例)
    33         topologyBuilder.setBolt(WORD_COUNT_BOLT_ID, wordCountBolt).fieldsGrouping(SPLIT_SENTENCE_BOLT_ID, new Fields("word"));
    34 
    35         //配置最后一个组件reportBolt,上游为wordCountBolt,tuple分组方式为globalGrouping,即所有的tuple都进入这一个task中
    36         topologyBuilder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(WORD_COUNT_BOLT_ID);
    37 
    38         Config config = new Config();
    39 
    40         //建立本地集群,利用LocalCluster,storm在程序启动时会在本地自动建立一个集群,不需要用户自己再搭建,方便本地开发和debug
    41         LocalCluster cluster = new LocalCluster();
    42 
    43         //创建拓扑实例,并提交到本地集群进行运行
    44         cluster.submitTopology(TOPOLOGY_NAME, config, topologyBuilder.createTopology());
    45     }
    46 }
    View Code

      5.拓扑执行

    • 方法一:通过IDEA执行

      在idea中对代码进行编译compile,然后run;

      观察控制台输出会发现,storm首先在本地自动建立了运行环境,即启动了zookepeer,接着启动nimbus,supervisor;然后nimbus将提交的topology进行分发到supervisor,supervisor启动woker进程,woker进程里利用Executor来运行topology的组件(spout和bolt);最后在控制台发现不断的输出单词计数的结果。

         zookepeer的连接建立

       nimbus启动

       supervisor启动

       worker启动

         Executor启动执行

         结果输出

    • 方法二:通过maven来执行
      • 进入到该项目的主目录下:storm-learning
      • mvn compile 进行代码编译,保证代码编译通过
      • 通过mvn执行程序:
        mvn exec:java -Dexec.mainClass="wordCount.WordCountTopology"
      • 控制台输出的结果跟方法一一致
  • 相关阅读:
    使用过的一些前端工具
    文档和元素中与几何形状和滚动相关的属性和方法
    文档元素选取和遍历中的一些容易忘记的概念
    只存在于理想中的客户端JavaScript时间线
    JavaScript函数代码和执行上下文--ECMA-262-5
    JavaScript中的闭包
    Git Note
    HDOJ 4463 Outlets 最短路
    HDOJ 4548 美素数
    HDOJ 2544 最短路
  • 原文地址:https://www.cnblogs.com/jonyo/p/5861171.html
Copyright © 2011-2022 走看看