zoukankan      html  css  js  c++  java
  • storm单词计数 本地运行




    import java.io.File;
    import java.io.IOException;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;


    import org.apache.commons.io.FileUtils;


    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import cn.crxy.storm.LocalStormTopology.SumBolt;


    public class WordcountStormTopology {

    public static class DataSourceSpout extends BaseRichSpout{
    private Map conf;
    private TopologyContext context;
    private SpoutOutputCollector collector;

    /**
    * 在本实例执行的时候被调用一次
    */
    public void open(Map conf, TopologyContext context,
    SpoutOutputCollector collector) {
    this.conf = conf;
    this.context = context;
    this.collector = collector;
    }
    /**
    * 死循环调用 心跳
    */

    public void nextTuple() {
    //获取指定目录以下全部的文件
    Collection<File> files = FileUtils.listFiles(new File("D:\test"), new String[]{"txt"}, true);
    for (File file : files) {
    try {
    //解析每个文件的每一行
    List<String> readLines = FileUtils.readLines(file);

    for (String line : readLines) {
    //把每一行数据发送出去
    this.collector.emit(new Values(line));
    }

    //重命名  防止多次读
    FileUtils.moveFile(file, new File(file.getAbsolutePath()+System.currentTimeMillis()));
    } catch (IOException e) {

    e.printStackTrace();
    }
    }
    }
    /**
    * 声明字段名称
    */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    //fields就是field的列表
    declarer.declare(new Fields("line"));
    }
    }

    public static class SpiltBolt extends BaseRichBolt{

    private Map stormConf;
    private TopologyContext context;
    private OutputCollector collector;
    /**
    * 仅仅会被调用一次
    */
    public void prepare(Map stormConf, TopologyContext context,
    OutputCollector collector) {
    this.stormConf = stormConf;
    this.context = context;
    this.collector = collector;
    }
    /**
    * 死循环,循环的获取上一级发送过来的数据(spout/bolt)
    */
    public void execute(Tuple input) {
    //获取tuple发来数据
    String line = input.getStringByField("line");
    //对每一行数据进行分割
    String[] words = line.split(" ");
    for (String word : words) {
    //把分割的单词发送到下一个bolt
    this.collector.emit(new Values(word));
    }
    }


  • 相关阅读:
    C# WinForm界面上实现按条件检索数据
    DevExpress中XtraEditors.RadioGroup 控件如何保存获取选中的值及读取数据库中的值
    在QTP Test中利用vbs和cmd实现重新启动QTP
    VBS操作Excel的一点问题总结
    利用vbs维护qtp的虚拟对象的坐标
    Smoke Test和BVT Test的区别
    小结一下VS2012新开发环境的设置经历
    关闭EF4.x Code First的级联删除Cascade Delete
    关于Entity Framework 4.0/4.1数据验证的一点体会
    CentOS 6.3 Minimal yum 安装 PostgreSQL 9.2.3
  • 原文地址:https://www.cnblogs.com/zhchoutai/p/7055022.html
Copyright © 2011-2022 走看看