zoukankan      html  css  js  c++  java
  • storm入门(三)HelloWorld示例

    一、配置开发环境

    storm有两种操作模式: 本地模式和远程模式。使用本地模式的时候,你可以在你的本地机器上开发测试你的topology, 一切都在你的本地机器上模拟出来; 用远程模式的时候你提交的topology会在一个集群的机器上执行。

    建议使用maven,只需要加上storm的依赖就可以了。

    <dependency>
          <groupId>org.apache.storm</groupId>
          <artifactId>storm-core</artifactId>
          <version>1.1.0</version>
          <scope>provided</scope>
        </dependency>

    pom.xml

    复制代码
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>cn.ljh.storm</groupId>
      <artifactId>storm-helloworld</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>
    
      <name>storm-helloworld</name>
      <url>http://maven.apache.org</url>
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.12</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.storm</groupId>
          <artifactId>storm-core</artifactId>
          <version>1.1.0</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
      <build>
          <plugins>
              <plugin>
              <artifactId>maven-assembly-plugin</artifactId>
              <configuration>
                <descriptorRefs>
                  <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                  <manifest>
                    <mainClass>cn.ljh.storm.helloworld.ExclamationTopology</mainClass>
                  </manifest>
                </archive>
              </configuration>
            </plugin>
          </plugins>
      </build>
    </project>
    复制代码

    二、HelloWorld关联代码

    ExclamationTopology.java

    复制代码
    package cn.ljh.storm.helloworld;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.utils.Utils;
    
    public class ExclamationTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
    
            builder.setSpout("word", new TestWordSpout(), 1);
            builder.setBolt("exclaim", new ExclamationBolt(), 1).shuffleGrouping("word");
            builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("exclaim");
    
            Config conf = new Config();
            conf.setDebug(true);
    
            if (args != null && args.length > 0) {
              conf.setNumWorkers(3);
    
              StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
            }
            else {
    
              LocalCluster cluster = new LocalCluster();
              cluster.submitTopology("test3", conf, builder.createTopology());
              Utils.sleep(20000);
              cluster.killTopology("test3");
              cluster.shutdown();
            }
          }
    }
    复制代码

    TestWordSpout.java

    复制代码
    package cn.ljh.storm.helloworld;
    
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import java.util.Map;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    import java.util.Random;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    
    public class TestWordSpout extends BaseRichSpout {
       public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class);
       SpoutOutputCollector _collector;
           
       public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
           _collector = collector;
       }
       
       public void nextTuple() {
           Utils.sleep(100);
           final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
           final Random rand = new Random();
           final String word = words[rand.nextInt(words.length)];
           _collector.emit(new Values(word));
       }
       
       public void declareOutputFields(OutputFieldsDeclarer declarer) {
           declarer.declare(new Fields("word"));
       }
    }
    复制代码

    ExclamationBolt.java

    复制代码
    package cn.ljh.storm.helloworld;
    
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    public class ExclamationBolt extends BaseRichBolt {
        OutputCollector _collector;
    
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
          _collector = collector;
        }
    
        public void execute(Tuple tuple) {
          _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
          _collector.ack(tuple);
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("word"));
        }
    
      }
    复制代码

    PrintBolt.java

    复制代码
    package cn.ljh.storm.helloworld;
    
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class PrintBolt extends BaseRichBolt {
            private static Logger LOG = LoggerFactory.getLogger(PrintBolt.class);
            OutputCollector _collector;
    
            public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
              _collector = collector;
            }
    
            public void execute(Tuple tuple) {
              LOG.info(tuple.getString(0) + " Hello World!");
              _collector.ack(tuple);
            }
    
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
            }
    }
    复制代码

    三、实际运行

    storm有本地模式和远程模式。

    1、本地模式

    本地模式一般用于测试和开发阶段,直接在Eclipse执行ExclamationTopology的main函数进行。

    本地模式的代码中有设置睡眠时间,到时间后主动kill topoloyg。

    Utils.sleep(20000);

    开始设置的时间是10S,运行log中没有期待的输出,反而出现以下错误。

    org.apache.storm.shade.org.apache.zookeeper.server.ServerCnxn$EndOfStreamException: Unable to read additional data from client sessionid 
        0x15c8a2872ac000f, likely client has closed socket
        at org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:228) [storm-core-1.1.0.jar:1.1.0]
        at org.apache.storm.shade.org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208) [storm-core-1.1.0.jar:1.1.0]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]

    后面设置时间为20S,运行log中也有上面错误,但是有期待的输出。

    image

    原因是机器比较慢,还没初始化完就到时间跳出了,所以把睡眠时间设置大些。

    2、远程模式

    集群模式需要先创建一个包含程序代码以及代码所依赖的依赖包的jar包(有关storm的jar包不用包括, 这些jar包会在工作节点上自动被添加到classpath里面去)。如果使用maven, 那么插件:Maven Assembly Plugin可以帮你打包,只要把下面的配置加入pom.xml。

    复制代码
    <plugin>
              <artifactId>maven-assembly-plugin</artifactId>
              <configuration>
                <descriptorRefs>
                  <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                  <manifest>
                    <mainClass>cn.ljh.storm.helloworld.ExclamationTopology</mainClass>
                  </manifest>
                </archive>
              </configuration>
            </plugin>
    复制代码

    然后运行mvn assembly:assembly就可以打包了.

    image

    (1)用storm提交topology

    storm jar storm-helloworld-0.0.1-SNAPSHOT-jar-with-dependencies.jar cn.ljh.storm.helloworld.ExclamationTopology ExclamationTest

    运行提交命令后,出现如下log,说明提交成功。

    image

    查看集群的进程jps,两个Supervisor节点出现了worker进程

    imageimage

    在Nimbus节点的/usr/local/storm/data/nimbus/inbox下面有提交的jar

    image

    UI界面显示提交topology

    image

    image

    image

    (2)终止一个topology

    要终止一个topology, 执行:

    storm kill {stormname}

    其中{stormname}是提交topology给storm集群的时候指定的名字。

    storm不会马上终止topology。相反,它会先终止所有的spout,让它们不再发射任何新的tuple, storm会等Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS秒之后才杀掉所有的工作进程。这会给topology足够的时 间来完成所有我们执行storm kill命令的时候还没完成的tuple。

    (3)更新一个运行中的topology

    为了更新一个正在运行的topology, 唯一的选择是杀掉正在运行的topology然后重新提交一个新的。

    至此HelloWorld示例完成。

    四、常见配置

    有很多topology级的配置可以设。 以”TOPOLOGY”打头的配置是topology级别的配置,可以覆盖全局级别的配置。下面是一些比较常见的:

    1)Config.TOPOLOGY_WORKER设置:  这个设置用多少个工作进程来执行这个topology。比如,如果你把它设置成25, 那么集群里面一共会有25个java进程来执行这个topology的所有task。如果你的这个topology里面所有组件加起来一共有150的并行 度,那么每个进程里面会有6个线程(150 / 25 = 6)。

    2)Config.TOPOLOGY_ACKERS: 这个配置设置acker线程的数目。Ackers是Storm的可靠性API的一部分。

    3)Config.TOPOLOGY_MAX_SPOUT_PENDING:  这个设置一个spout task上面最多有多少个没有处理的tuple(没有ack/failed)回复, 我们推荐你设置这个配置,以防止tuple队列爆掉。

    4)Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS: 这个配置storm的tuple的超时时间  – 超过这个时间的tuple被认为处理失败了。这个设置的默认设置是30秒,对于大多数的topology都已经足够了。

    5)Config.TOPOLOGY_SERIALIZATIONS: 为了在你的tuple里面使用自定义类型,你可以用这个配置注册自定义serializer。

  • 相关阅读:
    JAVA中的super和this关键字的使用
    JAVA中类以及成员变量和成员方法的修饰符的总结
    JAVA中的抽象类和接口
    JAVA对数据库进行操作,实现数据库中数据的插入,查询,更改,删除操作
    完整日期正则表达式
    2017实习【Java研发】面经
    MySQL事务及隔离级别(读书小结)
    Java类编译、加载、和执行机制
    JVM内存回收机制
    Centos6.5的MySQL5.7.15二进制源码单机版安装
  • 原文地址:https://www.cnblogs.com/liuys635/p/10786470.html
Copyright © 2011-2022 走看看