zoukankan      html  css  js  c++  java
  • Storm-源码分析- Storm中Zookeeper的使用

    在backtype.storm.cluster.clj中, 定义了storm对于Zookeeper的使用

     

    ClusterState

    首先定义操作Zookeeper集群的interface

    (defprotocol ClusterState
      (set-ephemeral-node [this path data])
      (delete-node [this path])
      (create-sequential [this path data])
      (set-data [this path data])  ;; if node does not exist, create persistent with this data 
      (get-data [this path watch?])
      (get-children [this path watch?])
      (mkdirs [this path])
      (close [this])
      (register [this callback])
      (unregister [this id])
      )

    实现和生成用于操作Zookeeper集群的record
    首先创建zk-client, 并在zk上创建STORM-ZOOKEEPER-ROOT目录
    接着定义,
        callbacks, callback集合
        active, 标志zk集群状态
        zk, zk client

    创建zk client的时候, 设置了watcher, 即zk server当状态发生变化时会给client发送event, 此处client设置的watcher会调用callbacks来处理server发送的event

    Storm在操作Zookeeper时, 使用CuratorFramework(http://curator.incubator.apache.org/curator-framework/index.html)

    最后实现ClusterState protocol, 其中register和unregister是用来添加/删除callbacks的, 其他都是些zk的常规操作

    (defn mk-distributed-cluster-state [conf]
      (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)]
        (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
        (.close zk))
      (let [callbacks (atom {})
            active (atom true)
            zk (zk/mk-client conf
                             (conf STORM-ZOOKEEPER-SERVERS)
                             (conf STORM-ZOOKEEPER-PORT)
                             :auth-conf conf
                             :root (conf STORM-ZOOKEEPER-ROOT)
                             :watcher (fn [state type path]
                                         (when @active
                                           (when-not (= :connected state)
                                             (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
                                           (when-not (= :none type)
                                             (doseq [callback (vals @callbacks)]
                                               (callback type path))))
                                           ))]
        (reify
         ClusterState
         (register [this callback]
                   (let [id (uuid)]
                     (swap! callbacks assoc id callback)
                     id
                     ))
         (unregister [this id]
                     (swap! callbacks dissoc id))
    
         (set-ephemeral-node [this path data]
                             (zk/mkdirs zk (parent-path path))
                             (if (zk/exists zk path false)
                               (try-cause
                                 (zk/set-data zk path data) ; should verify that it's ephemeral
                                 (catch KeeperException$NoNodeException e
                                   (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
                                   (zk/create-node zk path data :ephemeral)
                                   ))
                               (zk/create-node zk path data :ephemeral)
                               ))
         
         (create-sequential [this path data]
           (zk/create-node zk path data :sequential))
         
         (set-data [this path data]
                   ;; note: this does not turn off any existing watches
                   (if (zk/exists zk path false)
                     (zk/set-data zk path data)
                     (do
                       (zk/mkdirs zk (parent-path path))
                       (zk/create-node zk path data :persistent)
                       )))
         
         (delete-node [this path]
                      (zk/delete-recursive zk path)
                      )
         
         (get-data [this path watch?]
                   (zk/get-data zk path watch?)
                   )
         
         (get-children [this path watch?]
                       (zk/get-children zk path watch?))
         
         (mkdirs [this path]
                 (zk/mkdirs zk path))
         
         (close [this]
                (reset! active false)
                (.close zk))
         )))

     

    StormClusterState

    定义针对Storm定制的zk操作协议, 包含各种storm里面的信息在zk上的读写

    (defprotocol StormClusterState
      (assignments [this callback])
      (assignment-info [this storm-id callback])
      (active-storms [this])
      (storm-base [this storm-id callback])
    
      (get-worker-heartbeat [this storm-id node port])
      (executor-beats [this storm-id executor->node+port])
      (supervisors [this callback])
      (supervisor-info [this supervisor-id])  ;; returns nil if doesn't exist
    
      (setup-heartbeats! [this storm-id])
      (teardown-heartbeats! [this storm-id])
      (teardown-topology-errors! [this storm-id])
      (heartbeat-storms [this])
      (error-topologies [this])
    
      (worker-heartbeat! [this storm-id node port info])
      (remove-worker-heartbeat! [this storm-id node port])
      (supervisor-heartbeat! [this supervisor-id info])
      (activate-storm! [this storm-id storm-base])
      (update-storm! [this storm-id new-elems])
      (remove-storm-base! [this storm-id])
      (set-assignment! [this storm-id info])
      (remove-storm! [this storm-id])
      (report-error [this storm-id task-id error])
      (errors [this storm-id task-id])
    
      (disconnect [this])
      )

    首先判断是否第一次mk-storm-cluster-state, 既是否进行过zk cluster state的创建, 如果没有调用mk-distributed-cluster-state
    接着, 定义一系列的callbacks, 并调用cluster-state的register, 注册到callbacks列表中
           state-id 就是register返回的callback的uuid
    再者, 在zk上创建storm的子目录
    最后, 实现StormClusterState协议, 实现各种zk数据的读写

    (defn mk-storm-cluster-state [cluster-state-spec]
      (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
                                    [false cluster-state-spec]
                                    [true (mk-distributed-cluster-state cluster-state-spec)])
            assignment-info-callback (atom {})
            supervisors-callback (atom nil)
            assignments-callback (atom nil) ;在StormClusterState.assignments中被set
            storm-base-callback (atom {})
            state-id (register
                      cluster-state
                      (fn [type path]
                        (let [[subtree & args] (tokenize-path path)] ;将path按'/'分割
                          (condp = subtree ;对path的subtree部分进行swith…case
                              ASSIGNMENTS-ROOT (if (empty? args)
                                                 (issue-callback! assignments-callback) ;issue-callback!, 执行并删除该callback, 保证callback只被执行一次
                                                 (issue-map-callback! assignment-info-callback (first args)))
                              SUPERVISORS-ROOT (issue-callback! supervisors-callback)
                              STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
                              ;; this should never happen
                              (halt-process! 30 "Unknown callback for subtree " subtree args)
                              )
                          )))]
        (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
          (mkdirs cluster-state p))
        (reify
         StormClusterState
         )

     

    例子

    通过一个场景来说明storm怎样使用zookeeper

    supervisor中的mk-synchronize-supervisor, 主要用于下载新的, 并删除不使用的topology代码
    所以这个逻辑光执行一次是不够的, 需要当每次assignment发生变化的时候就执行一次

    storm是利用zookeeper的watcher来解决这个问题
    1. 在mk-distributed-cluster-state中创建zk client的时候配置watcher, 当收到zk server的event的时候, 调用callbacks列表里面的callback进行处理

    2. 在mk-storm-cluster-state 中将callback加入cluster-state的callback列表
        而这个callback本身, 就是根据event中的path(代表哪部分数据发生change)来issue在storm-cluster-state中维护的一系列callback
        比如, 当ASSIGNMENTS-ROOT发生变化时, 会调用assignments-callback 
    3. 那么也就是说只需要将mk-synchronize-supervisor, set到assignments-callback, 就可以保证当ASSIGNMENTS-ROOT发生变化时, 调用mk-synchronize-supervisor去同步topology代码
       什么时候set? 在第一次调用mk-synchronize-supervisor的时候
    sync-callback (fn [& ignored] (.add event-manager this))
    assignments-snapshot (assignments-snapshot storm-cluster-state sync-callback)

       同步topology代码是消耗时间的事情, 所以实现的时候放在后台执行, 只是将this(function) add到event-manager的queue里面, 后台线程会执行这个函数
       并且在调用assignment获取assignments-snapshot的时候, 将sync-callback set到assignments-callback中去

         (assignments [this callback]
            (when callback
              (reset! assignments-callback callback))
            (get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))

    By the way, 对于get-children, 是否有callback, 即是否被watch, 读的数据是不一样的, 具体原因不是很清楚, 需要后面看看zk的具体使用

    (defn get-children [^CuratorFramework zk ^String path watch?]
      (if watch?
        (.. zk (getChildren) (watched) (forPath (normalize-path path)))
        (.. zk (getChildren) (forPath (normalize-path path)))))

    4. 前面说了issue-callback!在执行assignments-callback之前, 会将其清空, 所以如果需要不断的触发, 那么就要不断的设置assignments-callback
        所以作为callback, mk-synchronize-supervisor会先通过assignments-snapshot去重设assignments-callback
        至于为什么要采用这样的机制? 现在还看不清楚

  • 相关阅读:
    WeX5 苹果APP打包教程
    开源中国社区
    HBuilder-飞速编码的极客工具,手指爽,眼睛爽下载
    java用double和float进行小数计算精度不准确
    SQL Server 查询表的主键的两种方式
    JS代码压缩格式化在线地址
    解决SQL Server 阻止了对组件 'Ad Hoc Distributed Queries' 的 STATEMENT'OpenRowset/OpenDatasource' 的访问的方法
    SQL跨数据库复制表数据
    ExtJs 扩展类CheckColumn的使用(事件触发)
    DM36x IPNC OSD显示中文 --- 基本数据准备篇
  • 原文地址:https://www.cnblogs.com/fxjwind/p/3156081.html
Copyright © 2011-2022 走看看