zoukankan      html  css  js  c++  java
  • Storm-源码分析-EventManager (backtype.storm.event)

    Protocol and DataType

    大体结构,

    定义protocol EventManager, 其实就是定义interface

    函数event-manager, 主要做2件事
    1. 启动event queue的处理线程, 不断从queue中取出event-fn并执行
    2. 返回实现EventManager的匿名record(reify部分, 实现protocol)

    这里使用了reify的close over特性, reify会将用到的局部变量打包到闭包内, 包含queue, runner

    (ns backtype.storm.event
      (:use [backtype.storm log util])
      (:import [backtype.storm.utils Time Utils])
      (:import [java.util.concurrent LinkedBlockingQueue TimeUnit])
      )
    
    (defprotocol EventManager
      (add [this event-fn])
      (waiting? [this])
      (shutdown [this]))
    
    (defn event-manager
      "Creates a thread to respond to events. Any error will cause process to halt"
      [daemon?]
      (let [added (atom 0)
            processed (atom 0)
            ^LinkedBlockingQueue queue (LinkedBlockingQueue.)
            running (atom true)
            runner (Thread.
                      (fn []
                        (try-cause
                          (while @running
                            (let [r (.take queue)]
                              (r)
                              (swap! processed inc)))
                        (catch InterruptedException t
                          (log-message "Event manager interrupted"))
                        (catch Throwable t
                          (log-error t "Error when processing event")
                          (halt-process! 20 "Error when processing an event"))
                          )))]
        (.setDaemon runner daemon?)
        (.start runner)
        (reify
          EventManager
          (add [this event-fn]
            ;; should keep track of total added and processed to know if this is finished yet
            (when-not @running
              (throw (RuntimeException. "Cannot add events to a shutdown event manager")))
            (swap! added inc)
            (.put queue event-fn)
            )
          (waiting? [this]
            (or (Time/isThreadWaiting runner)
                (= @processed @added)
                ))
          (shutdown [this]
            (reset! running false)
            (.interrupt runner)
            (.join runner)
            )
            )))

     

    使用的时候很简单, 如下

    let [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)]
    (.add processes-event-manager sync-processes)

    可以直接调用add或其他的function
    相当于给event-manager增加EventManager protocol, 反过来说, 给add或其他接口functions增加对event-manager record的support, 因为protocol函数的第一个参数总是类型

    比较神奇的是, 闭包产生的效果, 可以在完全没有queue, runner定义或声明的情况下, 方便的操作他们

  • 相关阅读:
    Sql ----- sqlserver 中的if 判断 case... when
    Bootstrap ---------
    js:定时弹出图片(获取属性、setInterval函数)
    js:轮播图(获取属性、setInterval函数)
    js:表单校验(获取元素、事件)
    js:获取元素的值(id、标签、html5新增、特殊元素的获取)
    js:流程控制(分支结构、顺序结构、循环结构)
    拦截器原理(AOP、责任链模式、拦截器的创建、自定义拦截器)
    Action类
    Struts2配置文件(动态方法调用)
  • 原文地址:https://www.cnblogs.com/fxjwind/p/3152926.html
Copyright © 2011-2022 走看看