zoukankan      html  css  js  c++  java
  • eclipse配置storm1.1.0开发环境并本地跑起来

      storm的开发环境搭建比hadoop(参见前文http://www.cnblogs.com/wuxun1997/p/6849878.html)简单,无需安装插件,只需新建一个java项目并配置好lib包引用即可。本地跑也无需先启动storm,直接Run As->Java Application完事。下面细看:

      1、新建项目:在eclipse中点File->New->选Project->Java Project->next,输入自己想要的项目名,我这里写storm,点Finish;

      2、引入jar包:右击storm项目src目录->Build Path,选Config Build Path->Libraries->Add Library,选User Library,点next,点击User Libraries->点New,输入引用lib包名,这里写storm->点Add External JARs,选storm安装目录lib包所在路径:D:apache-storm-1.1.0lib,为了使用中文分词还要引用到IKAnalyzer2012_FF.jar,该包下载地址同样参见上面链接->一路确定后就可以开始写代码了;

      3、代码结构如下:

    src

     |---com.wulinfeng.storm.wordsplit.WordSplit.java

     |---IKAnalyzer.cfg.xml

     |---myext.dic

     |---mystopword.dic

      除了WordSplit.java要新写,其他3个文件无需修改,内容参见上面链接。

    package com.wulinfeng.storm.wordsplit;
    
    import java.io.BufferedReader;
    import java.io.BufferedWriter;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.OutputStreamWriter;
    import java.io.StringReader;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import org.wltea.analyzer.core.IKSegmenter;
    import org.wltea.analyzer.core.Lexeme;
    
    public class WordSplit {
    
        /**
         * 发射数据源
         * 
         * @author Administrator
         *
         */
        public static class WordReaderSpout extends BaseRichSpout {
    
            SpoutOutputCollector _collector;
            InputStreamReader isr;
            boolean isEnd = false;
    
            @Override
            public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
                String inputFile = "D:/input/people.txt";
                try {
                    isr = new InputStreamReader(new FileInputStream(inputFile));
                } catch (FileNotFoundException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                _collector = collector;
            }
    
            @Override
            public void nextTuple() {
                // 读取文件一次就无需再读了
                if (isEnd) {
                    System.out.println("*******Spout is over, no neccessary to emit.*********");
                    return;
                }
    
                // 读本地文件,一行发射一次
                String line = null;
                try (BufferedReader br = new BufferedReader(isr)) {
                    while ((line = br.readLine()) != null) {
                        System.out.printf("line : %s", line);
                        _collector.emit(new Values(line));
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    isEnd = true; // 文件读完了
                }
    
            }
    
            @Override
            public void ack(Object id) {
            }
    
            @Override
            public void fail(Object id) {
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word"));
            }
    
        }
    
        /**
         * 处理上面发射过来的数据源
         * 
         * @author Administrator
         *
         */
        public static class SplitWordBolt extends BaseRichBolt {
    
            private OutputCollector outputCollector;
    
            @Override
            public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
                this.outputCollector = collector;
            }
    
            @Override
            public void execute(Tuple tuple) {
                String sentence = tuple.getString(0); // 一次处理一行
                IKSegmenter ikSeg = new IKSegmenter(new StringReader(sentence), true); // 智能分词
                try {
                    for (Lexeme lexeme = ikSeg.next(); lexeme != null; lexeme = ikSeg.next()) {
                        outputCollector.emit(new Values(lexeme.getLexemeText()));
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word"));
            }
    
        }
    
        /**
         * 统计从上面取到的分词,关键人名统计后的放到result.txt
         * 
         * @author Administrator
         *
         */
        public static class WordCountBolt extends BaseBasicBolt {
            Map<String, Integer> counts = new HashMap<String, Integer>();
            String out;
            Set<String> keyName = new HashSet<>();
    
            @Override
            public void prepare(Map stormConf, TopologyContext context) {
                out = "D:/out/result.txt";
    
                // 判断result文件是否已存在,是则先删掉,以待新建
                File outFile = new File(out);
                if (outFile.exists()) {
                    outFile.delete();
                }
    
                // 读字典文件并放入一个set,以备参照set里的人名读取统计结果,写入result.txt文件
                try (BufferedReader br = new BufferedReader(
                        new InputStreamReader(WordSplit.class.getClassLoader().getResourceAsStream("myext.dic")))) {
                    String peopleName = null;
                    while ((peopleName = br.readLine()) != null) {
                        keyName.add(peopleName);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
    
            }
    
            @Override
            public void execute(Tuple tuple, BasicOutputCollector collector) {
                String word = tuple.getString(0); // 每次统计一个分词
                Integer count = counts.get(word);
                if (count == null)
                    count = 0;
                count++;
                counts.put(word, count);
                collector.emit(new Values(word, count));
            }
    
            @Override
            public void cleanup() {
                // 最后时刻,输出关键人名的统计结果到result.txt文件
                try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(out)))) {
                    for (Map.Entry<String, Integer> keyWord : counts.entrySet()) {
                        if (keyName.contains(keyWord.getKey())) {
                            bw.write(keyWord.getKey() + " : " + keyWord.getValue() + "
    ");
                            bw.flush();
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word", "count"));
            }
        }
    
        /**
         * 输出分词结果到本地文件,过程数据放在tmp文件
         * 
         * @author Administrator
         *
         */
        public static class SaveOutput extends BaseRichBolt {
            String temp;
    
            @Override
            public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
                temp = "D:/out/tmp" + System.currentTimeMillis();
    
                // 判断tmp文件是否已存在,是则先删掉,以待新建
                File tempFile = new File(temp);
                if (tempFile.exists()) {
                    tempFile.delete();
                }
            }
    
            @Override
            public void execute(Tuple input) {
                // 从上面获取分词的累计次数
                String name = input.getStringByField("word");
                Integer counts = input.getIntegerByField("count");
    
                // 输出分词统计过程追加到tmp文件
                try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(temp, true)))) {
                    bw.write(name + " : " + counts + "
    ");
                    bw.flush();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                // TODO Auto-generated method stub
    
            }
        }
    
        public static void main(String[] args) throws Exception {
    
            TopologyBuilder builder = new TopologyBuilder(); // 新建一个拓扑
    
            builder.setSpout("spout", new WordReaderSpout(), 1); // 设置数据源
    
            // 读取spout里的数据,进行split处理
            builder.setBolt("split", new SplitWordBolt(), 10).shuffleGrouping("spout");
    
            // 读取split后的数据,进行count处理
            builder.setBolt("count", new WordCountBolt(), 10).fieldsGrouping("split", new Fields("word"));
    
            // 保存计算结果
            builder.setBolt("save", new SaveOutput(), 10).allGrouping("count");
    
            Config conf = new Config();
            conf.setDebug(true);
    
            conf.setMaxTaskParallelism(1);
    
            // 有参数则到集群跑,没有则在本地跑
            if (args != null && args.length > 0) {
                conf.setNumWorkers(3);
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("word-split", conf, builder.createTopology());
                Thread.sleep(300000); // 5分钟后自动结束
                cluster.shutdown();
            }
        }
    
    }

      上面的java文件直接右键选择Run As->Java Application就可以跑起来了,因为是流的形式,所以会跑得慢一些,这里设置5分钟自动结束。跑的时候可以看到D:out mpXXX.txt不断在刷数据,跑结束后可以去D:out esult.txt看那几个猪脚的出境率。跑集群的话需要先起zookeeper和storm,把上面代码和引用的lib包打个jar,到命令行里去执行storm jar,运行情况可以去localhost:8088上看。

  • 相关阅读:
    阿里云云效技术专家分享:云原生开发、调测及可靠发布解决方案
    对话李飞飞,揭秘国际体育赛事风“云”背后的黑科技
    时序数据库永远的难关 — 时间线膨胀(高基数 Cardinality)问题的解决方案
    当Java遇上机密计算,又一段奇幻之旅开始了!
    内核热补丁,真的安全么?
    在 Dubbo3.0 上服务治理的实践
    CCF-201509-3-生成模板系统
    WPF CommandParameter的使用
    UWP App Data存储和获取
    在WPF中的ItemsControl中使用事件和命令(Using events and Commands within ItemsControl in WPF)
  • 原文地址:https://www.cnblogs.com/wuxun1997/p/6884378.html
Copyright © 2011-2022 走看看