zoukankan      html  css  js  c++  java
  • Storm系列(十五)架构分析之Executor-Spout

    Spout实现mk-threads接口用于创建与Executor对应的消息循环主函数。

    defmulti mk-threads executor-selector

    Mk-threads函数的主消息循环通过async-loop方法实现,若传入的函数为工厂方法,则在第一次调用该方法时进行初始化,并返回用于消息循环的函数。

    Spout输入处理函数

    spout的输入处理函数采用非阻塞的方式从接收队列中获取消息:

    (disruptor/consume-batch receive-queue event-handler)

    处理函数原型:

    tuple-action-fn (fn [task-id ^TupleImpl tuple])

    函数说明:

    1. 确定消息来源(根据流ID判定),若消息来自SYSTEM_TICK_STREAM_ID,则调用pending对象的rotate方法,该方法导致发送消息超时.在进行消息跟踪过程中,Spout会用pending对象来保存所有发送出去的消息,用SYSTEM_TICK消息作为清理缓存消息的信号.
    2. 若消息来自METRICS_TICK_STREAM_ID,则调用metrics-tick方法来整理目前统计信息并将其发送到信息统计Bolt节点上去.
    3. 其它消息来源只能为Ack/Fail的流,获取消息的ID,将该ID对应的数据从pending数组中清除出去,并返回该ID对应的数据,返回数据格式:
      [stored-task-id,spout-id,tuple-finished-info,start-time-ms]
      Stored-task-id为发送消息的TaskID,spout-id为发送消息是附带的MessageId,tuple-finished-info含有发送消息的StreamId及消息内容,start-time-ms则在消息被执行统计采样时,存储当前的时间.否则为空。
    4. 若消息来自ACKER-ACK-STREAM-ID,则调用ack-spout-msg回调方法处理消息(ack-spout-msg函数主要调用用户的Spout对象的Ack回调方法  .ack spout msg-id),若消息来自ACKER-FAIL-STREAM-ID,则调用fail-spout-msg方法处理(fail-spout-msg函数主要调用Spout对象的Fail回调方法 .fail spout msg-id)

    Spout消息发送函数

    Spout使用send-spout-msg函数来发送消息。

    函数原型:

    send-spout-msg (fn [out-stream-id values message-id out-task-id])

    参数说明:

    Out-stream-id为消息的StreamId,values为消息内容,message-id为消息的MessageId,表示是否对消息进行跟踪,out-task-id则是消息的接收端TaskId,用于向直接流发送消息.

    方法说明:

    1. 调用task-fn函数获得消息的目标TaskId,task-fn为Task的主要函数,它根据消息的StreamId和消息内容来确定哪些Task将接收该流的消息,以及以何种方式来接收该流的消息。对于直接分组方式,其作用主要是检查目标out-task-id是否以直接分组的方式来接收消息.
    2. task-fn函数内部通过调用,从outbound-components函数返回的从组件到某一个流的分组函数,获得的目标Task集合.
    3. 调用transfer-fn函数发送消息,该函数由mk-executor-transfer-fn函数创建,并会将消息发送至Executor发发送队列中.

    mk-executor-transfer-fn函数原型:

    (defn mk-executor-transfer-fn [bath-transfer->worker])

    函数说明:

    1. bath-transfer->worker对于于Executor的输出Disruptor Queue.
    2. 该函数有三种重载,主要区别在于是否对发送的消息进行缓存.
    3. 当Disruptor Queue接收端未启动或空间不足时,用overflow-buffer临时存放将要发送的信息。
    4. 在进行消息发送时若overflow-buffer非空,则标明该异常已经发生过,Disruptor Queue中空间不足,此时消息会被直接放入overflow-buffer以提高效率.
    5. 在 Spout消息循环中,会优先发送overflow-buffer中的数据.

    Spout对象的初始化

    调用Executor中各Spout对象的open操作,且open方法只会调用一次.

    初始化过程说明:

    1. 等待对应的Topology处于活跃状态.
    2. 对Executor中的每个Spout进行如下操作,获得tasks-fn函数和send-spout-msg函数,send-soput-msg函数会利用tasks-fn函数选择目标TaskId,每个Spout都定义了send-spout-msg方法,它是Task级别非Executor共享.
    3. 调用Spout对象的open回调方法,同时实例化SpoutOutputCollector,它主要用于调用send-spout-msg来发送消息.
    4. 调用consumer-started!函数打开接收队列。由于在open函数被调用之前,接收队列尚未打开,故最好不要在Spout的open函数中发送消息.

    Spout消息循环

    Async-loop:该函数使用一个线程来循环调用传入的函数afn,同时要求被调用的afn在执行结束后返回一个时间间隔,并将其作为与下次调用之间需要等待的事件间隔.

    过程说明:

    1. 以非阻塞的方式对接收队列中的消息进行处理。
    2. 优先发送overflow-buffer中的数据。
    3. 若overflow-buffer为空,并且pending存储的数据少于max-spout-pending,或者未设置max-spout-pending,Topology处于活动状态,则Spout可以发送消息,若Topology处于非活动状态则休眠100毫秒。
    4. 依次调用spout的nextTuple回调方法来发送消息,nextTuple会利用传入的SpoutOutputCollector的emit或emitDirect方法来发送消息,并最终调用send-spout-msg函数将消息发送到Executor的消息队列中,send-spout-msg会更新emitted-count.
    5. 若emitted-count与上次发送消息的curr-count相同,则表明nextTuple函数没有发送出去消息,此时调用spout-wait-strategy的emptyEmit方法来进行处理,默认休眠1毫秒(TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS配置项决定休眠时间).
  • 相关阅读:
    Windows Server 2003 SP2(32位) 中文版 下载地址 光盘整合方法
    用Recycle()方法对Java对象的重要性
    Lotus中千奇百怪的 $$
    Developing a simple application using steps "User Decision" and "Mail"(1) 沧海
    沟通中的情绪管理(演讲稿) 沧海
    人只有在压力之下,才可能成功,没做一件事,都必须成功,不许言败 沧海
    什么是IDOC,以及IDOC的步骤 沧海
    VS2008 Professional Edition CHS中的deffactory.dat读取错误 沧海
    Including custom text in the step "User Decision" 沧海
    SAP Upgrade Strategy 沧海
  • 原文地址:https://www.cnblogs.com/jianyuan/p/4891450.html
Copyright © 2011-2022 走看看