  • Storm: watch a folder for changes and count the words in its files

    Watch a specified folder, read the content of its files (new files are picked up dynamically as they appear), and count the number of words.

    FileSpout.java — watches the folder and reads the content of new files

    package com.test.stormtest.wordcount;

    import java.io.File;
    import java.io.IOException;
    import java.util.Collection;
    import java.util.List;
    import java.util.Map;

    import org.apache.commons.io.FileUtils;

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    public class FileSpout extends BaseRichSpout {

        private static final long serialVersionUID = 1L;

        private SpoutOutputCollector collector;

        private File target = new File("F:" + File.separator + "test");
        private Collection<File> cacheFiles = null;

        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;

            // On startup, emit the content of every file already in the folder
            cacheFiles = FileUtils.listFiles(target, null, true);
            for (File file : cacheFiles) {
                emitFileContent(file);
            }
        }

        public void nextTuple() {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }

            // Detect new files and emit their content
            Collection<File> files = FileUtils.listFiles(target, null, true);
            for (File file : files) {
                if (!cacheFiles.contains(file)) {
                    emitFileContent(file);
                }
            }

            cacheFiles = files;
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }

        // Emit the file content line by line
        private void emitFileContent(File file) {
            try {
                List<String> lines = FileUtils.readLines(file);
                for (String line : lines) {
                    this.collector.emit(new Values(line));
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }
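
    To see the spout pick up a new file while the topology is running, you can drop a text file into the watched folder. Below is a minimal sketch using the same Commons IO library; the class name CreateTestFile, the file name, and the sample lines are arbitrary examples that only assume the hard-coded F:\test folder above exists.

    package com.test.stormtest.wordcount;

    import java.io.File;
    import java.io.IOException;
    import java.util.Arrays;

    import org.apache.commons.io.FileUtils;

    // Test-only helper: writes a new file into the watched folder so that
    // FileSpout.nextTuple() detects it on its next 5-second poll and emits its lines.
    public class CreateTestFile {

        public static void main(String[] args) throws IOException {
            File newFile = new File("F:" + File.separator + "test",
                    "words-" + System.currentTimeMillis() + ".txt");
            FileUtils.writeLines(newFile, Arrays.asList("hello storm", "hello world"));
        }
    }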

    SplitBolt.java — splits each line into words

    package com.test.stormtest.wordcount;
     
    import java.util.Map;
     
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
     
    public class SplitBolt extends BaseRichBolt {
        private static final long serialVersionUID = 1L;
         
        private OutputCollector collector = null;
         
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
     
        public void execute(Tuple input) {
            String line = input.getStringByField("line");
            String[] words = line.split(" ");
            for (String word : words) {
                this.collector.emit(new Values(word));
            }
        }
     
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
     
    }
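
    Splitting on a single space is enough for the demo input, but real text often contains tabs, repeated spaces, or mixed case. The following is an optional variation of execute(), not part of the original example, that splits on any whitespace, lowercases each token, and skips empty strings:

        public void execute(Tuple input) {
            String line = input.getStringByField("line");
            // Split on any run of whitespace instead of a single space
            for (String word : line.split("\\s+")) {
                String normalized = word.trim().toLowerCase();
                if (!normalized.isEmpty()) {
                    this.collector.emit(new Values(normalized));
                }
            }
        }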

    SumBolt.java — counts the words

    package com.test.stormtest.wordcount;
     
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.Set;
     
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;
     
    public class SumBolt extends BaseRichBolt{
        private static final long serialVersionUID = 1L;
     
        private Map<String, Long> countMap = null;
         
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            countMap = new HashMap<String, Long>();
        }
     
        public void execute(Tuple input) {
            String word = input.getStringByField("word");
            Long count = countMap.get(word);
            if(count == null) {
                count = 0L;
            }
            countMap.put(word, ++count);
             
            System.out.println("-----------------------------------------------");
            Set<Entry<String, Long>> entries = countMap.entrySet();
            for (Entry<String, Long> entry : entries) {
                System.out.println(entry.getKey() + ": " + entry.getValue());
            }
        }
     
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }
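
    Printing the whole map for every incoming word gets noisy quickly. Since this demo runs in local mode and kills the topology after 20 seconds, one lighter-weight option (a sketch of an addition to SumBolt, not part of the original code) is to print the final counts once from cleanup(), which Storm calls on a bolt when a local-mode topology is shut down:

        @Override
        public void cleanup() {
            // Called in local mode when the topology is killed; print the final tally once.
            System.out.println("----------------- final counts -----------------");
            for (Entry<String, Long> entry : countMap.entrySet()) {
                System.out.println(entry.getKey() + ": " + entry.getValue());
            }
        }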

    WordCountTopology.java — driver class; submits the topology in local mode

    package com.test.stormtest.wordcount;
     
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    import backtype.storm.utils.Utils;
     
    public class WordCountTopology {
     
        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();
             
        builder.setSpout("filespout", new FileSpout());
        builder.setBolt("splitbolt", new SplitBolt()).shuffleGrouping("filespout");
        builder.setBolt("sumbolt", new SumBolt()).fieldsGrouping("splitbolt", new Fields("word"));
             
            LocalCluster cluster = new LocalCluster();
            Config config = new Config();
            config.setDebug(true);
            cluster.submitTopology("wordcount", config, builder.createTopology());
             
            Utils.sleep(20000);
            cluster.killTopology("wordcount");
            cluster.shutdown();
        }
    }
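
    The main method above runs everything inside a LocalCluster, which is convenient for development. To submit the same topology to a real Storm cluster, the usual approach with this backtype.storm API generation is StormSubmitter. The sketch below is an assumption-laden example, not part of the original post: the class name, topology name, and worker count are arbitrary, the jar must be launched with the storm jar command, and FileSpout reads a local Windows path, so every worker host would need that folder for this particular spout to make sense on a cluster.

    package com.test.stormtest.wordcount;

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;

    public class WordCountClusterTopology {

        public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("filespout", new FileSpout());
            builder.setBolt("splitbolt", new SplitBolt()).shuffleGrouping("filespout");
            builder.setBolt("sumbolt", new SumBolt()).fieldsGrouping("splitbolt", new Fields("word"));

            Config config = new Config();
            config.setNumWorkers(2); // arbitrary example value

            // Run with: storm jar your-topology.jar com.test.stormtest.wordcount.WordCountClusterTopology
            StormSubmitter.submitTopology("wordcount", config, builder.createTopology());
        }
    }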
  • Original post: https://www.cnblogs.com/lishouguang/p/4559206.html