zoukankan      html  css  js  c++  java
  • strom基础概念

    一、Storm topology提交到集群分析 

       storm目前1.x版本支持nimbus的高可用(其实也可以不需要高可用,因为nimbus是无状态的,只要运行的topology没有故障且没有新的任务需要提交到storm集群,那么也可以不要nimbus,因为是worker在运行任务,nimbus只是负责任务分配,资源调度且和supervisor保持心跳我们可以做好整个集群的监控即可,当nimbus挂了后,直接重启它,不会影响正在运行的topology)。
        当我们将topology提交到storm集群的时候,如果你搭的环境是一套高可用的环境,首先需要找到leader nimbus节点,因为需要向leader节点提交我们的拓扑,当调用submitTopology的时候,首先会进行相关的配置校验,然后找到配置的nimbus的ip,循环找到为leader的ip地址将其构建成一个NimbusClient返回,然后开始提交jar,我们可以把这个过程抽象成,我们是thift client通过RPC向server提交jar,jar上传完成后,通知Nimbus上传任务已经完成,nimbus接收到jar后将jar重命名后保存到inbox目录下,然后进行配置检查,任务分配。
       nimbus首先会检查它的配置信息,以及整个storm集群中可用的slots(可用理解为worker),
       第一步:检查整个集群中可用的worker,然后根据配置的worker数分配
       第二步:在zk中创建任务的心跳检测节点/storm/workbeats/,storm对zookeeper的一个重要应用就是利用zk的临时节点做存活检测。任务将定时刷新节点的时间戳,然后nimbus会检测这个时间戳是否超过timeout设置。
       第三步:开始分配任务,根据topology配置的task数和worker数,进行分配
         3.1 如果task数目比worker数多
              比如task 为4个,worker为2个
              task1,task2,taks3,task4
              worker1,woker2, 那么效果就是task1到worker1,task2到woker2,这样依次轮询
         3.2 如果worker数比task数目多
             比如task5个,worker 8 个 基本会轮询保证task不会全部分配到同一个worker上
       第四步:supervisor下载分配给自己的topology下载完成后然后运行。


    二、高可用测试


           storm nimbus的单点问题解决了在1.版本,主要理由zk的Ledaer选举实现,其实主要是依靠分布式互斥锁来实现,我们可以将zk的一个数据节点代表一个锁,当多个客户端同时调用create()节点创建节点的时候,zookeeper会保证只会有一个客户端创建成功,那么我们就可以让这个创建成功的客户端让其持有锁,而其它的客户端则注册Watcher监听,当持有锁的客户端释放锁后,监听的客户端就会收到Watcher通知,然后再去试图获取锁。
       1、直接停掉nimbus leader :已经运行的任务不会有任何问题  
      2、任务启动过程中,立刻停掉nimbus leader 
         如果此时新Leader还没选举出来,任务提交会失败
         如果新的Leader已经产生了,任务提交成功
       3、关闭worker所在机器的supervisor守护进程(默认3秒检测一次worker进程)--> Kill -9 $workerPid ---> 关闭Nimbus Leader --->worker没有重新启动/飘走-->1分钟后启动supervisor --->worker飘移走
       4、storm的高可用利用分布式缓存API来实现数据的备份,实现文件在多个topology之间的共享。


    三、storm的主要特点


           1、水平扩展:可以直接通过加机器来提高整个集群处理任务的速率,不仅可以直接添加机器作为supervisor节点,而且可以动态调整并发度来改变运行某个topology的线程数
        2、容错性比较好:当Nimbus挂了,可以通过高可用来选举出一个新的Leader(当然nimbus挂了也不是什么大问题,因为运行任务不在nimbus节点上),当supervisor挂了,nimbus会通过心跳检测到,然后会将运行在这个supervisor节点上的所有任务转移到其它节点上,当worker挂了,即运行任务的工作进程挂了,首先supervisor会尝试重启它,如果没启动成功,那么nimbus就会将运行在worker上的任务转移到其它可用的worker节点上运行。
        3、消息的可靠性处理:storm默认提供3种类型,
          仅仅一次:原生storm api无法实现,需要用的storm 高级部分trident实现 
          最多一次:也就是消息最多只发一次,不管这条消息有没有被成功处理
          至少一次:消息如果处理失败后,会重发
        4、节点的无状态:状态都保存在zk里面,当我们使用kill -9 $nimbus_pid的时候,不影响整个集群的运行。

    四、运行Storm的相关术语

          1、topology:拓扑,运行一组spout/bolt构成的计算逻辑组件的总称
       2、spout:storm消息来源,一般的话是消息队列
       3、bolt:处理消息逻辑的组件,它的上游可以是spout也可以是Bolt
       4、tuple:运行的消息单元,比如句子 zhangsan is a man 这个消息就是一个tuple
       5、Stream:流,由一条条消息组织的,抽象成流
       6、Stream Grouping:流分组,定义各个计算组件之间流的连接,分组,分发策略等


    五、看一个简单的例子(测试storm的容错性)


          从消息队列Kafka接收消息,然后进行单词拆分,然后统计单词出现的次数.起一个线程从消息队列收消息,3个线程进行单词拆分,1个线程进行全局计数
        1、如果nimbus挂了怎么办?
        2、如果supervisor 挂了怎么办?
        3、如果worker挂了怎么办?
        4、节点挂了怎么办?
        第一个问题:nimbus是无状态的,nimbus挂了不影响整个集群中已经在运行的topology的运行。
       第二个问题:supervisor守护进程挂了,没有影响。
       第三个问题:worker挂了怎么办? supervisor会尝试重启,如果没有启动成功,则发生worker转移,转移到其它可用的节点上运行。
       第四个问题:Node节点挂了怎么办? 这个实际上也可用理解为worker挂了,也会发生worker转移。


    六、storm的并发度


          一个topology可用跑在多个worker进程上,自然可用想的在一个进程里面,我们可以跑多个线程,再一次加快数据的处理效率,默认情况下,如果我们不设置线程数,就是1,且默认情况下,一个线程对应一个task任务(task可以理解为spout/bolt实例)
       其中,每一个worker默认起一个acker,也就是说每一个worker默认起一个线程来执行acker,而acker实际上就是一个特殊的bolt,所以在计算整个集群线程数的时候,需要加上acker


    七、流分组


           流分组定义了上层tuple如何路由到下一层tuple,storm里面默认有以下几种。注意,首先下层必须有多个线程,不然一个线程定义流分组没有意义。
       1、shuffleGrouping : 轮询
       2、filedGrouping:按照上一层的输出属性,相同属性的会被路由到同一个线程处理
       3、allGrouping:广播分组,对于每一个tuple,所有的Bolt都会收到
       4、globalGrouping:全局分组,只会到bolt里面其中task-id最小的那一个
       5、noGrouping:不分组,随机
       6、DirectGrouping:直接分组上一层可以直接指定想把该消息路由到下一层的哪个bolt
       7、自定义分组
       8、localorshuffleGrouping:如果上层bolt和下层bolt运行在同一个进程里面,优先在进程内通信,使用Disruptor有界队列


    八、storm的可靠性测试

      8.1: 消息的ack

          当一个spout数据源发射了消息后,一个tuple经过了所有的Bolt处理之后,这个tuple才被认为是处理完了,而acker就是一个用来追踪这些消息的组件,
    acker内部类似于一个map,维护了消息id和顶层spoutid直接的关系 
        1、如果task挂了,一个tuple没有被ack,那么会在超时之后(30s)spout负责重发
        2、如果acker挂了,那么由这个tuple追踪的所有的tuple都会超时,也会被重发
        3、那么spout挂了,第三方消息源负责重发

       8.2 如何实现可靠的spout

         storm的bolt和spout有很多接口和抽象类,我们可以实现ISpout接口,首先需要注意的是spout里面,nextTuple,ack,fail是运行在同一个线程里面,所有不要在nextTuple里面执行一些比较耗时的处理,默认30s超时,如果acker-30s还没有收到那么就会超时重发,且nextTuple不能阻塞,如果没有消息发射,他会sleep一下,释放cpu,而如果我们想实现可靠的spout,需要自己维护msgId和消息直接的对应关系,可以放在map,redis里面,并且自己实现fail函数进行消息重发。

       8.3 如何实现可靠的bolt

        storm提供两种不同类型的 Bolt,分别是 BaseRichBolt 和 BaseBasicBolt都可以实现消息的可靠性处理,其中BaseBasicBolt,storm已经帮我们内部自己实现了消息的可靠处理,BasicOutputCollector在emit数据的时候,会自动和输入的tuple相关联,而在execute方法结束的时候那个输入tuple会被自动ack 。
       而BaseRichBolt则需要我们自己维护msgid和tuple之间的关系并手动实现ack或者fail
       下面来分析一下BaseBasicBolt是怎么实现了,当我们调用emit的时候,由于没有streamId和tuple之间的对应关系,storm会给我们自己生成一个默认的流id,最后会起一个BasicBoltExecutor,这里面的excetue方法自己给我们实现了ack和失败后的fail

       8.4 针对storm提交的3种消息保证语义,在ack的基础上来看一下如何实现

        8.4.1、如何实现最多一次(下面几个条件只要有一个满足,都只能实现最多一次语义)
           1)将ack的个数设置为0
           2)Spout不实现可靠的消息处理
              不带msgId或者不实现fail函数
           3)bolt不把处理的消息发送给acker
        8.4.2 如何实现至少一次
           1)开启ack机制,即ack数目大于0(默认每个worker一个ack)
           2)Spout 实现可靠性传输保证
             Spout 发送消息时附带 message 的 ID
             如果收到 Acker 的处理失败反馈,需要进行消息重传,即实现 fail 函数
           3)Bolt 在处理成功或失败后需要调用相应的方法通知 Acker  
        8.4.3 实现仅仅一次
        storm原生api不能做到,要实现仅仅一次,需要存储tuple的状态,用来防止重复发送的数据被重复处理,在 Storm 中使用 Trident API 实现


    九、思考?
       1、storm消息重发,重发的消息从哪来?重复的消息怎么处理?
          storm需要维护msgId和具体tuple之间的关系  
       2、如果一个bolt总是处理失败导致spout重发,维护的数据越来越多,最后OOM,怎么办?  
           storm在以前的版本里面可以设置max.pending数(前提条件是必须开启ack),即如果spout组件发现还有这么多消息没有给他ack,当达到这个阀值的时候,就不会在接着发tuple给bolt
         PS:这种设置方法个人感觉值得商榷,首先这个值应该设置多少,设置小了,那么整个集群的吞吐量上不去,设置大了很有可能导致内存溢出。


    十、事物


         storm事物在0.7版本后封装到trident中,虽然原生的事物api已经废弃,但是对于我们理解高级部分trident还是非常有帮助的。事物在storm里面主要分为2个节点,事物的处理阶段和提交阶段,事物的处理阶段是可以并行处理的,但是是提交阶段必须按照事物id依次排队处理。
       storm里面事物主要分为3种,一种是普通的事物,一种是分区事物,一种是不透明事物(事物的更高级封装)
      1、普通事物ITransactionSpout接口,事物处理阶段起多个线程,提交阶段起1个线程
         Coordinator:初始化事物
         Emitter:emitBatch接收初始化事物生成的事物元数据,发送出去
      2、分区事物IPartitionTransactionSpout接口,主流的事物Spout
          Coordinator:isReady和返回分区个数
          Emitter: emitPartitionBatchNew 发送新分区的元数据
                   emitPartitionBatch 事物处理失败后重发
      3、不透明事物IOpaqueTransactionSpout,容错性最好的事物Spout
          它不区分每一批是不是发送的tuple是一样的,因为即使某个分区不可用,它还会继续发送可用的分区,等其他不可用的分区可用后,这些分区里面的数据被放到下一个事物batch里面执

    正因为当初对未来做了太多的憧憬,所以对现在的自己尤其失望。生命中曾经有过的所有灿烂,终究都需要用寂寞来偿还。
  • 相关阅读:
    二进制部署k8s集群(7):创建(Pod, Deployment、Service)验证kubernetes集群
    二进制部署k8s集群(六):部署kube-proxy
    centos同步系统时间
    二进制部署k8s集群(五):部署kubelet
    二进制部署k8s集群(四):部署controller-manager与kube-scheduler
    二进制部署k8s集群(三):部署kube-apiserver,签发kube-apiserver证书|kuelete证书|kube-proxy证书
    二进制部署k8s集群(二): 签发etcd证书,安装etcd集群
    二进制部署k8s集群(一):前期准备,安装虚拟机与DNS软件bind9
    docker-compose.yml 使用说明
    python--将字符串类型的list 转换成 list
  • 原文地址:https://www.cnblogs.com/candlia/p/11920214.html
Copyright © 2011-2022 走看看