zoukankan      html  css  js  c++  java
  • 一个最简单的JStorm例子

    最简单的JStorm例子分为以下几个步骤:

    1、生成Topology

     1 Map conf = new HashMp();
     2 //topology所有自定义的配置均放入这个Map
     3 
     4 TopologyBuilder builder = new TopologyBuilder();
     5 //创建topology的生成器
     6 
     7 int spoutParal = get("spout.parallel", 1);
     8 //获取spout的并发设置
     9 
    10 SpoutDeclarer spout = builder.setSpout(SequenceTopologyDef.SEQUENCE_SPOUT_NAME,
    11                 new SequenceSpout(), spoutParal);
    12 //创建Spout, 其中new SequenceSpout() 为真正spout对象,SequenceTopologyDef.SEQUENCE_SPOUT_NAME 为spout的名字,注意名字中不要含有空格
    13 
    14 int boltParal = get("bolt.parallel", 1);
    15 //获取bolt的并发设置
    16 
    17 BoltDeclarer totalBolt = builder.setBolt(SequenceTopologyDef.TOTAL_BOLT_NAME, new TotalCount(),
    18                 boltParal).shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME);
    19 //创建bolt, SequenceTopologyDef.TOTAL_BOLT_NAME 为bolt名字,TotalCount 为bolt对象,boltParal为bolt并发数,
    20 //shuffleGrouping(SequenceTopologyDef.SEQUENCE_SPOUT_NAME), 
    21 //表示接收SequenceTopologyDef.SEQUENCE_SPOUT_NAME的数据,并且以shuffle方式,
    22 //即每个spout随机轮询发送tuple到下一级bolt中
    23 
    24 int ackerParal = get("acker.parallel", 1);
    25 Config.setNumAckers(conf, ackerParal);
    26 //设置表示acker的并发数
    27 
    28 int workerNum = get("worker.num", 10);
    29 conf.put(Config.TOPOLOGY_WORKERS, workerNum);
    30 //表示整个topology将使用几个worker
    31 
    32 conf.put(Config.STORM_CLUSTER_MODE, "distributed");
    33 //设置topolog模式为分布式,这样topology就可以放到JStorm集群上运行
    34 
    35 StormSubmitter.submitTopology(streamName, conf,
    36                 builder.createTopology());
    37 //提交topology

    2、IRichSpout

    IRichSpout 为最简单的Spout接口

     1  IRichSpout{
     2 
     3     @Override
     4     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
     5     }
     6 
     7     @Override
     8     public void close() {
     9     }
    10 
    11     @Override
    12     public void activate() {
    13     }
    14 
    15     @Override
    16     public void deactivate() {
    17     }
    18 
    19     @Override
    20     public void nextTuple() {
    21     }
    22 
    23     @Override
    24     public void ack(Object msgId) {
    25     }
    26 
    27     @Override
    28     public void fail(Object msgId) {
    29     }
    30 
    31     @Override
    32     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    33     }
    34 
    35     @Override
    36     public Map<String, Object> getComponentConfiguration() {
    37         return null;
    38     }

     其中注意:

    • spout对象必须是继承Serializable, 因此要求spout内所有数据结构必须是可序列化的
    • spout可以有构造函数,但构造函数只执行一次,是在提交任务时,创建spout对象,因此在task分配到具体worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个task内(因为提交任务时将spout序列化到文件中去,在worker起来时再将spout从文件中反序列化出来)。
    • open是当task起来后执行的初始化动作
    • close是当task被shutdown后执行的动作
    • activate 是当task被激活时,触发的动作
    • deactivate 是task被deactive时,触发的动作
    • nextTuple 是spout实现核心, nextuple完成自己的逻辑,即每一次取消息后,用collector 将消息emit出去。
    • ack, 当spout收到一条ack消息时,触发的动作,详情可以参考 ack机制
    • fail, 当spout收到一条fail消息时,触发的动作,详情可以参考 ack机制
    • declareOutputFields, 定义spout发送数据,每个字段的含义
    • getComponentConfiguration 获取本spout的component 配置

    3、Bolt

     1 IRichBolt {
     2 
     3     @Override
     4     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
     5     }
     6 
     7     @Override
     8     public void execute(Tuple input) {
     9     }
    10 
    11     @Override
    12     public void cleanup() {
    13     }
    14 
    15     @Override
    16     public void declareOutputFields(OutputFieldsDeclarer declarer) {
    17     }
    18 
    19     @Override
    20     public Map<String, Object> getComponentConfiguration() {
    21         return null;
    22     }
    23 
    24 }

     其中注意:

    • bolt对象必须是继承Serializable, 因此要求spout内所有数据结构必须是可序列化的
    • bolt可以有构造函数,但构造函数只执行一次,是在提交任务时,创建bolt对象,因此在task分配到具体worker之前的初始化工作可以在此处完成,一旦完成,初始化的内容将携带到每一个task内(因为提交任务时将bolt序列化到文件中去,在worker起来时再将bolt从文件中反序列化出来)。
    • prepare是当task起来后执行的初始化动作
    • cleanup是当task被shutdown后执行的动作
    • execute是bolt实现核心, 完成自己的逻辑,即接受每一次取消息后,处理完,有可能用collector 将产生的新消息emit出去。 ** 在executor中,当程序处理一条消息时,需要执行collector.ack, 详情可以参考 ack机制 ** 在executor中,当程序无法处理一条消息时或出错时,需要执行collector.fail ,详情可以参考 ack机制
    • declareOutputFields, 定义bolt发送数据,每个字段的含义
    • getComponentConfiguration 获取本bolt的component 配置

    4、编译

    在Maven中配置

     1         <dependency>
     2             <groupId>com.alibaba.jstorm</groupId>
     3             <artifactId>jstorm-client</artifactId>
     4             <version>0.9.3.1</version>
     5             <scope>provided</scope>
     6         </dependency> 
     7 
     8 
     9          <dependency>
    10             <groupId>com.alibaba.jstorm</groupId>
    11             <artifactId>jstorm-client-extension</artifactId>
    12             <version>0.9.3.1</version>
    13             <scope>provided</scope>
    14         </dependency>

    如果找不到jstorm-client和jstorm-client-extension包,可以自己下载jstorm源码进行编译,请参考 源码编译

    打包时,需要将所有依赖打入到一个包中

     1 <build>
     2         <plugins>
     3 
     4             <plugin>
     5                 <artifactId>maven-assembly-plugin</artifactId>
     6                 <configuration>
     7                     <descriptorRefs>
     8                         <descriptorRef>jar-with-dependencies</descriptorRef>
     9                     </descriptorRefs>
    10                     <archive>
    11                         <manifest>
    12                             <mainClass>storm.starter.SequenceTopology</mainClass>
    13                         </manifest>
    14                     </archive>
    15                 </configuration>
    16                 <executions>
    17                     <execution>
    18                         <id>make-assembly</id>
    19                         <phase>package</phase>
    20                         <goals>
    21                             <goal>single</goal>
    22                         </goals>
    23                     </execution>
    24                 </executions>
    25             </plugin>
    26             <plugin>
    27                 <groupId>org.apache.maven.plugins</groupId>
    28                 <artifactId>maven-compiler-plugin</artifactId>
    29                 <configuration>
    30                     <source>1.6</source>
    31                     <target>1.6</target>
    32                 </configuration>
    33             </plugin>
    34         </plugins>
    35     </build>

    5、提交jar

    jstorm jar xxxxxx.jar com.alibaba.xxxx.xx parameter

    • xxxx.jar 为打包后的jar
    • com.alibaba.xxxx.xx 为入口类,即提交任务的类
    • parameter即为提交参数
  • 相关阅读:
    (转)Python之路,Day6
    (转)函数作用域,匿名函数,函数式编程,面向过程,面向对象
    (转)面向对象编程初步
    day26-多态、封装、反射
    (转)面向对象进阶
    MySql-Mysql技术内幕~SQL编程学习笔记(1)
    Spring MVC-学习笔记(4)数据绑定流程
    Mybatis-学习笔记(10)调用存储过程、存储函数
    Mybatis-学习笔记(9)Mybatis3+spring4+springMVC
    Mybatis-学习笔记(8)常用的注解
  • 原文地址:https://www.cnblogs.com/xymqx/p/4409155.html
Copyright © 2011-2022 走看看