zoukankan      html  css  js  c++  java
  • storm单词计数 本地运行




    import java.io.File;
    import java.io.IOException;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;


    import org.apache.commons.io.FileUtils;


    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import cn.crxy.storm.LocalStormTopology.SumBolt;


    public class WordcountStormTopology {

    public static class DataSourceSpout extends BaseRichSpout{
    private Map conf;
    private TopologyContext context;
    private SpoutOutputCollector collector;

    /**
    * 在本实例执行的时候被调用一次
    */
    public void open(Map conf, TopologyContext context,
    SpoutOutputCollector collector) {
    this.conf = conf;
    this.context = context;
    this.collector = collector;
    }
    /**
    * 死循环调用 心跳
    */

    public void nextTuple() {
    //获取指定目录以下全部的文件
    Collection<File> files = FileUtils.listFiles(new File("D:\test"), new String[]{"txt"}, true);
    for (File file : files) {
    try {
    //解析每个文件的每一行
    List<String> readLines = FileUtils.readLines(file);

    for (String line : readLines) {
    //把每一行数据发送出去
    this.collector.emit(new Values(line));
    }

    //重命名  防止多次读
    FileUtils.moveFile(file, new File(file.getAbsolutePath()+System.currentTimeMillis()));
    } catch (IOException e) {

    e.printStackTrace();
    }
    }
    }
    /**
    * 声明字段名称
    */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    //fields就是field的列表
    declarer.declare(new Fields("line"));
    }
    }

    public static class SpiltBolt extends BaseRichBolt{

    private Map stormConf;
    private TopologyContext context;
    private OutputCollector collector;
    /**
    * 仅仅会被调用一次
    */
    public void prepare(Map stormConf, TopologyContext context,
    OutputCollector collector) {
    this.stormConf = stormConf;
    this.context = context;
    this.collector = collector;
    }
    /**
    * 死循环,循环的获取上一级发送过来的数据(spout/bolt)
    */
    public void execute(Tuple input) {
    //获取tuple发来数据
    String line = input.getStringByField("line");
    //对每一行数据进行分割
    String[] words = line.split(" ");
    for (String word : words) {
    //把分割的单词发送到下一个bolt
    this.collector.emit(new Values(word));
    }
    }


  • 相关阅读:
    远程安装WinXP OEM版系统的痛苦经历
    许可证服务因许可证不够出现占用CPU的故障
    AvayaP133G2和3Com 3300交换机间的Vlan连接
    从win2000升级到win2003后ISA2000缓存的问题
    大型局域网中用ISA隔离部分计算机
    ORACLE学习第二天
    ORACLE ROWID解析
    ORA32773问题解决
    ORACLE学习第三天
    ORACLE表空间迁移
  • 原文地址:https://www.cnblogs.com/zhchoutai/p/7055022.html
Copyright © 2011-2022 走看看