zoukankan      html  css  js  c++  java
  • Storm word count: reading input from a file

    Source code:

    package stormdemo;
    import java.io.BufferedReader;
    import java.io.BufferedWriter;
    import java.io.FileNotFoundException;
    import java.io.FileReader;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Locale;
    import java.util.Map;
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    public class WordCountTopology {
      public static class WordReader extends BaseRichSpout {
            private static final long serialVersionUID = 1L;
            private SpoutOutputCollector collector;
            private FileReader fileReader;
            private boolean completed = false;
            public void ack(Object msgId) {
            public void close() {}
            public void fail(Object msgId) {
            /**The only thing that the methods will do It is emit each  file line*/
            public void nextTuple() {
                 * The nextuple it is called forever, so if we have been readed the file
                 * we will wait and then return
                    try {
                    } catch (InterruptedException e) {
                        //Do nothing
                String str;
                //Open the reader
                BufferedReader reader = new BufferedReader(fileReader);
                    //Read all lines
                    while((str = reader.readLine()) != null){
                         * By each line emmit a new value with the line as a their
                        this.collector.emit(new Values(str),str);
                }catch(Exception e){
                    throw new RuntimeException("Error reading tuple",e);
                    completed = true;
             * We will create the file and get the collector object
            public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context,
                    SpoutOutputCollector collector) {
                try {
                    this.fileReader = new FileReader(conf.get("wordsFile").toString());
                } catch (FileNotFoundException e) {
                    throw new RuntimeException("Error reading file ["+conf.get("wordsFile")+"]");
                this.collector = collector;
             * Declare the output field "line"
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("line"));
      public static class WordNormalizer extends BaseBasicBolt {
      private static final long serialVersionUID = 3L;
            public void cleanup() {}
            public void execute(Tuple input, BasicOutputCollector collector) {
                String sentence = input.getString(0);
                String[] words = sentence.split(" ");
                for(String word : words){
                    word = word.trim();
                        word = word.toLowerCase();
                        collector.emit(new Values(word));
             * The bolt will only emit the field "word"
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word"));
      public static class WordCount extends BaseBasicBolt {
        private static final long serialVersionUID = 2L;
        Map<String, Integer> counts = new HashMap<String, Integer>();
        BufferedWriter output = null;
        public void execute(Tuple tuple, BasicOutputCollector collector) {
          String word = tuple.getString(0);
          Integer count = counts.get(word);
          if (count == null)
            count = 0;
          counts.put(word, count);
          //collector.emit(new Values(word, count));
          try {
              output = new BufferedWriter(new FileWriter("/home/hadoop/wordcounts.txt",false )); 
              } catch (IOException e) {
                try {
                    } catch (IOException e1) {  e1.printStackTrace();  }
          for(Map.Entry<String, Integer> entry : counts.entrySet()){
              try {
                output.write(entry.getKey()+": "+entry.getValue());
            } catch (IOException e) {
         public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("word", "count"));
      public static void  main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new WordReader());
        builder.setBolt("split", new WordNormalizer()).shuffleGrouping("spout");
        builder.setBolt("count", new WordCount()).globalGrouping("split");
        Config conf = new Config();
        conf.put("wordsFile", args[0]);
        //Topology run
         if (args != null && args.length > 1) {
            StormSubmitter.submitTopology(args[1], conf, builder.createTopology());
        else {
            conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("wordcount", conf, builder.createTopology());

    Start ZooKeeper (zkServer.sh start) on namenode, datanode01 and datanode02.

    Start the Storm nimbus on namenode.

    Start the Storm supervisors on datanode01 and datanode02.

    On namenode:

    cd /home/hadoop/workspace

    cd stormsample

    mvn install

    storm jar storm-example-0.0.1-SNAPSHOT.jar stormdemo.WordCountTopology /home/hadoop/wordinput.txt wordcount

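    First, prepare the input text file. I put wordinput.txt in /home/hadoop/ on both datanode01 and datanode02. The spout opens this path on whichever supervisor node it runs on, so the file must exist at the same path on every supervisor.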

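    After running the job, I found the output file /home/hadoop/wordcounts.txt on datanode01, the node where the count bolt ran. The bolt writes the file to the local disk of its own node.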

    Looking for a work-from-home job involving MSBI.
  • 相关阅读:
    [COCI2011-2012#5] POPLOCAVANJE 后缀自动机
    [SDOI2016]生成魔咒 后缀自动机
    [JSOI2009]密码 AC自动机
    CF17E Palisection manacher
    [JSOI2007]字符加密 后缀数组
    [POI2012]OKR-A Horrible Poem hash
    [APIO2014]回文串 manacher 后缀数组
    [SHOI2011]双倍回文 manacher
  • 原文地址:https://www.cnblogs.com/huaxiaoyao/p/4289618.html
Copyright © 2011-2022 走看看