zoukankan      html  css  js  c++  java
  • Storm实现单词统计代码

      

      1 import java.io.File;
      2 import java.io.IOException;
      3 import java.util.Collection;
      4 import java.util.HashMap;
      5 import java.util.List;
      6 import java.util.Map;
      7 import java.util.Map.Entry;
      8 
      9 import org.apache.commons.io.FileUtils;
     10 
     11 import backtype.storm.Config;
     12 import backtype.storm.LocalCluster;
     13 import backtype.storm.spout.SpoutOutputCollector;
     14 import backtype.storm.task.OutputCollector;
     15 import backtype.storm.task.TopologyContext;
     16 import backtype.storm.topology.OutputFieldsDeclarer;
     17 import backtype.storm.topology.TopologyBuilder;
     18 import backtype.storm.topology.base.BaseRichBolt;
     19 import backtype.storm.topology.base.BaseRichSpout;
     20 import backtype.storm.tuple.Fields;
     21 import backtype.storm.tuple.Tuple;
     22 import backtype.storm.tuple.Values;
     23 import backtype.storm.utils.Utils;
     24 
     25 /**
     26  * 单词计数
     27  * 监控d:\test目录下面的文件,统计单词出现的总次数
     28  * 当有新文件出现的时候,也要能解析出来
     29  * 
     30  * @author Administrator
     31  *
     32  */
     33 public class LocalTopologyWordCount {
     34     
     35     
     36     /**
     37      * spout需要继承baserichspout,实现未实现的方法
     38      * @author Administrator
     39      *
     40      */
     41     public static class DataSourceSpout extends BaseRichSpout{
     42         private Map conf;
     43         private TopologyContext context;
     44         private SpoutOutputCollector collector;
     45         
     46         /**
     47          * 初始化方法,只会执行一次
     48          * 在这里面可以写一个初始化的代码
     49          * Map conf:其实里面保存的是topology的一些配置信息
     50          * TopologyContext context:topology的上下文,类似于servletcontext
     51          * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
     52          */
     53         @Override
     54         public void open(Map conf, TopologyContext context,
     55                 SpoutOutputCollector collector) {
     56             this.conf = conf;
     57             this.context = context;
     58             this.collector = collector;
     59         }
     60 
     61         /**
     62          * 这个方法是spout中最重要的方法,
     63          * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
     64          * 每调用一次,会向外发射一条数据
     65          */
     66         @Override
     67         public void nextTuple() {
     68             //获取指定目录下面的新文件,
     69             Collection<File> listFiles = FileUtils.listFiles(new File("d:\test"), new String[]{"txt"}, true);
     70             //分别读取每个文件
     71             for (File file : listFiles) {
     72                 try {
     73                     List<String> readLines = FileUtils.readLines(file);
     74                     for (String line : readLines) {
     75                         //把每一行封装成一个tuple,发射出去
     76                         this.collector.emit(new Values(line));
     77                     }
     78                     FileUtils.moveFile(file, new File(file.getAbsolutePath()+System.currentTimeMillis()));//给文件该名字,否则会一直处理这个文件.
     79                 } catch (IOException e) {
     80                     e.printStackTrace();
     81                 }
     82             }
     83         }
     84         
     85         /**
     86          * 声明输出字段
     87          */
     88         @Override
     89         public void declareOutputFields(OutputFieldsDeclarer declarer) {
     90             //给values中的数据起个名字,方便后面的bolt从这个values中取数据
     91             //fields中定义的参数和values中传递的数值是一一对应的
     92             declarer.declare(new Fields("line"));
     93         }
     94         
     95     }
     96     
     97     
     98     /**
     99      * 自定义bolt需要实现baserichbolt
    100      * @author Administrator
    101      *
    102      */
    103     public static class SplitBolt extends BaseRichBolt{
    104         private Map stormConf; 
    105         private TopologyContext context;
    106         private OutputCollector collector;
    107         
    108         /**
    109          * 和spout中的open方法意义一样
    110          */
    111         @Override
    112         public void prepare(Map stormConf, TopologyContext context,
    113                 OutputCollector collector) {
    114             this.stormConf = stormConf;
    115             this.context = context;
    116             this.collector = collector;
    117         }
    118 
    119         /**
    120          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
    121          */
    122         @Override
    123         public void execute(Tuple input) {
    124             //获取每一行数据进行切割
    125             String line = input.getStringByField("line");
    126             String[] splits = line.split("	");
    127             //把切割出来的单词一个一个发射出去
    128             for (String word : splits) {
    129                 this.collector.emit(new Values(word));
    130             }
    131             
    132         }
    133         
    134         //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
    135         //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
    136         /**
    137          * 声明输出字段
    138          */
    139         @Override
    140         public void declareOutputFields(OutputFieldsDeclarer declarer) {
    141             declarer.declare(new Fields("word"));
    142         }
    143         
    144     }
    145     
    146     
    147     
    148     /**
    149      * 自定义bolt需要实现baserichbolt
    150      * @author Administrator
    151      *
    152      */
    153     public static class CountBolt extends BaseRichBolt{
    154         private Map stormConf; 
    155         private TopologyContext context;
    156         private OutputCollector collector;
    157         
    158         /**
    159          * 和spout中的open方法意义一样
    160          */
    161         @Override
    162         public void prepare(Map stormConf, TopologyContext context,
    163                 OutputCollector collector) {
    164             this.stormConf = stormConf;
    165             this.context = context;
    166             this.collector = collector;
    167         }
    168 
    169         HashMap<String, Integer> hashMap = new HashMap<String, Integer>();
    170         /**
    171          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
    172          */
    173         @Override
    174         public void execute(Tuple input) {
    175             //获取每一个单词
    176             String word = input.getStringByField("word");
    177             //在map中进行统计
    178             Integer integer = hashMap.get(word);
    179             if(integer==null){
    180                 integer=0;
    181             }
    182             integer++;
    183             hashMap.put(word, integer);
    184             //把这个统计结果打印到控制台
    185             Utils.sleep(1000);
    186             System.out.println("=========================================");
    187             for (Entry<String, Integer> entry : hashMap.entrySet()) {
    188                 System.out.println(entry);
    189             }
    190         }
    191         
    192         //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
    193         //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
    194         /**
    195          * 声明输出字段
    196          */
    197         @Override
    198         public void declareOutputFields(OutputFieldsDeclarer declarer) {
    199         }
    200         
    201     }
    202     /**
    203      * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
    204      * @param args
    205      */
    206     public static void main(String[] args) {
    207         //组装topology
    208         TopologyBuilder topologyBuilder = new TopologyBuilder();
    209         topologyBuilder.setSpout("spout1", new DataSourceSpout());
    210         //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
    211         topologyBuilder.setBolt("bolt1", new SplitBolt()).shuffleGrouping("spout1");
    212         topologyBuilder.setBolt("bolt2", new CountBolt()).shuffleGrouping("bolt1");
    213         
    214         //创建本地storm集群
    215         LocalCluster localCluster = new LocalCluster();
    216         localCluster.submitTopology("wordCountTopology", new Config(), topologyBuilder.createTopology());
    217     }
    218 }
  • 相关阅读:
    hdu4639 hehe ——斐波纳契数列,找规律
    codefoces round193a
    codeforces 192e
    abbyy cup a
    年中总结
    codeforces 192a
    codeforces 192b
    codeforces 192 c
    codeforces 192 D
    codeforces magic five --快速幂模
  • 原文地址:https://www.cnblogs.com/DreamDrive/p/5774982.html
Copyright © 2011-2022 走看看