zoukankan      html  css  js  c++  java
  • Storm wordcount Read from file

    source code:

    package stormdemo;
    import java.io.BufferedReader;
    import java.io.BufferedWriter;
    import java.io.FileNotFoundException;
    import java.io.FileReader;
    import java.io.FileWriter;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    public class WordCountTopology {
      /**
       * Spout that reads the text file named by the "wordsFile" topology
       * configuration entry and emits every line of it as a one-field tuple
       * ("line"). The line text is also passed to the collector as the
       * message id, so it is what {@link #ack(Object)} and
       * {@link #fail(Object)} print.
       */
      public static class WordReader extends BaseRichSpout {
          private static final long serialVersionUID = 1L;
          private SpoutOutputCollector collector;
          private FileReader fileReader;
          private boolean completed = false;

          /**
           * Prints the id of a tuple reported as acknowledged.
           *
           * @param msgId message id given at emit time (the line text)
           */
          public void ack(Object msgId) {
              System.out.println("OK:"+msgId);
          }

          /**
           * Releases the input file if it is still open, e.g. when the spout
           * is closed before {@link #nextTuple()} consumed the file.
           */
          public void close() {
              closeQuietly(fileReader);
              fileReader = null;
          }

          /**
           * Prints the id of a tuple reported as failed. The line is not re-emitted.
           *
           * @param msgId message id given at emit time (the line text)
           */
          public void fail(Object msgId) {
              System.out.println("FAIL:"+msgId);
          }

          /**
           * Emits every line of the input file on the first call. This method
           * keeps being called afterwards, so once the file has been consumed
           * each call just sleeps for a second and returns.
           *
           * @throws RuntimeException if reading the file or emitting a line fails
           */
          public void nextTuple() {
              if (completed) {
                  try {
                      Thread.sleep(1000);
                  } catch (InterruptedException e) {
                      // Restore the flag so the calling thread can see it was interrupted.
                      Thread.currentThread().interrupt();
                  }
                  return;
              }

              BufferedReader reader = new BufferedReader(fileReader);
              try {
                  String line;
                  while ((line = reader.readLine()) != null) {
                      // One tuple per line; the line doubles as the message id.
                      this.collector.emit(new Values(line), line);
                  }
              } catch (Exception e) {
                  throw new RuntimeException("Error reading tuple", e);
              } finally {
                  // The file is consumed (or unreadable) either way: never retry,
                  // and release the descriptor. Closing the BufferedReader also
                  // closes the wrapped FileReader.
                  completed = true;
                  closeQuietly(reader);
                  fileReader = null;
              }
          }

          /**
           * Opens the input file and stores the collector used for emitting.
           *
           * @param conf      topology configuration; must contain "wordsFile"
           * @param context   topology context (unused)
           * @param collector collector the lines are emitted to
           * @throws RuntimeException if "wordsFile" is not configured or the
           *                          file cannot be opened
           */
          public void open(@SuppressWarnings("rawtypes") Map conf, TopologyContext context,
                  SpoutOutputCollector collector) {
              Object wordsFile = conf.get("wordsFile");
              if (wordsFile == null) {
                  throw new RuntimeException("Missing topology configuration entry [wordsFile]");
              }
              try {
                  this.fileReader = new FileReader(wordsFile.toString());
              } catch (FileNotFoundException e) {
                  // Keep the cause so the underlying reason is not lost.
                  throw new RuntimeException("Error reading file ["+wordsFile+"]", e);
              }
              this.collector = collector;
          }

          /**
           * Declares the single output field "line".
           */
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
              declarer.declare(new Fields("line"));
          }

          /** Closes {@code resource} if it is non-null, reporting but not propagating failures. */
          private static void closeQuietly(java.io.Closeable resource) {
              if (resource == null) {
                  return;
              }
              try {
                  resource.close();
              } catch (IOException e) {
                  System.err.println("Error closing words file: " + e);
              }
          }
      }
    
      /**
       * Bolt that turns a line of text into individual words: the first field
       * of the incoming tuple is split on single spaces, each piece is trimmed
       * and lower-cased, and every non-empty result is emitted as a "word" tuple.
       */
      public static class WordNormalizer extends BaseBasicBolt {

          private static final long serialVersionUID = 3L;

          /** Nothing to release. */
          public void cleanup() {}

          /**
           * Splits the incoming line and emits one tuple per non-empty word.
           *
           * @param input     tuple whose first field is the line to split
           * @param collector collector the normalized words are emitted to
           */
          public void execute(Tuple input, BasicOutputCollector collector) {
              final String[] pieces = input.getString(0).split(" ");
              for (int i = 0; i < pieces.length; i++) {
                  final String trimmed = pieces[i].trim();
                  if (trimmed.isEmpty()) {
                      // Consecutive spaces produce empty pieces; skip them.
                      continue;
                  }
                  collector.emit(new Values(trimmed.toLowerCase()));
              }
          }

          /**
           * Declares the single output field "word".
           */
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
              declarer.declare(new Fields("word"));
          }
      }
      /**
       * Bolt that keeps a running count per word in memory and, after every
       * tuple, rewrites the output file with the complete current table
       * (one "word: count" line per entry). Nothing is emitted downstream.
       */
      public static class WordCount extends BaseBasicBolt {
        private static final long serialVersionUID = 2L;
        /** File that is overwritten with the current counts after each tuple. */
        private static final String OUTPUT_FILE = "/home/hadoop/wordcounts.txt";
        Map<String, Integer> counts = new HashMap<String, Integer>();
        /** Writer for the rewrite in progress; null whenever no rewrite is running. */
        BufferedWriter output = null;

        /**
         * Increments the count of the word in the tuple's first field and
         * rewrites the output file.
         *
         * @param tuple     tuple whose first field is the word to count
         * @param collector unused; this bolt does not emit
         */
        public void execute(Tuple tuple, BasicOutputCollector collector) {
          String word = tuple.getString(0);
          Integer count = counts.get(word);
          counts.put(word, count == null ? 1 : count + 1);
          writeCounts();
        }

        /**
         * Declares the fields "word" and "count" (no tuple is currently emitted).
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("word", "count"));
        }

        /**
         * Truncates the output file and writes every entry of {@code counts}.
         * I/O failures are reported and the rewrite is abandoned, but the tuple
         * is not failed. The writer is always closed, so no file descriptor is
         * left open between tuples.
         */
        private void writeCounts() {
          try {
            output = new BufferedWriter(new FileWriter(OUTPUT_FILE, false));
            for (Map.Entry<String, Integer> entry : counts.entrySet()) {
              output.write(entry.getKey()+": "+entry.getValue());
              output.newLine();
            }
          } catch (IOException e) {
            e.printStackTrace();
          } finally {
            // output stays null if the file could not be opened.
            if (output != null) {
              try {
                output.close(); // also flushes what was written
              } catch (IOException e) {
                e.printStackTrace();
              }
              output = null;
            }
          }
        }
      }
    
      /**
       * Builds the spout -> split -> count topology and runs it.
       *
       * @param args {@code args[0]} is the path of the text file to read (required);
       *             {@code args[1]}, when present, is the topology name and makes
       *             the topology be submitted through {@link StormSubmitter};
       *             without it the topology runs in a {@link LocalCluster}
       * @throws IllegalArgumentException if the input file path is missing
       * @throws Exception if submitting or running the topology fails
       */
      public static void main(String[] args) throws Exception {
        // The file path is needed in both modes, so validate before touching args[0].
        if (args == null || args.length < 1) {
          throw new IllegalArgumentException(
              "Usage: WordCountTopology <wordsFile> [topologyName]");
        }

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new WordReader());
        builder.setBolt("split", new WordNormalizer()).shuffleGrouping("spout");
        builder.setBolt("count", new WordCount()).globalGrouping("split");

        Config conf = new Config();
        conf.put("wordsFile", args[0]);
        conf.setDebug(false);

        if (args.length > 1) {
          // A topology name was given: submit via StormSubmitter.
          conf.setNumWorkers(2);
          StormSubmitter.submitTopology(args[1], conf, builder.createTopology());
        } else {
          // No topology name: run in-process.
          conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
          LocalCluster cluster = new LocalCluster();
          try {
            cluster.submitTopology("wordcount", conf, builder.createTopology());
            // NOTE(review): one second may be too short for the local topology
            // to read the whole file before shutdown; confirm.
            Thread.sleep(1000);
          } finally {
            // Shut the local cluster down even if submit or sleep throws.
            cluster.shutdown();
          }
        }
      }
    }

    Start ZooKeeper (run `zkServer.sh start` on namenode, datanode01 and datanode02).

    Start Storm Nimbus on namenode.

    Start a Storm supervisor on datanode01 and on datanode02.

    at namenode:

    cd /home/hadoop/workspace

    cd stormsample

    mvn install

    storm jar storm-example-0.0.1-SNAPSHOT.jar stormdemo.WordCountTopology /home/hadoop/wordinput.txt wordcount

    First, prepare a text file for the spout to read. I put one text file, wordinput.txt, in /home/hadoop/ on both datanode01 and datanode02.

    After running the job, I found wordcounts.txt in /home/hadoop/ on datanode01.

    Looking for a job working at Home about MSBI
  • 相关阅读:
    vb.net与XML之间的操作
    C#动态生成WORD文档并进行操作。
    Excel的VBA连接数据库方法
    C#动态生成EXCLE并进行添加内容
    c#与WORD之间的基本操作
    C#运用ADO.net动态创建excle并进行相应的数据修改显示
    USACOMother's Milk
    USACOArithmetic Progressions
    USACONumber Triangles
    USACOMilking Cows
  • 原文地址:https://www.cnblogs.com/huaxiaoyao/p/4289618.html
Copyright © 2011-2022 走看看