zoukankan      html  css  js  c++  java
  • Storm架构和编程模型总结

    1. 编程模型
      DataSource:外部数据源
      Spout:接受外部数据源的组件,将外部数据源转化成Storm内部的数据,以Tuple为基本的传输单元下发给Bolt
      Bolt:接受Spout发送的数据,或上游的bolt的发送的数据。根据业务逻辑进行处理。发送给下一个Bolt或者是存储到某种介质上。介质可以是Redis可以是mysql,或者其他。
      Tuple:Storm内部中数据传输的基本单元,里面封装了一个List对象,用来保存数据。
      StreamGrouping:数据分组策略
    7种:shuffleGrouping(Random函数),Non Grouping(Random函数),FieldGrouping(Hash取模)、Local or ShuffleGrouping 本地或随机,优先本地。
    2. 并发度
      用户指定的一个任务,可以被多个线程执行,并发度的数量等于线程的数量。一个任务的多个线程,会被运行在多个Worker(JVM)上,有一种类似于平均算法的负载均衡策略。尽可能减少网络IO,和Hadoop中的MapReduce中的本地计算的道理一样。
    3. 架构
      Nimbus:任务分配
      Supervisor:接受任务,并启动worker。worker的数量根据端口号来的。
      Worker:执行任务的具体组件(其实就是一个JVM),可以执行两种类型的任务,Spout任务或者bolt任务
      Task:Task=线程=executor。 一个Task属于一个Spout或者Bolt并发任务。
      Zookeeper:保存任务分配的信息、心跳信息、元数据信息。
    4. Worker与topology
      一个worker只属于一个topology,每个worker中运行的task只能属于这个topology。 反之,一个topology包含多个worker,其实就是这个topology运行在多个worker上。
      一个topology要求的worker数量如果不被满足,集群在任务分配时,根据现有的worker先运行topology。如果当前集群中worker数量为0,那么最新提交的topology将只会被标识active,不会运行,只有当集群有了空闲资源之后,才会被运行。

    5. 如何指定驱动类中每个组件的并发度数量?如何设置worker的数量?
      (1) 根据上游的数据量来设置Spout的并发度。
      (2) 根据业务复杂度和execute方法执行时间来设置Bolt并发度。
      (3) 根据集群的可用资源来配置,一般情况下70%的资源使用率。
        (4) Worker的数量理论上根据程序并发度总的Task数量来均分,在实际的业务场景中,需要反复调整

    6. Ack-fail机制

      (1) 需要ack-fail时,请为每个tuple生成一个messageID,这个messagetId是用来标识你关心的tuple。当这个tuple被完全处理时,storm框架会调用Spout的ack方法,否则调用fail。至于你的消息是否重发,完全由自己处理。

    MySpout{
            private Map buffer = new HashMap();
            spout.open()
            spout.nextTuple(){
                collector.emit()
                buffer.put(msgId,messValue)
            
            }
            spout.outputFields()
            spout.ack(msgId){
            //消息移除
            buffer.remove(msgId);
            
            }
            spout.fail(msgId){
            //消息重发
             String messValue = buffer.get(msgId)
             collector.emit();
            }
        }
        
        MyBolt{
            bolt.execute(){
            //先判断消息是否被处理过
            // 在redis或mysql中保存一个处理过的消息列表    
            //需要手动的调用ack方法
            collector.ack(tuple)
            }
        }

      (2) 在Spout有并发度的情况下,storm会根据tuple最开始的所属的spout taskId,通知相应的spoutTask

       (3) 在流式计算中topology的bolt组件是可以配置多个的,在每个环节中,都需要bolt组件显式告诉storm框架,自己对当前接受的这个tuple处理完成。
    <spoutTaskId,<RootID,ackaValue=0>>

    spout1----->tuple1(msgId,rootId)-----bolt1-----collector.ack(tuple)
                                   bolt1-----tuple1-1----->bolt2------ack(tuple1-1)
                                   bolt1-----tuple1-2----->bolt2------ack(tuple1-2)
                                   bolt1-----tuple1-3----->bolt2------ack(tuple1-3)
                                   bolt1-----tuple1-4----->bolt2------ack(tuple1-4)
                                                           bolt2-----tuple2-1----->bolt3------ack(tuple2-1)
                                                              bolt2-----tuple2-2----->bolt3------ack(tuple2-2)
                                                              bolt2-----tuple2-3----->bolt3------ack(tuple2-3)
                                                              bolt2-----tuple2-4----->bolt3------ack(tuple2-4)

      (4) ack机制里面,发送两种类型的tuple。一种是原始消息(DataTuple),另外一种是ackTuple<RootID,tupleID>,DataTuple中会包含一个MessageId的对象,

    spout.emit(DataTuple(MessageId(ackTuple)))------->bolt1.execute(dataTuple)---->collector.ack(dataTuple)
    ackTuple--------------------->Acker.execute(tuple)
    dataTuple--->MessageId--->ackTuple
    Acker.execute(tuple)


     

  • 相关阅读:
    c#将 1, 2, ..., 9共 9 个数字分成 3 组
    信息学院本科生创新项目总结
    Element-ui的使用
    fastmock接口管理
    mock安装与使用
    开闭原则
    里氏替换原则
    依赖倒置原则
    接口隔离原则
    单一职责原则
  • 原文地址:https://www.cnblogs.com/sunfie/p/7268199.html
Copyright © 2011-2022 走看看