zoukankan      html  css  js  c++  java
  • Storm实现单词统计代码

      

      1 import java.io.File;
      2 import java.io.IOException;
      3 import java.util.Collection;
      4 import java.util.HashMap;
      5 import java.util.List;
      6 import java.util.Map;
      7 import java.util.Map.Entry;
      8 
      9 import org.apache.commons.io.FileUtils;
     10 
     11 import backtype.storm.Config;
     12 import backtype.storm.LocalCluster;
     13 import backtype.storm.spout.SpoutOutputCollector;
     14 import backtype.storm.task.OutputCollector;
     15 import backtype.storm.task.TopologyContext;
     16 import backtype.storm.topology.OutputFieldsDeclarer;
     17 import backtype.storm.topology.TopologyBuilder;
     18 import backtype.storm.topology.base.BaseRichBolt;
     19 import backtype.storm.topology.base.BaseRichSpout;
     20 import backtype.storm.tuple.Fields;
     21 import backtype.storm.tuple.Tuple;
     22 import backtype.storm.tuple.Values;
     23 import backtype.storm.utils.Utils;
     24 
     25 /**
     26  * 单词计数
     27  * 监控d:\test目录下面的文件,统计单词出现的总次数
     28  * 当有新文件出现的时候,也要能解析出来
     29  * 
     30  * @author Administrator
     31  *
     32  */
     33 public class LocalTopologyWordCount {
     34     
     35     
     36     /**
     37      * spout需要继承baserichspout,实现未实现的方法
     38      * @author Administrator
     39      *
     40      */
     41     public static class DataSourceSpout extends BaseRichSpout{
     42         private Map conf;
     43         private TopologyContext context;
     44         private SpoutOutputCollector collector;
     45         
     46         /**
     47          * 初始化方法,只会执行一次
     48          * 在这里面可以写一个初始化的代码
     49          * Map conf:其实里面保存的是topology的一些配置信息
     50          * TopologyContext context:topology的上下文,类似于servletcontext
     51          * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
     52          */
     53         @Override
     54         public void open(Map conf, TopologyContext context,
     55                 SpoutOutputCollector collector) {
     56             this.conf = conf;
     57             this.context = context;
     58             this.collector = collector;
     59         }
     60 
     61         /**
     62          * 这个方法是spout中最重要的方法,
     63          * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
     64          * 每调用一次,会向外发射一条数据
     65          */
     66         @Override
     67         public void nextTuple() {
     68             //获取指定目录下面的新文件,
     69             Collection<File> listFiles = FileUtils.listFiles(new File("d:\test"), new String[]{"txt"}, true);
     70             //分别读取每个文件
     71             for (File file : listFiles) {
     72                 try {
     73                     List<String> readLines = FileUtils.readLines(file);
     74                     for (String line : readLines) {
     75                         //把每一行封装成一个tuple,发射出去
     76                         this.collector.emit(new Values(line));
     77                     }
     78                     FileUtils.moveFile(file, new File(file.getAbsolutePath()+System.currentTimeMillis()));//给文件该名字,否则会一直处理这个文件.
     79                 } catch (IOException e) {
     80                     e.printStackTrace();
     81                 }
     82             }
     83         }
     84         
     85         /**
     86          * 声明输出字段
     87          */
     88         @Override
     89         public void declareOutputFields(OutputFieldsDeclarer declarer) {
     90             //给values中的数据起个名字,方便后面的bolt从这个values中取数据
     91             //fields中定义的参数和values中传递的数值是一一对应的
     92             declarer.declare(new Fields("line"));
     93         }
     94         
     95     }
     96     
     97     
     98     /**
     99      * 自定义bolt需要实现baserichbolt
    100      * @author Administrator
    101      *
    102      */
    103     public static class SplitBolt extends BaseRichBolt{
    104         private Map stormConf; 
    105         private TopologyContext context;
    106         private OutputCollector collector;
    107         
    108         /**
    109          * 和spout中的open方法意义一样
    110          */
    111         @Override
    112         public void prepare(Map stormConf, TopologyContext context,
    113                 OutputCollector collector) {
    114             this.stormConf = stormConf;
    115             this.context = context;
    116             this.collector = collector;
    117         }
    118 
    119         /**
    120          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
    121          */
    122         @Override
    123         public void execute(Tuple input) {
    124             //获取每一行数据进行切割
    125             String line = input.getStringByField("line");
    126             String[] splits = line.split("	");
    127             //把切割出来的单词一个一个发射出去
    128             for (String word : splits) {
    129                 this.collector.emit(new Values(word));
    130             }
    131             
    132         }
    133         
    134         //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
    135         //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
    136         /**
    137          * 声明输出字段
    138          */
    139         @Override
    140         public void declareOutputFields(OutputFieldsDeclarer declarer) {
    141             declarer.declare(new Fields("word"));
    142         }
    143         
    144     }
    145     
    146     
    147     
    148     /**
    149      * 自定义bolt需要实现baserichbolt
    150      * @author Administrator
    151      *
    152      */
    153     public static class CountBolt extends BaseRichBolt{
    154         private Map stormConf; 
    155         private TopologyContext context;
    156         private OutputCollector collector;
    157         
    158         /**
    159          * 和spout中的open方法意义一样
    160          */
    161         @Override
    162         public void prepare(Map stormConf, TopologyContext context,
    163                 OutputCollector collector) {
    164             this.stormConf = stormConf;
    165             this.context = context;
    166             this.collector = collector;
    167         }
    168 
    169         HashMap<String, Integer> hashMap = new HashMap<String, Integer>();
    170         /**
    171          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
    172          */
    173         @Override
    174         public void execute(Tuple input) {
    175             //获取每一个单词
    176             String word = input.getStringByField("word");
    177             //在map中进行统计
    178             Integer integer = hashMap.get(word);
    179             if(integer==null){
    180                 integer=0;
    181             }
    182             integer++;
    183             hashMap.put(word, integer);
    184             //把这个统计结果打印到控制台
    185             Utils.sleep(1000);
    186             System.out.println("=========================================");
    187             for (Entry<String, Integer> entry : hashMap.entrySet()) {
    188                 System.out.println(entry);
    189             }
    190         }
    191         
    192         //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
    193         //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
    194         /**
    195          * 声明输出字段
    196          */
    197         @Override
    198         public void declareOutputFields(OutputFieldsDeclarer declarer) {
    199         }
    200         
    201     }
    202     /**
    203      * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
    204      * @param args
    205      */
    206     public static void main(String[] args) {
    207         //组装topology
    208         TopologyBuilder topologyBuilder = new TopologyBuilder();
    209         topologyBuilder.setSpout("spout1", new DataSourceSpout());
    210         //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
    211         topologyBuilder.setBolt("bolt1", new SplitBolt()).shuffleGrouping("spout1");
    212         topologyBuilder.setBolt("bolt2", new CountBolt()).shuffleGrouping("bolt1");
    213         
    214         //创建本地storm集群
    215         LocalCluster localCluster = new LocalCluster();
    216         localCluster.submitTopology("wordCountTopology", new Config(), topologyBuilder.createTopology());
    217     }
    218 }
  • 相关阅读:
    C. Shaass and Lights 解析(思維、組合)
    D. Binary String To Subsequences(队列)(贪心)
    CodeForces 1384B2. Koa and the Beach (Hard Version)(贪心)
    CodeForces 1384B1. Koa and the Beach (Easy Version)(搜索)
    CodeForces 1384C. String Transformation 1(贪心)(并查集)
    CodeForces 1384A. Common Prefixes
    POJ-2516 Minimum Cost(最小费用最大流)
    POJ3261-Milk Patterns(后缀数组)
    HDU-1300 Pearls(斜率DP)
    HDU-4528 小明系列故事-捉迷藏(BFS)
  • 原文地址:https://www.cnblogs.com/DreamDrive/p/5774982.html
Copyright © 2011-2022 走看看