zoukankan      html  css  js  c++  java
  • Storm- 使用Storm实现词频汇总

    需求:读取指定目录的数据,并实现单词计数的功能

    实现方案

      Spout来读取指定目录的数据,作为后续Bolt处理的input

      使用一个Bolt把input 的数据切割分开,我们按照逗号进行分割

      使用一个Bolt来进行最终的单词次数统计操作并输出

    拓扑设计:DataSourceSpout ==> SplitBolt ==> CountBolt

    Storm编程注意:Topology、Spout、Bolt等命名不能重复;上到集群时需要注意,出现重复命名会报错。

    package com.imooc.bigdata;
    
    import org.apache.commons.io.FileUtils;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    import java.io.File;
    import java.io.IOException;
    import java.util.*;
    
    /**
     * 使用Storm完成词频统计功能
     */
    public class LocalWordCountStormTopology {
        public static class DataSourceSpout extends BaseRichSpout{
            private SpoutOutputCollector collector;
    
            @Override
            public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
                this.collector = collector;
            }
    
    
            /**
             * 业务逻辑
             * 1) 读取指定目录文件夹下的数据:E:isolinux
             * 2) 把每一行的数据发射出去
             */
            @Override
            public void nextTuple() {
    
                // 获取所有文件
                Collection<File> files = FileUtils.listFiles(new File("E:\iso\linux"), new String[]{"txt"}, true);
                for (File file: files){
                    try {
                        // 获取文件中的所有内容
                        List<String> lines = FileUtils.readLines(file);
    
                        // 获取文件中的每行的内容
                        for (String line: lines){
    
                            // 发射出去
                            this.collector.emit(new Values(line));
                        }
    
                        // TODO... 数据处理完成之后,改名,否则一直重复执行
                        FileUtils.moveFile(file, new File(file.getAbsolutePath()+System.currentTimeMillis()));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("line"));
    
            }
        }
    
    
        /**
         * Bolt that splits each incoming "line" on commas and emits one
         * "word" tuple per token.
         */
        public static class SplitBolt extends BaseRichBolt{
            private OutputCollector collector;

            @Override
            public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
                // Remember the collector for use in execute().
                this.collector = collector;
            }

            /**
             * Reads the "line" field of the tuple, splits it on commas and
             * emits every token downstream as a separate tuple.
             * @param input tuple produced by DataSourceSpout
             */
            @Override
            public void execute(Tuple input) {
                String sentence = input.getStringByField("line");
                for (String token : sentence.split(",")) {
                    this.collector.emit(new Values(token));
                }
            }

            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                // Single output field, consumed by the word-count bolt.
                declarer.declare(new Fields("word"));
            }
        }
    
        /**
         * Terminal bolt: keeps a running count per word and prints the whole
         * tally to stdout after every update. Emits nothing downstream.
         */
        public static class WordCountBlot extends BaseRichBolt{

            // Running word -> occurrence-count tally; lives for the lifetime of
            // the bolt instance. A plain HashMap is fine here because execute()
            // is invoked by a single executor thread.
            // (Moved to the top of the class — the original declared it between
            // two methods, which is easy to misread.)
            Map<String, Integer> map = new HashMap<String, Integer>();

            @Override
            public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
                // No collector retained: this bolt only prints, it never emits.
            }

            /**
             * Business logic:
             * 1) read the word from the tuple
             * 2) bump its running count
             * 3) print the complete tally
             * @param input tuple carrying a single "word" field
             */
            @Override
            public void execute(Tuple input) {

                // 1) + 2) fetch-and-increment in one statement instead of the
                // original get / null-check / increment / put sequence.
                String word = input.getStringByField("word");
                Integer count = map.get(word);
                map.put(word, count == null ? 1 : count + 1);

                // 3) dump the current tally
                System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~");
                for (Map.Entry<String, Integer> entry : map.entrySet()) {
                    System.out.println(entry);
                }
            }

            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                // Terminal bolt: declares no output fields.
            }
        }
    
        public static void main(String[] args) {

            // Wire the topology via TopologyBuilder:
            // DataSourceSpout -> SplitBolt -> WordCountBlot,
            // each hop using shuffle grouping.
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("DataSourceSpout", new DataSourceSpout());
            builder.setBolt("SplitBolt", new SplitBolt()).shuffleGrouping("DataSourceSpout");
            builder.setBolt("WordCountBlot", new WordCountBlot()).shuffleGrouping("SplitBolt");

            // Run the topology on an in-process local cluster (for development).
            new LocalCluster().submitTopology("LocalWordCountStormTopology", new Config(), builder.createTopology());

        }
    }
  • 相关阅读:
    nginx uwsgi django
    ubuntu config proxy
    jdbc调用sparksql
    jdbc调用sparksql on yarn
    JDK错误
    JDK错误
    docker错误
    docker错误
    Django网站直接使用supervisor部署
    Django网站直接使用supervisor部署
  • 原文地址:https://www.cnblogs.com/RzCong/p/9383141.html
Copyright © 2011-2022 走看看