zoukankan      html  css  js  c++  java
  • storm 001

    Hadoop、Storm系统和组件接口对比表:

    package storm;

    import org.apache.storm.Config;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.topology.TopologyBuilder;

    /**
     * Wires the individual processing components into one complete pipeline, a so-called
     * topology (comparable to a job in a MapReduce program), and submits it to the Storm cluster.
     *
     * <p>Per the original author's note, a submitted topology keeps running until it is stopped
     * manually or exits abnormally.
     */
    public class TopoMain { // fully-qualified name: storm.TopoMain

        public static void main(String[] args) throws Exception {
            StormTopology topology = buildTopology();
            Config clusterConf = buildConfig();

            // Note from the original author: topologies must be submitted through the 'storm'
            // client script so that StormSubmitter knows which jar to upload. A disabled
            // alternative was:
            //   System.setProperty("storm.jar", "/home/hadoop/storm.jar");
            StormSubmitter.submitTopology("demotopo2", clusterConf, topology);
        }

        /** Declares the spout and both bolts, links them together and builds the topology. */
        private static StormTopology buildTopology() {
            TopologyBuilder topologyBuilder = new TopologyBuilder();

            // Spout: a parallelism hint of 4 asks for 4 executors; setNumTasks(8) asks for
            // 8 tasks, so each executor ends up running 2 tasks.
            topologyBuilder
                    .setSpout("randomspout", new RandomWordSpout(), 4)
                    .setNumTasks(8);

            // Upper-casing bolt. shuffleGrouping("randomspout") means two things:
            //   1. the tuples received by "upperbolt" come from the "randomspout" component;
            //   2. tuples are distributed among the parallel tasks using shuffle (random) grouping.
            topologyBuilder
                    .setBolt("upperbolt", new UpperBolt(), 4)
                    .shuffleGrouping("randomspout");

            // Suffix-appending bolt, fed by the output of "upperbolt".
            topologyBuilder
                    .setBolt("suffixbolt", new SuffixBolt(), 4)
                    .shuffleGrouping("upperbolt");

            return topologyBuilder.createTopology();
        }

        /** Creates the runtime settings the topology is submitted with. */
        private static Config buildConfig() {
            Config clusterConf = new Config();
            // Number of slots, i.e. workers, used by the whole topology.
            clusterConf.setNumWorkers(4);
            clusterConf.setDebug(true);
            clusterConf.setNumAckers(0);
            return clusterConf;
        }
    }

    -----------------------------------------------------------------------------------------------------------------

    package storm;

    import java.util.Map;
    import java.util.Random;

    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    /**
     * Spout that emits words picked at random from a small, fixed set of sample data.
     *
     * <p>Every call to {@link #nextTuple()} emits exactly one single-field tuple; the field is
     * declared under the name {@code "orignname"}.
     */
    public class RandomWordSpout extends BaseRichSpout {

        private SpoutOutputCollector collector;

        // Created once in open() and reused for every tuple, instead of allocating a new
        // generator on each nextTuple() call.
        private Random random;

        // Sample data to choose from.
        String[] str = { "hello", "word", "you", "how", "are" };

        /**
         * Initialisation hook; stores the collector and creates the random generator.
         *
         * @param conf      topology configuration (unused)
         * @param context   topology context (unused)
         * @param collector collector used to emit tuples from {@link #nextTuple()}
         */
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            this.random = new Random();
        }

        /** Declares the single output field of the tuples emitted by this spout. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("orignname"));
        }

        /** Picks one sample word at random and emits it as a tuple to the next component. */
        @Override
        public void nextTuple() {
            String name = str[random.nextInt(str.length)];
            collector.emit(new Values(name));
        }

    }

    ************************************************

    package storm;

    import java.io.FileWriter;
    import java.io.IOException;
    import java.util.Map;
    import java.util.UUID;

    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Tuple;

    /**
     * Terminal bolt that appends the suffix {@code "_itisok"} to every incoming name and writes
     * the result, followed by a space, to a file under {@code /home/hadoop/stormoutput2/}.
     *
     * <p>The file name is a random UUID generated in {@link #prepare}. The bolt emits no tuples.
     */
    public class SuffixBolt extends BaseBasicBolt {
        // transient: FileWriter is not serializable; it is only created in prepare().
        transient FileWriter fileWriter = null;

        /**
         * Opens the output file.
         *
         * @param stormConf topology configuration (unused)
         * @param context   topology context (unused)
         * @throws RuntimeException wrapping the {@link IOException} if the file cannot be opened
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            try {
                fileWriter = new FileWriter("/home/hadoop/stormoutput2/" + UUID.randomUUID());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Core processing logic, invoked once per received tuple.
         *
         * @param tuple     incoming tuple; field 0 is read as the name
         * @param collector output collector (unused, nothing is emitted)
         * @throws RuntimeException wrapping the {@link IOException} if writing fails
         */
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            // Take the name sent by the previous component and append the suffix.
            String upper_name = tuple.getString(0);
            String suffix_name = upper_name + "_itisok";

            // Write the suffixed name to the output file and flush immediately.
            try {
                fileWriter.write(suffix_name);
                fileWriter.write(" ");
                fileWriter.flush();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Closes the output file so the file handle is not leaked when the bolt is shut down.
         *
         * <p>NOTE(review): Storm may not invoke this hook on every shutdown path (e.g. when a
         * worker process is killed), which is why {@link #execute} still flushes after each write.
         *
         * @throws RuntimeException wrapping the {@link IOException} if closing fails
         */
        @Override
        public void cleanup() {
            if (fileWriter == null) {
                return; // prepare() never ran or failed; nothing to close
            }
            try {
                fileWriter.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            } finally {
                fileWriter = null;
            }
        }

        /** Declares no output fields because this bolt does not emit tuples. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Intentionally empty.
        }
    }

    ***********************************************

    package storm;

    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    /**
     * Bolt that upper-cases the first field of every incoming tuple and emits the result as a
     * single-field tuple declared under the name {@code "uppername"}.
     */
    public class UpperBolt extends BaseBasicBolt {

        /**
         * Business logic: read the incoming name, convert it to upper case and forward it.
         *
         * @param tuple     incoming tuple; field 0 is read as the name
         * @param collector collector used to emit the upper-cased name
         */
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            // The data handed over by the previous component lives in field 0 of the tuple.
            final String upperCased = tuple.getString(0).toUpperCase();
            collector.emit(new Values(upperCased));
        }

        /** Declares the single output field of the tuples emitted by this bolt. */
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            final Fields outputFields = new Fields("uppername");
            declarer.declare(outputFields);
        }
    }



    [root@alamps bin]# mkdir /home/hadoop/stormoutput2
    [root@alamps bin]# storm jar /home/hadoop/storm2.jar storm.TopoMain
    Running: /home/hadoop/app/jdk1.8.0_144/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/home/hadoop/app/storm -Dstorm.log.dir=/home/hadoop/app/storm/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /home/hadoop/app/storm/lib/storm-rename-hack-1.1.1.jar:/home/hadoop/app/storm/lib/log4j-core-2.8.2.jar:/home/hadoop/app/storm/lib/reflectasm-1.10.1.jar:/home/hadoop/app/storm/lib/log4j-over-slf4j-1.6.6.jar:/home/hadoop/app/storm/lib/minlog-1.3.0.jar:/home/hadoop/app/storm/lib/kryo-3.0.3.jar:/home/hadoop/app/storm/lib/objenesis-2.1.jar:/home/hadoop/app/storm/lib/disruptor-3.3.2.jar:/home/hadoop/app/storm/lib/storm-core-1.1.1.jar:/home/hadoop/app/storm/lib/servlet-api-2.5.jar:/home/hadoop/app/storm/lib/asm-5.0.3.jar:/home/hadoop/app/storm/lib/log4j-slf4j-impl-2.8.2.jar:/home/hadoop/app/storm/lib/log4j-api-2.8.2.jar:/home/hadoop/app/storm/lib/ring-cors-0.1.5.jar:/home/hadoop/app/storm/lib/slf4j-api-1.7.21.jar:/home/hadoop/app/storm/lib/clojure-1.7.0.jar:/home/hadoop/storm2.jar:/home/hadoop/app/storm/conf:/home/hadoop/app/storm/bin -Dstorm.jar=/home/hadoop/storm2.jar -Dstorm.dependency.jars= -Dstorm.dependency.artifacts={} storm.TopoMain
    17406 [main] WARN o.a.s.u.Utils - STORM-VERSION new 1.1.1 old null
    18444 [main] INFO o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -8576047804864739509:-7581013764627825431
    21549 [main] INFO o.a.s.u.NimbusClient - Found leader nimbus : alamps:6627
    22276 [main] INFO o.a.s.s.a.AuthUtils - Got AutoCreds []
    22320 [main] INFO o.a.s.u.NimbusClient - Found leader nimbus : alamps:6627
    22817 [main] INFO o.a.s.StormSubmitter - Uploading dependencies - jars...
    22845 [main] INFO o.a.s.StormSubmitter - Uploading dependencies - artifacts...
    22845 [main] INFO o.a.s.StormSubmitter - Dependency Blob keys - jars : [] / artifacts : []
    23487 [main] INFO o.a.s.StormSubmitter - Uploading topology jar /home/hadoop/storm2.jar to assigned location: /home/hadoop/app/storm/storm-local/nimbus/inbox/stormjar-408de160-c6d7-4d2c-895a-1416d94ea4a2.jar
    24086 [main] INFO o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /home/hadoop/app/storm/storm-local/nimbus/inbox/stormjar-408de160-c6d7-4d2c-895a-1416d94ea4a2.jar
    24086 [main] INFO o.a.s.StormSubmitter - Submitting topology demotopo2 in distributed mode with conf {"topology.acker.executors":0,"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-8576047804864739509:-7581013764627825431","topology.workers":4,"topology.debug":true}
    24087 [main] WARN o.a.s.u.Utils - STORM-VERSION new 1.1.1 old 1.1.1
    27476 [main] INFO o.a.s.StormSubmitter - Finished submitting topology: demotopo2
  • 相关阅读:
    利用服务器部署Web项目
    JavaWeb解决文件上传后项目需要刷新问题 以及 图片过大 文件上传时间延长
    JavaWeb接入支付宝支付
    JavaWeb实现文件上传功能
    Eclipse Server中没有TomCat
    Eclipse 中没有Dynamic web Project 解决办法
    JavaWeb图形验证码
    elasticsearch常用语句
    关于elasticsearch的安装教程和使用方法
    解决el-table渲染卡顿组件
  • 原文地址:https://www.cnblogs.com/alamps/p/8311887.html
Copyright © 2011-2022 走看看