zoukankan      html  css  js  c++  java
  • 单词计数例子

    学习storm,开始编写小例子


    import java.io.File;

    import java.io.IOException;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
     
    import org.apache.commons.io.FileUtils;
     
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
     
     
    public class WordCountTopology {
     public static class  DataSourceSpout extends BaseRichSpout{
      private Map conf;
      private TopologyContext context;
      private SpoutOutputCollector collector;
      /**
       * 此方法只调用一次
       */
      public void open(Map conf, TopologyContext context,
        SpoutOutputCollector collector) {
       this.conf = conf;
       this.collector = collector;
       this.context = context;
      }
      /**
       * 死循环调用,心跳
       */
      int i=0;
      public void nextTuple() {
       //读取指定文件目录
       Collection<File> listFiles = FileUtils.listFiles(new File("d:\test"), new String[]{"txt"}, true);
       for(File file:listFiles){
        try {
         
         //获取每个文件的所有数据
         List<String> readLines = FileUtils.readLines(file);
         //文件被读取过以后进行重命名
         FileUtils.moveFile(file, new File(file.getAbsolutePath()+System.currentTimeMillis()));
    // file.renameTo(new File(file.getAbsolutePath()+System.currentTimeMillis()));
         for (String line : readLines) {
          //把每一行数据发射出去
          this.collector.emit(new Values(line));
         }
         
        } catch (IOException e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
        }
       
       }
       
       
      }
      /**
       * 声明输出内容
       */
      public void declareOutputFields(OutputFieldsDeclarer declare) {
       declare.declare(new Fields("line"));
      }
     
     }
     public static class Splitbolt extends BaseRichBolt{
      private Map stormConf;
      private TopologyContext context;
      private OutputCollector collector;
      public void prepare(Map stormConf, TopologyContext context,
        OutputCollector collector) {
       // TODO Auto-generated method stub
       this.stormConf = stormConf;
       this.context = context;
       this.collector = collector;
       
       
      }
     
      public void execute(Tuple input) {
     
       //获取每一行数据
       String line = input.getStringByField("line");
       
       //把数据切分成一个个单词
       String[] wordsStrings = line.split(" ");
       //把每个单词都发射出去
       for (String word : wordsStrings) {
        this.collector.emit(new Values(word));
       }
      }
     
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
       declarer.declare(new Fields("words"));
      }
     
     
     }
     /**
      * 计算每个单词出现次数
      * @author tangyw
      *
      */
     public static class Countbolt extends BaseRichBolt{
      private Map stormConf;
      private TopologyContext context;
      private OutputCollector collector;
      public void prepare(Map stormConf, TopologyContext context,
        OutputCollector collector) {
       // TODO Auto-generated method stub
       this.stormConf = stormConf;
       this.context = context;
       this.collector = collector;
       
       
      }
      HashMap<String, Integer> hashMap = new HashMap<String, Integer>();
     
      public void execute(Tuple input) {
     
       //获取每一个单词
       String word = input.getStringByField("words");
       //对所有的单词汇总
       Integer valueInteger = hashMap.get(word);
       if (valueInteger==null) {
        valueInteger=0;
       }
       valueInteger++;
       hashMap.put(word, valueInteger);
       //把结果打印出来
       System.out.println("----------------");
       for (Entry<String, Integer> entry : hashMap.entrySet()) {
        System.out.println(entry);
       }
      }
     
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
      }
     
     
     }
     public static void main(String[] args) {
      TopologyBuilder topologyBuilder = new TopologyBuilder();
      topologyBuilder.setSpout("spout_id", new DataSourceSpout());
      topologyBuilder.setBolt("bolt_id", new Splitbolt()).shuffleGrouping("spout_id");
      topologyBuilder.setBolt("bolt_id_count", new Countbolt()).shuffleGrouping("bolt_id");
     
      LocalCluster localCluster = new LocalCluster();
      localCluster.submitTopology("topology", new Config(), topologyBuilder.createTopology());
     
     
     }
    }
  • 相关阅读:
    Jmeter运行原理
    hihoOffer收割练习20题目2
    hihoOffer收割练习20题目1
    STL-map容器
    STL-map容器
    2017多校合练1
    2017多校合练1
    STL之map基础知识
    STL之map基础知识
    DP入门
  • 原文地址:https://www.cnblogs.com/tangyw/p/4728680.html
Copyright © 2011-2022 走看看