zoukankan      html  css  js  c++  java
  • Storm入门3-集群搭建


    【storm集群的搭建以及将开发好的拓扑提交到集群上运行的方法】

      在上一篇文章中,我们的拓扑直接运行,并在程序开始时候自动启动一个本地"集群"来运行拓扑。LocalCluster这种方式一般用于本地的开发和调试。而在实际的生产环境中,我们一般是有搭建好的storm集群,我们开发完topology后会提交到集群中的主节点nimbus,由nimbus来向supervisor分发代码,并启动woker来运行topology.下面我们将在本地搭建一个storm运行环境,并将开发好的WordCountTopology提交到本地"集群"上来运行。

      本地"集群"搭建

    1. 安装好JDK并配置系统环境变量;http://www.cnblogs.com/jonyo/p/5656386.html
    2. 安装zookepeer并配置系统环境变量;【待加。。。。】
    3. 下载storm压缩包,解压放到 /usr/local/下,在.zshrc中配置相关的环境变量
       export STORM_HOME=/usr/local/storm            
       export PATH=$PATH:$STORM_HOME/bin
    4. 进入到storm/conf下,修改storm.yaml配置文件,在末尾添加
      storm.zookeeper.servers:
          - "localhost"
      storm.local.dir: "/usr/local/storm/data"
      storm.zookeeper.port: 2181
      nimbus.host: "localhost"
      ui.port: 8080
      supervisor.slots.ports:
          - 6700
          - 6701
          - 6702
          - 6703
      

      这个文件主要是对storm使用的zookepeer地址,集群的nimbus,supervisor等信息进行了相关配置

    5. 步骤1~4完成以后,按照下面就可以启动集群了
      (1)启动zookepeer: 在/usr/local/zookeeper/bin下执行sh zkServer.sh start   接着执行sh zkServer.sh status查看启动状态,出现Mode: standalone即表示成功
      (2)启动nimbus进程:在/usr/local/storm/bin下执行 sh storm nimbus,会看到nimbus启动信息

      (3)启动supervisor进程,由于都是在同一台机器上启动的,需要重新开启一个控制台。在/usr/local/storm/bin下执行 sh storm supervisor

      (4)为了便于查看集群的信息,将ui也启动,在/usr/local/storm/bin下执行 sh storm ui, 然后通过localhost:8080可以查看集群状态

      UI界面,可以对拓扑进行管理,查看集群状态等等;由于没有任何拓扑提交,可以看到Topology Summary是空的,没有信息

      (5)将之前的WordCountTopology.java文件修改如下
       1 //        //建立本地集群,利用LocalCluster,storm在程序启动时会在本地自动建立一个集群,不需要用户自己再搭建,方便本地开发和debug
       2 //        LocalCluster cluster = new LocalCluster();
       3 //
       4 //        //创建拓扑实例,并提交到本地集群进行运行
       5 //        cluster.submitTopology(TOPOLOGY_NAME, config, topologyBuilder.createTopology());
       6 
       7         try {
       8             //submitTopology方法负责发送Topology到集群,[新增的代码]
       9             StormSubmitter.submitTopology(TOPOLOGY_NAME, config, topologyBuilder.createTopology());
      10         } catch (Exception e) {
      11             e.printStackTrace();
      12         }

      修改完成后,利用maven打成jar包:mvn compile       mvn clean package

      (6)将打出的jar包(拓扑)提交到集群上执行
      storm jar /Users/.../workspace/storm-learning/target/storm-learning-1.0-SNAPSHOT.jar  wordCount.WordCountTopology  wordCountTest
      在此打开localhost:8080会发现WordCountTopology在执行,并显示出执行的状态和相关信息




      拓扑提交到nimbus之后都发生了什么事情?即通过storm jar xxxxx.jar  xxx.Main agrs这个命令之后操作是怎么样的?

      storm jar会执行.../storm/bin目录下的storm.py脚本文件里定义的def jar(jarfile, klass, *args),在这个脚本中,通过一系列的设置,让后续main方法中的调用的StormSubmitter.submitTopology(name, conf, builder.createTopology())找到jar所在的地址,然后通过soket传输,将Jar文件上传到nimbus,nimbus在接收到jar文件后,存放到数据目录的inbox目录(inbox是在用户设置的storm.local.dir变量所指定的目录的nimbus下)
      inbox用于存放提交的Jar文件,每个Jar被重命名为stormjar+32位的uuid, 比如的的执行提交后是stormjar-a22e9169-e377-45db-9a07-a97357ccfbef.jar

      stormdist存放的是启动topology后生成的文件,每个topology都分配一个唯一的id,ID的规则是“name-计数-时间戳”。启动后的topology的jar文件名命名为storm.jar ,而它的配置经过java序列化后存放在stormconf.ser文件,而stormcode.ser是将topology本身序列化后存放的文件。这些文件在部署的时候,supervisor会从这个目录下载这些文件,然后在supervisor本地执行这些代码。

      进入重点,topology任务的分配过程(zookeeper路径说明忽略root):
      1.在zookeeper上创建/taskheartbeats/{storm id} 路径,用于任务的心跳检测。storm对zookeeper的一个重要应用就是利用zk的临时节点做存活检测。task将定时刷新节点的时间戳,然后nimbus会检测这个时间戳是否超过timeout设置。
      2.从topology中获取bolts,spouts设置的并行数目以及全局配置的最大并行数,然后产生task id列表,如[1 2 3 4]
      3.在zookeeper上创建/tasks/{strom id}/{task id}路径,并存储task信息
      4.开始分配任务(内部称为assignment), 具体步骤:
       (1)从zk上获得已有的assignment(新的toplogy当然没有了)
       (2)查找所有可用的slot,所谓slot就是可用的worker,在所有supervisor上配置的多个worker的端口。
       (3)将任务均匀地分配给可用的worker,这里有两种情况:
       (a)task数目比worker多,例如task是[1 2 3 4],可用的slot只有[host1:port1 host2:port1],那么最终是这样分配
      {1: [host1:port1] 2 : [host2:port1]
               3 : [host1:port1] 4 : [host2:port1]}
      ,可以看到任务平均地分配在两个worker上。
      (b)如果task数目比worker少,例如task是[1 2],而worker有[host1:port1 host1:port2 host2:port1 host2:port2],那么首先会将woker排序,将不同host间隔排列,保证task不会全部分配到同一个worker上,也就是将worker排列成
      [host1:port1 host2:port1 host1:port2 host2:port2]
      ,然后分配任务为
      {1: host1:port1 , 2 : host2:port2}

      (4)记录启动时间
      (5)判断现有的assignment是否跟重新分配的assignment相同,如果相同,不需要变更,否则更新assignment到zookeeper的/assignments/{storm id}上。
      5.启动topology,所谓启动,只是将zookeeper上/storms/{storm id}对应的数据里的active设置为true。
      6.nimbus会检查task的心跳,如果发现task心跳超过超时时间,那么会重新跳到第4步做re-assignment。

      最后的topology任务的分配过程 参考:http://blog.csdn.net/xiaolang85/article/details/38065185
  • 相关阅读:
    JNI实例(含代码)
    sizeof()小结
    【转】C++ 关键字——friend
    curl资料小结
    【Curl (libcurl) 开发 之一】Cocos2dx之libcurl(curl_easy)的编程教程(帮助手册)!
    JSP中统一错误信息输出
    在号码池取连续号码的算法
    常用的文件操作方法
    走出基金净值的误区
    ResultSet概论
  • 原文地址:https://www.cnblogs.com/jonyo/p/5861835.html
Copyright © 2011-2022 走看看