zoukankan      html  css  js  c++  java
  • Storm-源码分析- timer (backtype.storm.timer)

    mk-timer

    timer是基于PriorityQueue实现的(和PriorityBlockingQueue区别, 在于没有阻塞机制, 不是线程安全的), 优先级队列是堆数据结构的典型应用
    默认情况下, 按照自然顺序(其实就是默认comparator的定义), 最小的元素排在堆头
    当然也可以自己重新实现comparator接口, 比如timer就用reify重新实现了comparator接口

    整个过程其实比较简单, 开个timer-thread, 不断check PriorityQueue里面时间最小的timer是否已经可以触发
    如果可以, 就poll出来, 调用callback, 并sleep, 都很好理解

    唯一需要说的是, 这里使用Semaphore,
    信号量和lock相似, 都是用于互斥
    不同在于, 信号量模拟资源管理, 所以不同于lock的排他, 信号量可以接收多个aquire(取决于配置)
    另外一个比较大的区别, lock是解铃还须系铃人, 谁锁谁解, 而信号量无所谓, 任何线程都可以调用release, 或acquire
    这里使用信号量, 是用于在cancel-timer时, 等待timer-thread结束

    (defn cancel-timer [timer]
      (check-active! timer)
      (locking (:lock timer)
        (reset! (:active timer) false)
        (.interrupt (:timer-thread timer)))
      (.acquire (:cancel-notifier timer)))
    因为cancel的过程就是将active置false, 然后就是调用acquire等待信号量cancel-notifier被释放
    而timer-thread在线程结束前, 会release这个信号量

     

    (defnk mk-timer [:kill-fn (fn [& _] )]
      (let [queue (PriorityQueue. 10
                                  (reify Comparator
                                    (compare [this o1 o2]
                                      (- (first o1) (first o2))
                                      )
                                    (equals [this obj]
                                      true
                                      )))
            active (atom true) ;;标志位
            lock (Object.)     ;;创建lock对象, 由于PriorityQueue非线程安全, 所以使用locking来保证同时只有一个线程访问queue
            notifier (Semaphore. 0) ;;创建信号量, 初始为0
            timer-thread (Thread.
                          (fn []
                            (while @active
                              (try
                                ;;peek读但不从queue中取出, 先读出time看看, 符合条件再取出 
                                (let [[time-secs _ _ :as elem] (locking lock (.peek queue))]
                                  (if (and elem (>= (current-time-secs) time-secs)) 
                                    ;;无法保证恰好, 只要当前时间>=time-secs, 就可以执行, 可想而知对于afn必须不能耗时, 否则会影响其他timer
                                    ;; imperative to not run the function inside the timer lock
                                    ;; otherwise, it's possible to deadlock if function deals with other locks
                                    ;; (like the submit lock)
                                    (let [afn (locking lock (second (.poll queue)))]  ;;poll从queue中取出
                                      (afn))    ;;真正执行timer中的callback
                                    (Time/sleep 1000)
                                    ))
                                (catch Throwable t
                                  ;; because the interrupted exception can be wrapped in a runtimeexception
                                  (when-not (exception-cause? InterruptedException t)
                                    (kill-fn t)
                                    (reset! active false)
                                    (throw t))
                                  )))
                            (.release notifier)))]
        (.setDaemon timer-thread true)
        (.setPriority timer-thread Thread/MAX_PRIORITY)
        (.start timer-thread)
        {:timer-thread timer-thread
         :queue queue
         :active active
         :lock lock
         :cancel-notifier notifier}))

     

    schedule

    schedule其实就是往PriorityQueue里面插入timer

    对于循环schdule, 就是在timer的callback里面, 再次schedule

    (defnk schedule [timer delay-secs afn :check-active true]
      (when check-active (check-active! timer))
      (let [id (uuid)
            ^PriorityQueue queue (:queue timer)]
        (locking (:lock timer)
          (.add queue [(+ (current-time-secs) delay-secs) afn id])
          )))
    
    (defn schedule-recurring [timer delay-secs recur-secs afn]
      (schedule timer
                delay-secs
                (fn this []
                  (afn)
                  (schedule timer recur-secs this :check-active false)) ; this avoids a race condition with cancel-timer
                ))

     

    使用例子

    Supervisor中的使用例子, 定期的调用hb函数更新supervisor的hb
    在mk-timer时, 传入的kill-fn callback, 会在timer-thread发生exception的时候被调用

    :timer (mk-timer :kill-fn (fn [t]
                                (log-error t "Error when processing event")
                                (halt-process! 20 "Error when processing an event")
                                ))

    (schedule-recurring (:timer supervisor)
                            0
                            (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
                            heartbeat-fn)
  • 相关阅读:
    GetSystemMetrics SM_** 系统消息
    Direct3D中的HLSL
    Delphi TStream 详细介绍
    VI命令详解(大全)
    Delphi中messagedlg
    Windows中的消息详细列表
    SharePoint2007深入浅出——开发环境配置
    深入浅出InfoPath——预备式
    SharePoint术语对照表
    深入浅出Nintex——判断当前用户的角色
  • 原文地址:https://www.cnblogs.com/fxjwind/p/3166945.html
Copyright © 2011-2022 走看看