zoukankan      html  css  js  c++  java
  • 初识Storm之HelloWorld程序源码

    1. 新建一个Maven项目,pom.xml代码如下:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.yg</groupId>
        <artifactId>storm</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <name>storm</name>
        <url>http://maven.apache.org</url>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
    
        <dependencies>
    
            <dependency>
                <groupId>org.apache.storm</groupId>
                <artifactId>storm-core</artifactId>
                <version>1.1.3</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>3.8.1</version>
                <scope>test</scope>
            </dependency>
            
        </dependencies>
    
    
        <build>
            <plugins>
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                        <archive>
                            <manifest>
                                <mainClass>com.path.to.main.Class</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                </plugin>
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
    
            </plugins>
        </build>
    
    </project>
    View Code

    2.新建HelloWorldSpout.java,代码如下:

    package com.yg.storm.spouts;
    
    import java.util.Map;
    import java.util.Random;
    
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    public class HelloWorldSpout extends BaseRichSpout{
    
        /**
         * 功能:随机生成字符串
         * 实现:先产生一个1-10随机整数,再不断产生一个1-10随机整数,若两者
         * 相等,则发射hello world,否则发送其他字符串
         */
        private static final long serialVersionUID = -5698117627723074157L;
        private static final int MAX_RANDOM = 10;
        private int referenceRandom;
        private SpoutOutputCollector collector;
        
        //构造函数
        public HelloWorldSpout(){
            //产生第一个随机数
            final Random rand  = new Random();
            referenceRandom = rand.nextInt(MAX_RANDOM);
        }
    
        //在spout加载时,打开一些资源(只在spout加载的时候执行一次)
        @Override
        public void open(Map conf, 
                TopologyContext context, 
                SpoutOutputCollector collector) {
            this.collector = collector;
            
        }
    
        //核心方法,storm会不断调用该方法,也就是方法执行完后会马上重置并再次执行
        @Override
        public void nextTuple() {
            
            Utils.sleep(1000);//停滞一秒
            final Random rand  = new Random();
            int instanceRandom = rand.nextInt(MAX_RANDOM);
            if (referenceRandom == instanceRandom){
                collector.emit(new Values("Hello World"));//有顺序的
            } else {
                collector.emit(new Values("Other Random Word"));
            }
        }
    
        //声明Tuple的字段名,有顺序的
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
            
        }
    
    }
    View Code

    3.新建HelloWorldBolt.java,代码如下:

    package com.yg.storm.bolts;
    
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;
    
    public class HelloWorldBolt extends BaseRichBolt{
    
        /**
         * 功能:就收到spout发送的数据,打印并统计hello world的数量
         * 实现:打印,创建计数变量用于统计hello world
         */
        private static final long serialVersionUID = -5061906223048521415L;
        private int myCount = 0;//计数变量,不能在execute函数中初始化
        private TopologyContext context;//上下文变量
        private OutputCollector collector;
    
        //相当于spout中的open
        @Override
        public void prepare(Map stormConf, 
                TopologyContext context, 
                OutputCollector collector) {
            this.context = context;
            this.collector = collector;
        }
    
        //相当于spout中的nextTuple
        @Override
        public void execute(Tuple input) {        
            //拿到数据,用字段名取出
            String text = input.getStringByField("sentence");
            System.out.println("One tuple gets in: " + context.getThisTaskId() + text);
            if ("Hello World".equals(text)){
                myCount++;
                System.out.println("Found a Hello World! My count is now:" + myCount);            
            }
            collector.ack(input);//处理完成要通知Storm
    //        collector.fail(input);//处理失败要通知Storm    
            
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            
        }
    }
    View Code

    4.新建HelloWorldTopolog.java,代码如下:

    package com.yg.storm.topologies;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.utils.Utils;
    
    import com.yg.storm.bolts.HelloWorldBolt;
    import com.yg.storm.spouts.HelloWorldSpout;
    
    public class HelloWorldTopology {
        
        //可以向main传递一个参数作为集群模式下的Topology的名字,若没有传入参数则使用本地模式
        public static void main(String[] args) {
            
            TopologyBuilder builder = new TopologyBuilder();
            
            builder.setSpout("hlSpout", new HelloWorldSpout());
            builder.setBolt("hlBolt", new HelloWorldBolt())
            .shuffleGrouping("hlSpout");
            
            Config conf = new Config();
            
            if (args != null && args.length > 0){
                //集群模式提交
                conf.setNumWorkers(3);
                
                try {
                    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
                } catch (AlreadyAliveException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } catch (InvalidTopologyException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } catch (AuthorizationException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                
            } else {
                //本地模式提交
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("test", conf, builder.createTopology());
                Utils.sleep(1000*60);
                cluster.killTopology("test");
                cluster.shutdown();
                
            }
        }
    
    }
    View Code

     直接本地运行HelloWorldTopology类即可.

  • 相关阅读:
    20172315 2018-2019-1 《程序设计与数据结构》第九周学习总结
    20172315 2018-2019-1 《程序设计与数据结构》实验二报告
    20172315 2018-2019-1 《程序设计与数据结构》第八周学习总结
    20172315 2018-2019-2 《程序设计与数据结构》第七周学习总结
    20172315 2018-2019-1 《程序设计与数据结构》第六周学习总结
    20172315 2018-2019-1 《程序设计与数据结构》第五周学习总结
    20172315 2018-2019-1 《程序设计与数据结构》第四周学习总结
    20172310 2018-2019-1《程序设计与数据结构》(下)课程总结
    Do-Now—团队 冲刺博客六
    Do-Now—团队冲刺博客三
  • 原文地址:https://www.cnblogs.com/dreamboy/p/11392809.html
Copyright © 2011-2022 走看看