zoukankan      html  css  js  c++  java
  • Storm-源码分析-acker (backtype.storm.daemon.acker)

    backtype.storm.daemon.acker
    设计的巧妙在于, 不用分别记录和track, stream过程中所有的tuple, 而只需要track root tuple, 而所有中间过程都通过异或更新track entry

    acker-init, 在spout发送一个tuple时触发, 初始化这个root tuple的track entry 
    acker-ack, 在blot ack一个tuple的时候触发, 会对该tuple的anchors-to-ids中记录的每个(root, edge)进行ack, 并出于优化还会附带登记新的edge(对acker透明, 在发送前已经完成)
    acker-fail, 任一个过程中的tuple fail, 都会导致这个root tuple失败

     

    (defn mk-acker-bolt []
      (let [output-collector (MutableObject.)
            pending (MutableObject.)]
        (reify IBolt
          (^void prepare [this ^Map storm-conf ^TopologyContext context ^OutputCollector collector]
                   (.setObject output-collector collector)
                   (.setObject pending (RotatingMap. 2)) ;;用RotatingMap来缓存每个tuple的track信息
                   )
          (^void execute [this ^Tuple tuple]
                 (let [^RotatingMap pending (.getObject pending)
                       stream-id (.getSourceStreamId tuple)]  ;;从ack tuple中取出streamid
                   (if (= stream-id Constants/SYSTEM_TICK_STREAM_ID) ;;收到system_tick_stream, rotate pending, spout的pending和acker的pending超期时间是一样的, 都取决于system-tick
                     (.rotate pending)
                     (let [id (.getValue tuple 0) ;;else,其他的stream,取出tuple id
                           ^OutputCollector output-collector (.getObject output-collector)
                           curr (.get pending id) ;;取出相应tuple的track entry
                           curr (condp = stream-id
                                    ACKER-INIT-STREAM-ID (-> curr  ;;初始化tuple的track entry
                                                             (update-ack (.getValue tuple 1)) ;;更新entry中的track value
                                                             (assoc :spout-task (.getValue tuple 2))) ;;记录该tuple和spout-task的关系, 这样在ack或fail的时候才知道通知谁
                                    ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1));;ack, 用val和原来的entry value做异或
                                    ACKER-FAIL-STREAM-ID (assoc curr :failed true))] ;;fail, 直接把entry的:failed设true
                       (.put pending id curr)
                       (when (and curr (:spout-task curr))
                         (cond (= 0 (:val curr)) ;;val为0, 表示该tuple的所有edge都被成功ack
                               (do
                                 (.remove pending id) ;;从pending中删除track entry, 并向相应的spout-task发送ack消息
                                 (acker-emit-direct output-collector
                                                    (:spout-task curr)
                                                    ACKER-ACK-STREAM-ID
                                                    [id]
                                                    ))
                               (:failed curr) ;;:failed为true, 表示该tuple失败
                               (do
                                 (.remove pending id) ;;从pending中删除track entry, 并向相应的spout-task发送fail消息 
                                 (acker-emit-direct output-collector
                                                    (:spout-task curr)
                                                    ACKER-FAIL-STREAM-ID
                                                    [id]
                                                    ))
                               ))
                       (.ack output-collector tuple) ;;acker bolt也是bolt, 所以最后完成对该ack tuple的ack
                       ))))
          (^void cleanup [this]
            )
          )))

     

    (defn- update-ack [curr-entry val]
      (let [old (get curr-entry :val 0)] ;;取出entry中的value值,默认设为0
        (assoc curr-entry :val (bit-xor old val)) ;;将old和新val异或, 赋给entry的value
        ))
  • 相关阅读:
    python flsak 框架
    postman
    压力测试和负载测试
    软件测试相关内容
    Linux常用基本命令
    leetcode刷题——反转字符串
    leetcode——搜索插入位置
    leetcode刷题——移除元素
    leetcode 刷题 ——删除排序数组中的重复项
    json.decoder.JSONDecodeError: Expecting value 错误的处理
  • 原文地址:https://www.cnblogs.com/fxjwind/p/3240042.html
Copyright © 2011-2022 走看看