zoukankan      html  css  js  c++  java
  • Storm-源码分析-EventManager (backtype.storm.event)

    Protocol and DataType

    大体结构,

    定义protocol EventManager, 其实就是定义interface

    函数event-manager, 主要做2件事
    1. 启动event queue的处理线程, 不断从queue中取出event-fn并执行
    2. 返回实现EventManager的匿名record(reify部分, 实现protocol)

    这里使用了reify的close over特性, reify会将用到的局部变量打包到闭包内, 包含queue, runner

    (ns backtype.storm.event
      (:use [backtype.storm log util])
      (:import [backtype.storm.utils Time Utils])
      (:import [java.util.concurrent LinkedBlockingQueue TimeUnit])
      )
    
    (defprotocol EventManager
      (add [this event-fn])
      (waiting? [this])
      (shutdown [this]))
    
    (defn event-manager
      "Creates a thread to respond to events. Any error will cause process to halt"
      [daemon?]
      (let [added (atom 0)
            processed (atom 0)
            ^LinkedBlockingQueue queue (LinkedBlockingQueue.)
            running (atom true)
            runner (Thread.
                      (fn []
                        (try-cause
                          (while @running
                            (let [r (.take queue)]
                              (r)
                              (swap! processed inc)))
                        (catch InterruptedException t
                          (log-message "Event manager interrupted"))
                        (catch Throwable t
                          (log-error t "Error when processing event")
                          (halt-process! 20 "Error when processing an event"))
                          )))]
        (.setDaemon runner daemon?)
        (.start runner)
        (reify
          EventManager
          (add [this event-fn]
            ;; should keep track of total added and processed to know if this is finished yet
            (when-not @running
              (throw (RuntimeException. "Cannot add events to a shutdown event manager")))
            (swap! added inc)
            (.put queue event-fn)
            )
          (waiting? [this]
            (or (Time/isThreadWaiting runner)
                (= @processed @added)
                ))
          (shutdown [this]
            (reset! running false)
            (.interrupt runner)
            (.join runner)
            )
            )))

     

    使用的时候很简单, 如下

    let [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
    (.add processes-event-manager sync-processes)

    可以直接调用add或其他的function
    相当于给event-manager增加EventManager protocol, 反过来说, 给add或其他接口functions增加对event-manager record的support, 因为protocol函数的第一个参数总是类型

    比较神奇的是, 闭包产生的效果, 可以在完全没有queue, runner定义或声明的情况下, 方便的操作他们

  • 相关阅读:
    数据库优化
    2013调试sql的方法
    C++ Primer Pluse_6_课后题
    软件测试--(10)功能测试、系统测试
    软件测试--发展之路
    软件测试--(9)软件测试过程和软件测试模型
    软件测试--(8)软件开发过程和软件开发模型
    软件测试--(7)集成测试
    软件测试--(6)模块测试(单元测试)
    软件测试--(5)测试策略
  • 原文地址:https://www.cnblogs.com/fxjwind/p/3152926.html
Copyright © 2011-2022 走看看