zoukankan      html  css  js  c++  java
  • Storm-源码分析-EventManager (backtype.storm.event)

    Protocol and DataType

    大体结构,

    定义protocol EventManager, 其实就是定义interface

    函数event-manager, 主要做2件事
    1. 启动event queue的处理线程, 不断从queue中取出event-fn并执行
    2. 返回实现EventManager的匿名record(reify部分, 实现protocol)

    这里使用了reify的close over特性, reify会将用到的局部变量打包到闭包内, 包含queue, runner

    (ns backtype.storm.event
      (:use [backtype.storm log util])
      (:import [backtype.storm.utils Time Utils])
      (:import [java.util.concurrent LinkedBlockingQueue TimeUnit])
      )
    
    (defprotocol EventManager
      (add [this event-fn])
      (waiting? [this])
      (shutdown [this]))
    
    (defn event-manager
      "Creates a thread to respond to events. Any error will cause process to halt"
      [daemon?]
      (let [added (atom 0)
            processed (atom 0)
            ^LinkedBlockingQueue queue (LinkedBlockingQueue.)
            running (atom true)
            runner (Thread.
                      (fn []
                        (try-cause
                          (while @running
                            (let [r (.take queue)]
                              (r)
                              (swap! processed inc)))
                        (catch InterruptedException t
                          (log-message "Event manager interrupted"))
                        (catch Throwable t
                          (log-error t "Error when processing event")
                          (halt-process! 20 "Error when processing an event"))
                          )))]
        (.setDaemon runner daemon?)
        (.start runner)
        (reify
          EventManager
          (add [this event-fn]
            ;; should keep track of total added and processed to know if this is finished yet
            (when-not @running
              (throw (RuntimeException. "Cannot add events to a shutdown event manager")))
            (swap! added inc)
            (.put queue event-fn)
            )
          (waiting? [this]
            (or (Time/isThreadWaiting runner)
                (= @processed @added)
                ))
          (shutdown [this]
            (reset! running false)
            (.interrupt runner)
            (.join runner)
            )
            )))

     

    使用的时候很简单, 如下

    let [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
    (.add processes-event-manager sync-processes)

    可以直接调用add或其他的function
    相当于给event-manager增加EventManager protocol, 反过来说, 给add或其他接口functions增加对event-manager record的support, 因为protocol函数的第一个参数总是类型

    比较神奇的是, 闭包产生的效果, 可以在完全没有queue, runner定义或声明的情况下, 方便的操作他们

  • 相关阅读:
    mysql数据库备份脚本
    int main(int argc,char *argv[])参数的应用
    文件I/O实现cp复制功能
    网络通信TCP编程实例代码
    外部碎片、进程描述符、内部碎片
    程序、进程、作业、线程的关系
    word2016怎么从第三页开始设置页码
    ARMs3c2440开发板挂接NFS服务
    u-boot添加一个hello命令
    vi 编辑器跳转到指定行数
  • 原文地址:https://www.cnblogs.com/fxjwind/p/3152926.html
Copyright © 2011-2022 走看看