zoukankan      html  css  js  c++  java
  • 基于Maven构建开发第一个Storm项目

      前面说过了Storm的测试项目,那么此时我们更想自己写一个小项目来练练手,首先我们自己的Windows系统上首先应该安装好maven,然后启动Eclipse for JavaEE版本,接下来开始建立项目并开发

      注意,在开发过程中,无论是Windows还是Linux都要完全关闭防火墙,避免网络的问题

      单击"File"->"New"->"Maven Project"

      

      接下来的界面默认即可,单击Next

      

      下一步,继续单击Next即可

      

      然后,在Group Id输入:org.apache.storm 在Artifact Id输入:firststorm 这里可以自己定义,在Version中输入版本号:0.9.6,这里其实默认0.1.0没有问题,这个和storm的版本号没有任何关系,这里是我们项目的版本号,因为只是测试,输入0.9.6是为了更简单;Package包名会自动根据输入生成,我们默认即可,然后单击Finish,稍等右下角滚动条滚动完毕,一个基本的Maven项目就建立成功了,具体结构和上一个测试案例相同,这时在包org.apache.storm.firststorm下有一个默认的类App.java,由Maven自动生成,这个可以忽略,也可以删除

      

      然后打开项目根目录下的pom.xml文件,这个就是构建项目的配置文件,我们在dependencies标签之间,添加一个节点,代码如下:

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.6</version>
        <scope>provided</scope>
    </dependency>

      加入位置如下图所示,其他的不用动即可

      

      最终pom.xml的代码如下:

     1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     2   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     3   <modelVersion>4.0.0</modelVersion>
     4 
     5   <groupId>org.apache.storm</groupId>
     6   <artifactId>firststorm</artifactId>
     7   <version>0.9.6</version>
     8   <packaging>jar</packaging>
     9   
    10 
    11   <name>firststorm</name>
    12   <url>http://maven.apache.org</url>
    13 
    14   <properties>
    15     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    16   </properties>
    17 
    18   <dependencies>
    19     <dependency>
    20       <groupId>junit</groupId>
    21       <artifactId>junit</artifactId>
    22       <version>3.8.1</version>
    23       <scope>test</scope>
    24     </dependency>
    25     
    26     <dependency>
    27       <groupId>org.apache.storm</groupId>
    28       <artifactId>storm-core</artifactId>
    29       <version>0.9.6</version>
    30       <scope>provided</scope>
    31     </dependency>
    32   </dependencies>
    33 </project>

      更简单的方法我们可以直接复制上一个案例中的pom.xml文件直接使用,现在我们保存pom.xml文件,保存的时候maven会自动下载相关依赖并放到Maven Dependencies下,这些jar包可以点击下拉查看,并且会自动添加到项目classpath中,作为编译使用,等jar包全部下载完毕,现在开始编写具体的计算逻辑了,在这个项目中我们把所有的类都建立在包org.apache.storm.firststorm下

      首先建立RandomSpout类作为数据源,并且继承于父类BaseRichSpout,确定后可以看到系统自动补全3个方法:nextTuple,open和declareOutputFields

      

      我们现在就需要重写这3个方法,open方法是数据源的初始化,nextTuple的作用是把Tuple发送至下游,declareOutputFields用来定义输出字段,下面我们手动分配一个数组,并且随机取里面的元素,代码如下:

     1 package org.apache.storm.firststorm;
     2 
     3 import java.util.Map;
     4 import java.util.Random;
     5 
     6 import backtype.storm.spout.SpoutOutputCollector;
     7 import backtype.storm.task.TopologyContext;
     8 import backtype.storm.topology.OutputFieldsDeclarer;
     9 import backtype.storm.topology.base.BaseRichSpout;
    10 import backtype.storm.tuple.Fields;
    11 import backtype.storm.tuple.Values;
    12 
    13 public class RandomSpout extends BaseRichSpout {
    14     
    15     private SpoutOutputCollector collector;
    16     private static String[] words = {"Hadoop","Storm","Apache","Linux","Nginx","Tomcat","Spark"};
    17     
    18 
    19     public void nextTuple() {
    20         String word = words[new Random().nextInt(words.length)];
    21         collector.emit(new Values(word));
    22 
    23     }
    24 
    25     public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector arg2) {
    26         this.collector = arg2;
    27     }
    28 
    29     public void declareOutputFields(OutputFieldsDeclarer arg0) {
    30         arg0.declare(new Fields("randomstring"));
    31     }
    32 
    33 }

      代码很简单,肯定可以看懂,然后新建一个类SenqueceBolt,继承于BaseBasicBolt类,并且重写方法execute和declareOutputFields,这个类就是用于执行具体的作业,准确的说是execute方法用来执行相关的计算,这里只是简单的输出,代码如下:

     1 package org.apache.storm.firststorm;
     2 
     3 import backtype.storm.topology.BasicOutputCollector;
     4 import backtype.storm.topology.OutputFieldsDeclarer;
     5 import backtype.storm.topology.base.BaseBasicBolt;
     6 import backtype.storm.tuple.Tuple;
     7 
     8 public class SenqueceBolt extends BaseBasicBolt {
     9 
    10     public void execute(Tuple arg0, BasicOutputCollector arg1) {
    11         String word = (String) arg0.getValue(0);
    12         String out = "Hello " + word + "!";
    13         System.out.println(out);
    14     }
    15 
    16     public void declareOutputFields(OutputFieldsDeclarer arg0) {
    17         
    18     }
    19 
    20 }

      最后建立一个类FirstStorm,这个类是主类,在main方法中定义Topology,并且综合设置Spout和Bolt,从而调用其中的方法,这里流式计算时间设置为30s,代码如下:

     1 package org.apache.storm.firststorm;
     2 
     3 import backtype.storm.Config;
     4 import backtype.storm.LocalCluster;
     5 import backtype.storm.StormSubmitter;
     6 import backtype.storm.generated.AlreadyAliveException;
     7 import backtype.storm.generated.InvalidTopologyException;
     8 import backtype.storm.topology.TopologyBuilder;
     9 import backtype.storm.utils.Utils;
    10 
    11 public class FirstStorm {
    12 
    13     public static void main(String[] args) {
    14         TopologyBuilder builder = new TopologyBuilder();
    15         builder.setSpout("spout", new RandomSpout());
    16         builder.setBolt("bolt", new SenqueceBolt()).shuffleGrouping("spout");
    17         Config conf = new Config();
    18         conf.setDebug(false);
    19         if(args != null && args.length > 0) {
    20             conf.setNumWorkers(3);
    21             try {
    22                 StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    23             } catch (AlreadyAliveException e) {
    24                 // TODO Auto-generated catch block
    25                 e.printStackTrace();
    26             } catch (InvalidTopologyException e) {
    27                 // TODO Auto-generated catch block
    28                 e.printStackTrace();
    29             }
    30         } else {
    31             LocalCluster cluster = new LocalCluster();
    32             cluster.submitTopology("firststorm", conf, builder.createTopology());
    33             Utils.sleep(30000);
    34             cluster.killTopology("firststorm");
    35             cluster.shutdown();
    36         }
    37     }
    38 
    39 }

      到这里一个简单的storm项目就开发完毕了,然后可以用本地模式运行,跑起来之后某一时刻输出结果如下:

      

      接下来我们将这个项目放到Storm服务器集群中运行,这里不要把Storm的jar包加进来,因为运行的时候,Storm环境会自动加载并协调集群运行,方法有很多,可以使用插件上传,也可以使用本地Storm客户端配置一下numbus.host进行提交,也可以在服务器节点上执行,执行后nimbus会得到任务并分发给各个supervisor去执行,首先我们应该将项目打包,右击项目,选择Export

      

      然后导出类型选择Java下的JAR file,点击Next

      

      然后单击Brower确定输出位置和文件名或者直接在输入框输入jar包的名称,然后单击Finish完成打包

      

      打包之后我们可以在输出位置看见一个jar文件

      

      然后我们将这个文件上传到服务器,这里上传到了storm安装目录下,然后这个时候在主节点storm安装目录下执行: bin/storm nimbus & 在从节点目录下分别执行 bin/storm supervisor & 启动整个集群的storm服务,也可以执行 bin/storm ui & 启动UI管理界面更直观的看到执行结果,当然对于单机环境启动或者不启动storm服务都可以,这个时候,执行下面命令运行本次项目的程序:

    bin/storm jar firststorm.jar org.apache.storm.firststorm.FirstStorm

      这里就是调用了FirstStorm类中的main方法,如果程序中对参数进行了处理,后面还可以跟上参数,回车确认执行之后,系统会进行初始化集群的工作,几秒后任务开始执行,执行过程中某一时刻的滚动输出如下:

      

      到这里,第一个Storm入门项目的开发和测试运行都完毕了,更复杂的计算逻辑模式也基本相同,主要就是Maven项目中出现了更复杂的模块和调用,整个运行的流程其实都是差不多的,现在就算步入Storm流式计算的殿堂的大门了,接下来的精彩还需要慢慢体会

     

  • 相关阅读:
    PATA 1071 Speech Patterns.
    PATA 1027 Colors In Mars
    PATB 1038. 统计同成绩学生(20)
    1036. 跟奥巴马一起编程(15)
    PATA 1036. Boys vs Girls (25)
    PATA 1006. Sign In and Sign Out (25)
    读取web工程目录之外的图片并显示
    DOS命令
    java连接oracle集群
    servlet
  • 原文地址:https://www.cnblogs.com/freeweb/p/5242631.html
Copyright © 2011-2022 走看看