zoukankan      html  css  js  c++  java
  • Storm实现单词计数

      1 package com.mengyao.storm;
      2 
      3 import java.io.File;
      4 import java.io.IOException;
      5 import java.util.Collection;
      6 import java.util.HashMap;
      7 import java.util.List;
      8 import java.util.Map;
      9 import java.util.Map.Entry;
     10 
     11 import org.apache.commons.io.FileUtils;
     12 
     13 import backtype.storm.Config;
     14 import backtype.storm.LocalCluster;
     15 import backtype.storm.StormSubmitter;
     16 import backtype.storm.generated.AlreadyAliveException;
     17 import backtype.storm.generated.InvalidTopologyException;
     18 import backtype.storm.spout.SpoutOutputCollector;
     19 import backtype.storm.task.OutputCollector;
     20 import backtype.storm.task.TopologyContext;
     21 import backtype.storm.topology.OutputFieldsDeclarer;
     22 import backtype.storm.topology.TopologyBuilder;
     23 import backtype.storm.topology.base.BaseRichBolt;
     24 import backtype.storm.topology.base.BaseRichSpout;
     25 import backtype.storm.tuple.Fields;
     26 import backtype.storm.tuple.Tuple;
     27 import backtype.storm.tuple.Values;
     28 import backtype.storm.utils.Utils;
     29 
     30 /**
     31  * Storm中的单词计数,拓扑结构为InputSpout->SplitBolt->CountBolt = WordCountTopology
     32  * @author mengyao
     33  *
     34  */
     35 @SuppressWarnings("all")
     36 public class WordCountTopology {
     37 
     38     public static class InputSpout extends BaseRichSpout{
     39 
     40         private Map conf;
     41         private TopologyContext context;
     42         private SpoutOutputCollector collector;
     43         
     44         /**
     45          * 实例化该Spout时预处理,仅会被调用一次,类似于MapReduce中Mapper/Reducer的setup()方法
     46          */
     47         @Override
     48         public void open(Map conf, TopologyContext context,
     49                 SpoutOutputCollector collector) {
     50             this.conf = conf;
     51             this.context = context;
     52             this.collector = collector;
     53         }
     54 
     55         /**
     56          * 死循环发射每行消息
     57          */
     58         @Override
     59         public void nextTuple() {
     60             Collection<File> listFiles = FileUtils.listFiles(new File("D:/"), new String[]{"log"}, false);
     61             for (File file : listFiles) {
     62                 try {
     63                     List<String> lines = FileUtils.readLines(file);
     64                     for (String line : lines) {
     65                         this.collector.emit(new Values(line));
     66                         System.err.println("==== InputSpout:"+line+" ====");
     67                     }
     68                     FileUtils.moveFile(file, new File(file.getAbsoluteFile()+".tmp"));
     69                 } catch (IOException e) {
     70                     e.printStackTrace();
     71                     throw new RuntimeException(e);
     72                 }
     73             }
     74         }
     75 
     76         /**
     77          * 声明字段“line”提供给下一个Bolt组件订阅
     78          */
     79         @Override
     80         public void declareOutputFields(OutputFieldsDeclarer declarer) {
     81             declarer.declare(new Fields("line"));
     82         }
     83         
     84     }
     85     
     86     public static class SplitBolt extends BaseRichBolt{
     87 
     88         private Map stormConf;
     89         private TopologyContext context;
     90         private OutputCollector collector;
     91         
     92         /**
     93          * 实例化该Bolt时预处理,仅会被调用一次,类似于MapReduce中Mapper/Reducer的setup()方法
     94          */
     95         @Override
     96         public void prepare(Map stormConf, TopologyContext context,
     97                 OutputCollector collector) {
     98             this.stormConf = stormConf;
     99             this.context = context;
    100             this.collector = collector;
    101         }
    102 
    103         /**
    104          * 死循环发送每个单词
    105          */
    106         @Override
    107         public void execute(Tuple input) {
    108             String line = input.getStringByField("line");
    109             String[] words = line.split("	");
    110             for (String word : words) {
    111                 this.collector.emit(new Values(word));
    112                 System.err.println("==== SplitBolt:"+word+" ====");
    113             }
    114         }
    115 
    116         /**
    117          * 声明字段“word”提供给下一个Bolt组件订阅
    118          */
    119         @Override
    120         public void declareOutputFields(OutputFieldsDeclarer declarer) {
    121             declarer.declare(new Fields("word"));
    122         }
    123         
    124     }
    125     
    126     public static class CountBolt extends BaseRichBolt{
    127 
    128         private Map stormConf;
    129         private TopologyContext context;
    130         private OutputCollector collector;
    131         HashMap<String, Long> map = new HashMap<String, Long>();
    132         
    133         /**
    134          * 实例化该Bolt时预处理,仅会被调用一次,类似于MapReduce中Mapper/Reducer的setup()方法
    135          */
    136         @Override
    137         public void prepare(Map stormConf, TopologyContext context,
    138                 OutputCollector collector) {
    139             this.stormConf = stormConf;
    140             this.context = context;
    141             this.collector = collector;
    142         }
    143 
    144         @Override
    145         public void execute(Tuple input) {
    146             String word = input.getStringByField("word");
    147             Long value = map.get(word);
    148             if (value==null) {
    149                 value=0L;
    150             }
    151             value++;
    152             map.put(word, value);
    153             for (Entry<String, Long> entry : map.entrySet()) {
    154                 System.err.println("==== CountBolt:"+entry+" ====");
    155             }
    156         }
    157 
    158         @Override
    159         public void declareOutputFields(OutputFieldsDeclarer declarer) {
    160         }
    161         
    162     }
    163 
    164     public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
    165         String topologyName = WordCountTopology.class.getSimpleName();
    166         TopologyBuilder builder = new TopologyBuilder();
    167         builder.setSpout("input", new InputSpout());
    168         builder.setBolt("split", new SplitBolt()).shuffleGrouping("input");
    169         builder.setBolt("count", new CountBolt()).shuffleGrouping("split");
    170         
    171         Config config = new Config();
    172         config.setDebug(true);
    173         
    174         if (args!=null && args.length>0) {        //如果是生产环境中使用集群模式提交拓扑
    175             config.setNumWorkers(3);
    176             StormSubmitter.submitTopology(topologyName, config, builder.createTopology());
    177         } else {                                      //否则使用本地模式提交拓扑
    178             LocalCluster cluster = new LocalCluster();
    179             cluster.submitTopology(topologyName, config, builder.createTopology());
    180             Utils.sleep(1000*100);
    181             cluster.killTopology(topologyName);
    182             cluster.shutdown();
    183         }
    184         
    185     }
    186 }
    187 
依赖的jar包如下图(图略):
  • 相关阅读:
    赫夫曼树编码
    根据先序和中序实现后序
    C++语言实现开心消消乐
    C语言风格实现的开心消消乐
    动态规划
    leetcode dp wordbreakII
    欧拉回路
    欧拉通路是否存在
    Python|多任务:线程、进程、协程--你想要的都在这里
    网络通信:socket、udp与tcp
  • 原文地址:https://www.cnblogs.com/mengyao/p/4738198.html
Copyright © 2011-2022 走看看