zoukankan      html  css  js  c++  java
  • Storm 简单案例

    Topology

    package com.zxf.strom;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;
    
    /**
     * Entry point of the Storm example: wires {@code MySpout} into {@code MyBolt}
     * and submits the resulting topology.
     *
     * <p>With no command-line arguments the topology is submitted to an in-process
     * {@link LocalCluster}; otherwise it is submitted through {@link StormSubmitter}.
     */
    public class MyTopology {

        /**
         * Builds the spout-to-bolt topology and submits it.
         *
         * @param args command-line arguments; only their presence is checked. An empty
         *             array selects the local cluster, anything else selects submission
         *             via {@link StormSubmitter}.
         */
        public static void main(String[] args) {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            topologyBuilder.setSpout("myspout", new MySpout(), 2);
            // The bolt reads from "myspout" using shuffle grouping.
            topologyBuilder.setBolt("mybolt", new MyBolt(), 2).shuffleGrouping("myspout");
            try {
                if (args.length == 0) {
                    // No arguments: run locally.
                    LocalCluster localCluster = new LocalCluster();
                    localCluster.submitTopology("submitTopology", new Config(), topologyBuilder.createTopology());
                } else {
                    Config config = new Config();
                    config.setNumWorkers(2);
                    StormSubmitter.submitTopology("wordcount1", config, topologyBuilder.createTopology());
                }
            } catch (Exception e) {
                // Printing e.getStackTrace() would only show the array's identity string
                // (e.g. "[Ljava.lang.StackTraceElement;@1b6d3586"); print the real trace instead.
                e.printStackTrace();
            }
        }
    }

    Spout

    package com.zxf.strom;
    
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    import java.util.Map;
    
    /**
     * Spout that emits an ever-increasing counter as a single-field tuple
     * whose field is named {@code "num"}.
     */
    public class MySpout extends BaseRichSpout {

        /** Collector handed over in {@link #open}; used by {@link #nextTuple()} to emit. */
        private SpoutOutputCollector collector;

        /** Last value emitted. Primitive so each increment does not unbox and re-box a Long. */
        private long num = 0L;

        /**
         * Initialization hook: remembers the collector and logs that the spout was opened.
         *
         * @param map                  configuration map; not used by this spout
         * @param topologyContext      topology context; not used by this spout
         * @param spoutOutputCollector collector that later emits go through
         */
        public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            collector = spoutOutputCollector;
            System.out.println("RichSpout open 初始化了 ");
        }

        /**
         * Increments the counter, emits it as a one-value tuple and logs the call.
         * The first emitted value is 1.
         */
        public void nextTuple() {
            num++;
            // The primitive is autoboxed to a Long when handed to Values,
            // so consumers can still read it with getLongByField("num").
            collector.emit(new Values(num));
            System.out.println("RichSpout nextTuple 被调用了 ");
        }

        /**
         * Declares the single output field {@code "num"}.
         *
         * @param outputFieldsDeclarer declarer that receives the field list
         */
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("num"));
        }
    }

    Bolt

    package com.zxf.strom;
    
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Tuple;
    
    import java.util.Map;
    
    
    /**
     * Bolt that keeps a running sum of the {@code "num"} values it receives
     * and prints the sum after every tuple.
     */
    public class MyBolt extends BaseBasicBolt {

        /** Running total of every "num" value seen by this bolt instance. */
        private long count = 0;

        /**
         * Initialization hook; only logs that the bolt was prepared.
         *
         * @param topoConf configuration map; not used by this bolt
         * @param context  topology context; not used by this bolt
         */
        public void prepare(Map<String, Object> topoConf, TopologyContext context) {
            System.out.println("BaseBasicBolt 被初始化了");
        }

        /**
         * Adds the tuple's {@code "num"} field to the running total and prints the total.
         *
         * @param tuple                incoming tuple; its "num" field is read as a Long
         * @param basicOutputCollector collector supplied by the caller; not used here
         */
        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            // Compound assignment unboxes the Long returned by getLongByField.
            count += tuple.getLongByField("num");
            System.out.println("BaseBasicBolt execute" + count);
        }

        /**
         * Declares no output fields; this bolt never emits through its collector.
         *
         * @param outputFieldsDeclarer declarer supplied by the caller; left untouched
         */
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            // Intentionally empty.
        }
    }
  • 相关阅读:
    HDOJ 1846 Brave Game
    并查集模板
    HDU 2102 A计划
    POJ 1426 Find The Multiple
    POJ 3278 Catch That Cow
    POJ 1321 棋盘问题
    CF 999 C.Alphabetic Removals
    CF 999 B. Reversing Encryption
    string的基础用法
    51nod 1267 4个数和为0
  • 原文地址:https://www.cnblogs.com/zxf330301/p/14034237.html
Copyright © 2011-2022 走看看