  • Learning the Storm WordCount Topology

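    1. The distributed word count flow

    First, a data source is needed. SentenceSpout defines a string array, sentences, to simulate one, and emits each sentence in the array as a tuple. Second, SplitSentenceBolt receives the tuples emitted by SentenceSpout, splits each sentence into words, and emits each word as a tuple. Third, WordCountBolt receives the tuples sent by SplitSentenceBolt, keeps a running count for each word it receives, and emits <word, count> as a tuple. Finally, ReportBolt receives the tuples sent by WordCountBolt, stores the counts in a HashMap, and prints the result.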
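
    As a rough illustration of the flow above, the four stages can be simulated in plain Java with no Storm dependency. The class and method names (WordCountPipelineSketch, split, countWords) are made up for this sketch, and everything runs in one thread over a single pass of the sentences, unlike a real topology.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the four stages; no Storm classes are used.
class WordCountPipelineSketch {

    // Stage 2 (the splitting bolt): one sentence in, one word per output item.
    static List<String> split(String sentence) {
        List<String> words = new ArrayList<>();
        for (String word : sentence.split(" ")) {
            words.add(word);
        }
        return words;
    }

    // Stages 3 and 4 (counting and reporting): keep a running count per word
    // and remember the latest count seen for each word.
    static Map<String, Long> countWords(String[] sentences) {
        Map<String, Long> counts = new HashMap<>(); // state of the counting stage
        Map<String, Long> report = new TreeMap<>(); // state of the reporting stage, sorted by word
        for (String sentence : sentences) {         // stage 1: each sentence is one "tuple"
            for (String word : split(sentence)) {
                long count = counts.merge(word, 1L, Long::sum);
                report.put(word, count);            // the <word, count> pair sent downstream
            }
        }
        return report;
    }

    public static void main(String[] args) {
        String[] sentences = { "my dog has fleas", "i like cold beverages",
                "the dog ate my homework", "don't have a cow man",
                "i don't think i like fleas" };
        for (Map.Entry<String, Long> e : countWords(sentences).entrySet()) {
            System.out.println(e.getKey() + " : " + e.getValue());
        }
    }
}
```

    The reporting map simply overwrites the previous count for a word, which mirrors how ReportBolt stores the latest <word, count> pair it receives.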

    The flow is as follows: SentenceSpout -> SplitSentenceBolt -> WordCountBolt -> ReportBolt.

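    2. The classes that make up the topology

     The three interfaces ISpout, IComponent, and IBolt define the most basic methods. BaseRichSpout and BaseRichBolt are base classes that implement these interfaces, and custom spouts and bolts do their work by extending those base classes.

     See the comments in the code for detailed explanations. The example follows Chapter 1 of 《Storm分布式实时计算模式》 (Storm Blueprints: Patterns for Distributed Real-time Computation).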

    1. Analysis of SentenceSpout.java:

    private String[] sentences = { "my dog has fleas", "i like cold beverages",
                 "the dog ate my homework", "don't have a cow man",
                 "i don't think i like fleas" };

    This defines the data source to be emitted. On each call, the spout takes one string from the array, builds a tuple from it, and emits it.

    public void open(@SuppressWarnings("rawtypes") Map conf,
            TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    The open method is defined in the ISpout interface and is called when a spout component is initialized. Here, open() stores the collector that is used to emit tuples.

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Name the single field of the tuples emitted by SentenceSpout "sentence"
        declarer.declare(new Fields("sentence"));
    }

    The declareOutputFields method declares the field names of the tuples this spout emits. Downstream bolts use a field name to read the corresponding value from the tuples they receive.
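
    To make the relationship between declared field names and emitted values concrete, here is a minimal sketch that does not use Storm. NamedTupleSketch is a hypothetical stand-in, not Storm's Tuple class; it only assumes that values are stored by position and that a field name is resolved to a position before the value is read.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a tuple: values are positional, names come from the declaration.
class NamedTupleSketch {
    private final List<String> fieldNames; // what declareOutputFields would declare
    private final List<Object> values;     // what emit(new Values(...)) would carry

    NamedTupleSketch(List<String> fieldNames, List<Object> values) {
        if (fieldNames.size() != values.size()) {
            throw new IllegalArgumentException("field count and value count differ");
        }
        this.fieldNames = fieldNames;
        this.values = values;
    }

    // Resolve the field name to a position, then read the value at that position.
    Object getValueByField(String field) {
        int i = fieldNames.indexOf(field);
        if (i < 0) {
            throw new IllegalArgumentException("unknown field: " + field);
        }
        return values.get(i);
    }

    String getStringByField(String field) {
        return (String) getValueByField(field);
    }

    public static void main(String[] args) {
        NamedTupleSketch t = new NamedTupleSketch(
                Arrays.asList("sentence"),
                Arrays.<Object>asList("my dog has fleas"));
        System.out.println(t.getStringByField("sentence"));
    }
}
```

    Seen this way, a tuple is a list of values rather than a list of key/value maps; the names live in the declaration, and the values only need to arrive in the declared order.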

    public void nextTuple() {
        // Build a tuple from the current string in the sentences array
        // and send it out through emit()
        this.collector.emit(new Values(sentences[index]));
        index++;
        if (index >= sentences.length) {
            index = 0;
        }
        Utils.sleep(100);
    }

    nextTuple() is the core method of every spout. Storm calls it to ask the spout to emit tuples through the output collector. Values extends ArrayList, so new Values(...) builds a List object, which is passed to emit and sent out through the collector.
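
    The idea of a list built from a variable number of arguments can be sketched in a few lines. ValuesSketch below is a hypothetical stand-in written for this illustration, not Storm's own Values class.

```java
import java.util.ArrayList;

// Hypothetical stand-in for a Values-like class: a list built from varargs.
class ValuesSketch extends ArrayList<Object> {
    private static final long serialVersionUID = 1L;

    ValuesSketch(Object... vals) {
        super(vals.length);
        for (Object v : vals) {
            add(v); // keep the arguments in the order they were passed
        }
    }

    public static void main(String[] args) {
        ValuesSketch one = new ValuesSketch("my dog has fleas"); // one-field tuple
        ValuesSketch two = new ValuesSketch("dog", 2L);          // two-field tuple
        System.out.println(one.size() + " " + two.size());
        System.out.println(two.get(0) + " " + two.get(1));
    }
}
```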

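    The emission rule here is: each call emits one string and then sleeps for 100 ms. Once the whole array has been emitted, index is reset to 0 and emission starts over. The spout never stops on its own; it keeps emitting until the topology is explicitly terminated.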
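
    The wrap-around behaviour of the index can be checked in isolation. The sketch below leaves out Storm and the 100 ms sleep; CyclingIndexSketch and its next() method are names invented for this example.

```java
// Sketch of the cycling index used by nextTuple(), without Storm or the sleep.
class CyclingIndexSketch {
    private final String[] items;
    private int index = 0;

    CyclingIndexSketch(String[] items) {
        this.items = items;
    }

    // Return the current item, then advance and wrap around at the end.
    String next() {
        String item = items[index];
        index++;
        if (index >= items.length) {
            index = 0;
        }
        return item;
    }

    public static void main(String[] args) {
        CyclingIndexSketch source = new CyclingIndexSketch(new String[] { "a", "b", "c" });
        for (int i = 0; i < 7; i++) {
            System.out.print(source.next() + " ");
        }
        System.out.println();
    }
}
```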

     The full code of SentenceSpout.java:

    package org.apache.storm.storm_core;

    import java.util.Map;

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import backtype.storm.utils.Utils;

    public class SentenceSpout extends BaseRichSpout {
        private static final long serialVersionUID = 3444934973982660864L;
        private SpoutOutputCollector collector; // emits tuples to the downstream bolts
        private String[] sentences = { "my dog has fleas", "i like cold beverages",
                "the dog ate my homework", "don't have a cow man",
                "i don't think i like fleas" };

        private int index = 0;

        /*
         * open() is called when the spout component is initialized.
         *
         * @param conf      the Storm configuration
         * @param context   information about the topology's components
         * @param collector the collector used to emit tuples
         */
        public void open(@SuppressWarnings("rawtypes") Map conf,
                TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        /*
         * Storm calls this method to have the spout emit tuples through the
         * output collector. Values extends ArrayList.
         */
        public void nextTuple() {
            // Build a tuple from the current string in the sentences array
            // and send it out through emit()
            this.collector.emit(new Values(sentences[index]));
            index++;
            if (index >= sentences.length) {
                index = 0;
            }
            Utils.sleep(100);
        }

        /*
         * A tuple emitted by SentenceSpout is a list of values. This method
         * declares that the single value in that list has the field name "sentence".
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }
    }

    The code of SplitSentenceBolt.java:

    package org.apache.storm.storm_core;

    import java.util.Map;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class SplitSentenceBolt extends BaseRichBolt {
        private static final long serialVersionUID = -2107029392155190729L;
        private OutputCollector collector; // emits tuples to the downstream bolts

        /*
         * prepare() is the bolt counterpart of a spout's open():
         * it is called when the bolt is initialized.
         */
        public void prepare(@SuppressWarnings("rawtypes") Map stormConf,
                TopologyContext context, OutputCollector collector) {
            this.collector = collector; // store the collector
        }

        public void execute(Tuple input) {
            // Read the sentence emitted by SentenceSpout. The spout declared its
            // field as "sentence", so that name is passed to getStringByField.
            // The tuple carries a single String value under that field name.
            String sentence = input.getStringByField("sentence");
            String[] words = sentence.split(" "); // split the sentence into words
            for (String word : words) {
                // Emit each word as its own tuple to the next bolt
                this.collector.emit(new Values(word));
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Name the single field of the tuples emitted by SplitSentenceBolt "word"
            declarer.declare(new Fields("word"));
        }
    }

    The code of WordCountBolt.java:

    package org.apache.storm.storm_core;

    import java.util.HashMap;
    import java.util.Map;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class WordCountBolt extends BaseRichBolt {

        private OutputCollector collector;
        // Number of occurrences of each word seen so far
        private HashMap<String, Long> counts = null;

        public void prepare(@SuppressWarnings("rawtypes") Map stormConf,
                TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            // Create the map here, because Storm calls prepare() when the bolt is initialized
            this.counts = new HashMap<String, Long>();
        }

        public void execute(Tuple input) {
            String word = input.getStringByField("word");
            Long count = this.counts.get(word);
            if (count == null) { // the word has not been seen before
                count = 0L;
            }
            count++;
            this.counts.put(word, count); // update the count stored for this word
            // The emitted tuple carries two values, the word and its count.
            // They are matched by position to the fields declared in
            // declareOutputFields: "word" (a String) and "count" (a Long).
            this.collector.emit(new Values(word, count));
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

     The code of ReportBolt.java:

    package org.apache.storm.storm_core;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;

    public class ReportBolt extends BaseRichBolt {
        private static final long serialVersionUID = 4921144902730095910L;
        // No OutputCollector field: ReportBolt does not emit tuples
        private HashMap<String, Long> counts = null;

        public void prepare(@SuppressWarnings("rawtypes") Map stormConf,
                TopologyContext context, OutputCollector collector) {
            this.counts = new HashMap<String, Long>();
        }

        public void execute(Tuple input) {
            String word = input.getStringByField("word");
            Long count = input.getLongByField("count");
            this.counts.put(word, count); // keep the latest count for each word
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // This bolt does not emit any stream
        }

        // When the topology runs on a Storm cluster, cleanup() is not reliable:
        // there is no guarantee that it will be called.
        public void cleanup() {
            System.out.println("------ print counts ------");
            List<String> keys = new ArrayList<String>();
            keys.addAll(counts.keySet()); // copy all words into a list
            Collections.sort(keys);       // sort the words
            for (String key : keys) {     // print each word's count in sorted order
                System.out.println(key + " : " + this.counts.get(key));
            }
            System.out.println("--------bye----------");
        }
    }

    The code of WordCountTopology.java:

    package org.apache.storm.storm_core;

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    import backtype.storm.utils.Utils;

    public class WordCountTopology {
        private static final String SENTENCE_SPOUT_ID = "sentence-spout";
        private static final String SPLIT_BOLT_ID = "split-bolt";
        private static final String COUNT_BOLT_ID = "count-bolt";
        private static final String REPORT_BOLT_ID = "report-bolt";
        private static final String TOPOLOGY_NAME = "word-count-topology";

        public static void main(String[] args) throws Exception {
            SentenceSpout spout = new SentenceSpout();
            SplitSentenceBolt splitBolt = new SplitSentenceBolt();
            WordCountBolt countBolt = new WordCountBolt();
            ReportBolt reportBolt = new ReportBolt();

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout(SENTENCE_SPOUT_ID, spout);
            // Sentences are distributed randomly across the split bolt's tasks
            builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
            // Tuples with the same "word" value always go to the same count bolt task
            builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
            // All counts are routed to a single report bolt task
            builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);

            Config config = new Config();
            LocalCluster cluster = new LocalCluster();

            // Run in local mode for 1 second, then kill the topology and shut down
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            Utils.sleep(1000);
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        }
    }
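
    The reason WordCountBolt is wired with fieldsGrouping on "word" is that every occurrence of a given word must reach the same task, otherwise its count would be split across several HashMaps. The sketch below illustrates that property with a hash-and-modulo rule. This rule is an assumption made for illustration, not a description of Storm's internal code, and FieldsGroupingSketch and chooseTask are hypothetical names.

```java
// Sketch of the fields-grouping idea: equal field values map to the same task.
// The hash-and-modulo rule is an illustrative assumption, not Storm's internals.
class FieldsGroupingSketch {

    // Pick a task index in [0, numTasks) for a word; equal words always get the same index.
    static int chooseTask(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4; // hypothetical number of count bolt tasks
        String[] words = { "dog", "fleas", "dog", "i", "fleas" };
        for (String word : words) {
            System.out.println(word + " -> task " + chooseTask(word, numTasks));
        }
    }
}
```

    In the topology above no parallelism hint is passed to setBolt, so the grouping only starts to matter once the count bolt runs with more than one task.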
  • Original article: https://www.cnblogs.com/hapjin/p/4588413.html