zoukankan      html  css  js  c++  java
  • 基于Maven构建开发第一个Storm项目

      前面说过了Storm的测试项目,那么此时我们更想自己写一个小项目来练练手,首先我们自己的Windows系统上首先应该安装好maven,然后启动Eclipse for JavaEE版本,接下来开始建立项目并开发

      注意,在开发过程中,无论是Windows还是Linux都要完全关闭防火墙,避免网络的问题

      单击"File"->"New"->"Maven Project"

      

      接下来的界面默认即可,单击Next

      

      下一步,继续单击Next即可

      

      然后,在Group Id输入:org.apache.storm 在Artifact Id输入:firststorm 这里可以自己定义,在Version中输入版本号:0.9.6,这里其实默认0.1.0没有问题,这个和storm的版本号没有任何关系,这里是我们项目的版本号,因为只是测试,输入0.9.6是为了更简单;Package包名会自动根据输入生成,我们默认即可,然后单击Finish,稍等右下角滚动条滚动完毕,一个基本的Maven项目就建立成功了,具体结构和上一个测试案例相同,这时在包org.apache.storm.firststorm下有一个默认的类App.java,由Maven自动生成,这个可以忽略,也可以删除

      

      然后打开项目根目录下的pom.xml文件,这个就是构建项目的配置文件,我们在dependencies标签之间,添加一个节点,代码如下:

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.6</version>
        <scope>provided</scope>
    </dependency>

      加入位置如下图所示,其他的不用动即可

      

      最终pom.xml的代码如下:

     1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     2   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     3   <modelVersion>4.0.0</modelVersion>
     4 
     5   <groupId>org.apache.storm</groupId>
     6   <artifactId>firststorm</artifactId>
     7   <version>0.9.6</version>
     8   <packaging>jar</packaging>
     9   
    10 
    11   <name>firststorm</name>
    12   <url>http://maven.apache.org</url>
    13 
    14   <properties>
    15     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    16   </properties>
    17 
    18   <dependencies>
    19     <dependency>
    20       <groupId>junit</groupId>
    21       <artifactId>junit</artifactId>
    22       <version>3.8.1</version>
    23       <scope>test</scope>
    24     </dependency>
    25     
    26     <dependency>
    27       <groupId>org.apache.storm</groupId>
    28       <artifactId>storm-core</artifactId>
    29       <version>0.9.6</version>
    30       <scope>provided</scope>
    31     </dependency>
    32   </dependencies>
    33 </project>

      更简单的方法我们可以直接复制上一个案例中的pom.xml文件直接使用,现在我们保存pom.xml文件,保存的时候maven会自动下载相关依赖并放到Maven Dependencies下,这些jar包可以点击下拉查看,并且会自动添加到项目classpath中,作为编译使用,等jar包全部下载完毕,现在开始编写具体的计算逻辑了,在这个项目中我们把所有的类都建立在包org.apache.storm.firststorm下

      首先建立RandomSpout类作为数据源,并且继承于父类BaseRichSpout,确定后可以看到系统自动补全3个方法:nextTuple,open和declareOutputFields

      

      我们现在就需要重写这3个方法,open方法是数据源的初始化,nextTuple的作用是把Tuple发送至下游,declareOutputFields用来定义输出字段,下面我们手动分配一个数组,并且随机取里面的元素,代码如下:

     1 package org.apache.storm.firststorm;
     2 
     3 import java.util.Map;
     4 import java.util.Random;
     5 
     6 import backtype.storm.spout.SpoutOutputCollector;
     7 import backtype.storm.task.TopologyContext;
     8 import backtype.storm.topology.OutputFieldsDeclarer;
     9 import backtype.storm.topology.base.BaseRichSpout;
    10 import backtype.storm.tuple.Fields;
    11 import backtype.storm.tuple.Values;
    12 
    13 public class RandomSpout extends BaseRichSpout {
    14     
    15     private SpoutOutputCollector collector;
    16     private static String[] words = {"Hadoop","Storm","Apache","Linux","Nginx","Tomcat","Spark"};
    17     
    18 
    19     public void nextTuple() {
    20         String word = words[new Random().nextInt(words.length)];
    21         collector.emit(new Values(word));
    22 
    23     }
    24 
    25     public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector arg2) {
    26         this.collector = arg2;
    27     }
    28 
    29     public void declareOutputFields(OutputFieldsDeclarer arg0) {
    30         arg0.declare(new Fields("randomstring"));
    31     }
    32 
    33 }

      代码很简单,肯定可以看懂,然后新建一个类SenqueceBolt,继承于BaseBasicBolt类,并且重写方法execute和declareOutputFields,这个类就是用于执行具体的作业,准确的说是execute方法用来执行相关的计算,这里只是简单的输出,代码如下:

     1 package org.apache.storm.firststorm;
     2 
     3 import backtype.storm.topology.BasicOutputCollector;
     4 import backtype.storm.topology.OutputFieldsDeclarer;
     5 import backtype.storm.topology.base.BaseBasicBolt;
     6 import backtype.storm.tuple.Tuple;
     7 
     8 public class SenqueceBolt extends BaseBasicBolt {
     9 
    10     public void execute(Tuple arg0, BasicOutputCollector arg1) {
    11         String word = (String) arg0.getValue(0);
    12         String out = "Hello " + word + "!";
    13         System.out.println(out);
    14     }
    15 
    16     public void declareOutputFields(OutputFieldsDeclarer arg0) {
    17         
    18     }
    19 
    20 }

      最后建立一个类FirstStorm,这个类是主类,在main方法中定义Topology,并且综合设置Spout和Bolt,从而调用其中的方法,这里流式计算时间设置为30s,代码如下:

     1 package org.apache.storm.firststorm;
     2 
     3 import backtype.storm.Config;
     4 import backtype.storm.LocalCluster;
     5 import backtype.storm.StormSubmitter;
     6 import backtype.storm.generated.AlreadyAliveException;
     7 import backtype.storm.generated.InvalidTopologyException;
     8 import backtype.storm.topology.TopologyBuilder;
     9 import backtype.storm.utils.Utils;
    10 
    11 public class FirstStorm {
    12 
    13     public static void main(String[] args) {
    14         TopologyBuilder builder = new TopologyBuilder();
    15         builder.setSpout("spout", new RandomSpout());
    16         builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout");
    17         Config conf = new Config();
    18         conf.setDebug(false);
    19         if(args != null && args.length > 0) {
    20             conf.setNumWorkers(3);
    21             try {
    22                 StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    23             } catch (AlreadyAliveException e) {
    24                 // TODO Auto-generated catch block
    25                 e.printStackTrace();
    26             } catch (InvalidTopologyException e) {
    27                 // TODO Auto-generated catch block
    28                 e.printStackTrace();
    29             }
    30         } else {
    31             LocalCluster cluster = new LocalCluster();
    32             cluster.submitTopology("firststorm", conf, builder.createTopology());
    33             Utils.sleep(30000);
    34             cluster.killTopology("firststorm");
    35             cluster.shutdown();
    36         }
    37     }
    38 
    39 }

      到这里一个简单的storm项目就开发完毕了,然后可以用本地模式运行,跑起来之后某一时刻输出结果如下:

      

      接下来我们将这个项目放到Storm服务器集群中运行,这里不要把Storm的jar包加进来,因为运行的时候,Storm环境会自动加载并协调集群运行,方法有很多,可以使用插件上传,也可以使用本地Storm客户端配置一下numbus.host进行提交,也可以在服务器节点上执行,执行后nimbus会得到任务并分发给各个supervisor去执行,首先我们应该将项目打包,右击项目,选择Export

      

      然后导出类型选择Java下的JAR file,点击Next

      

      然后单击Brower确定输出位置和文件名或者直接在输入框输入jar包的名称,然后单击Finish完成打包

      

      打包之后我们可以在输出位置看见一个jar文件

      

      然后我们将这个文件上传到服务器,这里上传到了storm安装目录下,然后这个时候在主节点storm安装目录下执行: bin/storm nimbus & 在从节点目录下分别执行 bin/storm supervisor & 启动整个集群的storm服务,也可以执行 bin/storm ui & 启动UI管理界面更直观的看到执行结果,当然对于单机环境启动或者不启动storm服务都可以,这个时候,执行下面命令运行本次项目的程序:

    bin/storm jar firststorm.jar org.apache.storm.firststorm.FirstStorm

      这里就是调用了FirstStorm类中的main方法,如果程序中对参数进行了处理,后面还可以跟上参数,回车确认执行之后,系统会进行初始化集群的工作,几秒后任务开始执行,执行过程中某一时刻的滚动输出如下:

      

      到这里,第一个Storm入门项目的开发和测试运行都完毕了,更复杂的计算逻辑模式也基本相同,主要就是Maven项目中出现了更复杂的模块和调用,整个运行的流程其实都是差不多的,现在就算步入Storm流式计算的殿堂的大门了,接下来的精彩还需要慢慢体会

     

  • 相关阅读:
    jmeter跨平台执行时的文件路径问题
    jenkins配置
    jmeter--负载测试
    jmeter-脚本制作
    jmeter学习-性能指标、jmeter初识
    功能测试--其他
    功能测试--Fiddler
    功能测试--APP专项
    功能测试--基础(二)
    功能测试-基础(一)
  • 原文地址:https://www.cnblogs.com/freeweb/p/5242631.html
Copyright © 2011-2022 走看看