zoukankan      html  css  js  c++  java
  • storm事件管理器EventManager源码分析event.clj

    storm事件管理器定义在event.clj中,主要功能就是通过独立线程执行"事件处理函数"。我们可以将"事件处理函数"添加到EventManager的阻塞队列中,EventManager的事件处理线程不断地从阻塞队列中获取"事件处理函数"并执行。

    EventManager协议

    协议就是一组函数定义的集合,协议中函数的第一个参数必须为实现该协议的实例本身,类似于java中实例方法的第一个参数为this;协议类似于java中的接口。

    (defprotocol EventManager
     (add [this event-fn])
     (waiting? [this])
     (shutdown [this]))
     
    event-manager函数

    (defn event-manager
     "Creates a thread to respond to events. Any error will cause process to halt"
     ;; daemon?表示是否将事件处理线程设置成守护线程
     [daemon?]
     ;; added表示已添加的"事件处理函数"的个数
     (let [added (atom 0)
         ;; processed表示已处理的"事件处理函数"的个数
           processed (atom 0)
         ;; queue绑定事件管理器的阻塞队列LinkedBlockingQueue
           ^LinkedBlockingQueue queue (LinkedBlockingQueue.)
         ;; 设置事件管理器的状态为"running"
           running (atom true)
         ;; 创建事件处理线程。Clojure函数实现了Runnable和Callable接口,所以可以将Clojure函数作为参数传递给java.lang.Thread类的构造函数
           runner (Thread.
                  ;; 事件处理线程循环检查事件处理器的状态是否是"running",如果是,就从阻塞队列中获取"事件处理函数",并执行;然后将processed加1
                    (fn []
                      (try-cause
                        (while @running
                          (let [r (.take queue)]
                            (r)
                            (swap! processed inc)))
                        (catch InterruptedException t
                          (log-message "Event manager interrupted"))
                        (catch Throwable t
                          (log-error t "Error when processing event")
                          (exit-process! 20 "Error when processing an event")))))]
       (.setDaemon runner daemon?)
       ;; 启动事件处理线程
       (.start runner)
       ;; 返回一个实现了EventManager协议的实例
       (reify
         EventManager
         ;; add函数将"事件处理函数"添加到事件处理器的阻塞队列中
         (add
           [this event-fn]
           ;; should keep track of total added and processed to know if this is finished yet
           (when-not @running
             (throw (RuntimeException. "Cannot add events to a shutdown event manager")))
           (swap! added inc)
           (.put queue event-fn))
         ;; waiting?判断事件处理线程是否处于等待状态
         (waiting?
           [this]
           (or (Time/isThreadWaiting runner)
               (= @processed @added)))
         ;; 关闭事件管理器
         (shutdown
           [this]
           (reset! running false)
           (.interrupt runner)
           (.join runner)))))
  • 相关阅读:
    【POJ 3162】 Walking Race (树形DP-求树上最长路径问题,+单调队列)
    【POJ 2152】 Fire (树形DP)
    【POJ 1741】 Tree (树的点分治)
    【POJ 2486】 Apple Tree (树形DP)
    【HDU 3810】 Magina (01背包,优先队列优化,并查集)
    【SGU 390】Tickets (数位DP)
    【SPOJ 2319】 BIGSEQ
    【SPOJ 1182】 SORTBIT
    【HDU 5456】 Matches Puzzle Game (数位DP)
    【HDU 3652】 B-number (数位DP)
  • 原文地址:https://www.cnblogs.com/ierbar0604/p/3947580.html
Copyright © 2011-2022 走看看