zoukankan      html  css  js  c++  java
  • Storm监控文件夹变化 统计文件单词数量

    监控指定文件夹,读取文件(新文件动态读取)里的内容,统计单词的数量。

    FileSpout.java,监控文件夹,读取新文件内容

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    package com.test.stormtest.wordcount;
     
    import java.io.File;
    import java.io.IOException;
    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
     
    import org.apache.commons.io.FileUtils;
     
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
     
    public class FileSpout extends BaseRichSpout {
     
        private static final long serialVersionUID = 1L;
         
        private SpoutOutputCollector collector;
     
        private File target = new File("F:" + File.separator + "test");
        private Collection<File> cacheFiles = null;
     
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
             
            //启动的时候,将文件夹内的所有文件的内容发射出去
            cacheFiles = FileUtils.listFiles(target, nulltrue);
            for (File file : cacheFiles) {
                emitFileConent(file);
            }
        }
     
        public void nextTuple() {
            try {
                Thread.sleep(5000);
            catch (InterruptedException e1) {
                e1.printStackTrace();
            }
             
            //监控新文件,将新文件的内容发射出去
            Collection<File> files = FileUtils.listFiles(target, nulltrue);
            for (File file : files) {
                if(!cacheFiles.contains(file)) {
                    emitFileConent(file);
                }
            }
             
            cacheFiles = files;
        }
     
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
         
        //将文件内容按行发射出去
        private void emitFileConent(File file) {
            try {
                List<String> lines = FileUtils.readLines(file);
                for (String line : lines) {
                    this.collector.emit(new Values(line));
                }
            catch (IOException e) {
                e.printStackTrace();
            }
        }
     
    }

    SplitBolt.java,将行拆分成单词

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    package com.test.stormtest.wordcount;
     
    import java.util.Map;
     
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
     
    public class SplitBolt extends BaseRichBolt {
        private static final long serialVersionUID = 1L;
         
        private OutputCollector collector = null;
         
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
     
        public void execute(Tuple input) {
            String line = input.getStringByField("line");
            String[] words = line.split(" ");
            for (String word : words) {
                this.collector.emit(new Values(word));
            }
        }
     
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
     
    }

    SumBolt.java 统计单词数量

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    package com.test.stormtest.wordcount;
     
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.Set;
     
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;
     
    public class SumBolt extends BaseRichBolt{
        private static final long serialVersionUID = 1L;
     
        private Map<String, Long> countMap = null;
         
        public void prepare(Map stormConf, TopologyContext context,
                OutputCollector collector) {
            countMap = new HashMap<String, Long>();
        }
     
        public void execute(Tuple input) {
            String word = input.getStringByField("word");
            Long count = countMap.get(word);
            if(count == null) {
                count = 0L;
            }
            countMap.put(word, ++count);
             
            System.out.println("-----------------------------------------------");
            Set<Entry<String, Long>> entries = countMap.entrySet();
            for (Entry<String, Long> entry : entries) {
                System.out.println(entry.getKey() + ": " + entry.getValue());
            }
        }
     
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }

    WordCountTopology.java 驱动类,本地模式提交topology

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    package com.test.stormtest.wordcount;
     
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;
    import backtype.storm.utils.Utils;
     
    public class WordCountTopology {
     
        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();
             
            builder.setSpout("filespout"new FileSpout());
            builder.setBolt("splitbolt"new SplitBolt()).shuffleGrouping("filespout");
            builder.setBolt("sumtblot"new SumBolt()).fieldsGrouping("splitbolt"newFields("word"));
             
            LocalCluster cluster = new LocalCluster();
            Config config = new Config();
            config.setDebug(true);
            cluster.submitTopology("wordcount", config, builder.createTopology());
             
            Utils.sleep(20000);
            cluster.killTopology("wordcount");
            cluster.shutdown();
        }
    }
  • 相关阅读:
    android之sharedpreference的两种使用方法
    andoird软件开发之一个记录账号密码的APP--bmob后台
    Android群英传笔记系列三 view的自定义:实现一个模拟下载
    android之文件操作——读取assets和raw文件下的内容
    fragment与viewPaper的使用
    Android之静态和动态加载Fragment
    Swift可选类型
    swift中的类型转化
    SDAutoLayerOut
    github 使用教程
  • 原文地址:https://www.cnblogs.com/lishouguang/p/4559206.html
Copyright © 2011-2022 走看看