zoukankan      html  css  js  c++  java
  • Storm 简单案例

    Topology

    package com.zxf.strom;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;
    
    /**
     * Entry point that wires a two-component Storm topology: {@code MySpout} feeding
     * {@code MyBolt}.
     *
     * <p>With no command-line arguments the topology is submitted to an in-process
     * {@link LocalCluster}; with at least one argument it is submitted through
     * {@link StormSubmitter}.
     */
    public class MyTopology {

        /**
         * Builds the topology and submits it.
         *
         * @param args empty to run locally; non-empty to submit via {@code StormSubmitter}
         */
        public static void main(String[] args) {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            // Both components are registered with a parallelism hint of 2; the bolt
            // subscribes to the spout's stream using shuffle grouping.
            topologyBuilder.setSpout("myspout", new MySpout(), 2);
            topologyBuilder.setBolt("mybolt", new MyBolt(), 2).shuffleGrouping("myspout");
            try {
                if (args.length == 0) {
                    // Local run: submit to an in-process cluster.
                    // NOTE(review): the local cluster is never shut down here, presumably so
                    // the demo keeps running until the JVM is killed -- confirm that is intended.
                    LocalCluster localCluster = new LocalCluster();
                    localCluster.submitTopology("submitTopology", new Config(), topologyBuilder.createTopology());
                } else {
                    Config config = new Config();
                    config.setNumWorkers(2);
                    StormSubmitter.submitTopology("wordcount1", config, topologyBuilder.createTopology());
                }
            } catch (Exception e) {
                // Print the full trace. Passing the array returned by getStackTrace() to
                // println would only show the array's identity string, hiding the failure.
                e.printStackTrace();
            }
        }
    }

    Spout

    package com.zxf.strom;
    
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    import java.util.Map;
    
    /**
     * Spout that emits an increasing counter, one value per {@link #nextTuple()} call,
     * on a single output field named {@code "num"}.
     */
    public class MySpout extends BaseRichSpout {

        private SpoutOutputCollector collector;

        // Primitive counter: avoids unboxing and re-boxing a Long on every increment.
        private long num = 0L;

        /**
         * Storm initialization hook for the spout; keeps the collector for later emits.
         *
         * @param map topology configuration (unused here)
         * @param topologyContext topology context (unused here)
         * @param spoutOutputCollector collector used by {@link #nextTuple()} to emit tuples
         */
        public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            collector = spoutOutputCollector;
            System.out.println("RichSpout open 初始化了 ");
        }

        /**
         * Increments the counter and emits it as a one-field tuple, so the first
         * emitted value is 1.
         */
        public void nextTuple() {
            num++;
            // The long is boxed to Long by Values' varargs, matching the bolt's getLongByField read.
            collector.emit(new Values(num));
            System.out.println("RichSpout nextTuple 被调用了 ");
        }

        /**
         * Declares the single output field {@code "num"}.
         *
         * @param outputFieldsDeclarer declarer that receives the output schema
         */
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("num"));
        }
    }

    Bolt

    package com.zxf.strom;
    
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseBasicBolt;
    import org.apache.storm.tuple.Tuple;
    
    import java.util.Map;
    
    
    /**
     * Terminal bolt that keeps a running sum of the {@code "num"} field of every tuple
     * it receives and prints the sum after each tuple. It declares no output fields.
     */
    public class MyBolt extends BaseBasicBolt {

        /** Running total of all {@code "num"} values seen by this bolt instance. */
        private long count = 0;

        /**
         * Intentionally empty: this bolt declares no output fields.
         *
         * @param outputFieldsDeclarer unused
         */
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        }

        /**
         * Initialization hook; only prints a message.
         *
         * @param topoConf topology configuration (unused here)
         * @param context topology context (unused here)
         */
        public void prepare(Map<String, Object> topoConf, TopologyContext context) {
            System.out.println("BaseBasicBolt 被初始化了");
        }

        /**
         * Adds the tuple's {@code "num"} value to the running total and prints the total.
         *
         * @param tuple incoming tuple; its {@code "num"} field is read as a Long
         * @param basicOutputCollector unused, nothing is emitted
         */
        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            this.count += tuple.getLongByField("num");
            System.out.println("BaseBasicBolt execute" + this.count);
        }
    }
  • 相关阅读:
    232. Implement Queue using Stacks
    231. Power of Two
    n&(n-1)位运算的妙用
    230. Kth Smallest Element in a BST
    关于UNIX的exec函数
    Go语言的各种Print函数
    Go语言的接口interface、struct和组合、继承
    Go语言知识点笔记
    Ubuntu自定义终端窗口位置
    Python的类变量和成员变量、类静态方法和类成员方法
  • 原文地址:https://www.cnblogs.com/zxf330301/p/14034237.html
Copyright © 2011-2022 走看看