zoukankan      html  css  js  c++  java
  • Storm-源码分析- hook (backtype.storm.hooks)

    task hook

    在某些task事件发生时, 如果用户希望执行一些额外的逻辑, 就需要使用hook

    当前定义如下事件, emit, cleanup, spoutAck……

    用户只需要开发实现ITaskHook的类, 并将类名配置到(storm-conf TOPOLOGY-AUTO-TASK-HOOKS)

    系统会在这些事件发生时, 自动调用所有注册的hook中的相应的functions

     

    public interface ITaskHook {
        void prepare(Map conf, TopologyContext context);
        void cleanup();
        void emit(EmitInfo info);
        void spoutAck(SpoutAckInfo info);
        void spoutFail(SpoutFailInfo info);
        void boltExecute(BoltExecuteInfo info);
        void boltAck(BoltAckInfo info);
        void boltFail(BoltFailInfo info);
    }

    public class EmitInfo {
        public List<Object> values;
        public String stream;
        public int taskId;
        public Collection<Integer> outTasks;
        
        public EmitInfo(List<Object> values, String stream, int taskId, Collection<Integer> outTasks) {
            this.values = values;
            this.stream = stream;
            this.taskId = taskId;
            this.outTasks = outTasks;
        }
    }

     

    1. add hook

    在mk-task的时候, 会从storm-conf配置里面读出hooks的class names
    创建hook对象, 加入到TopologyContext的_hooks中

    (defn mk-task [executor-data task-id]
        (doseq [klass (storm-conf TOPOLOGY-AUTO-TASK-HOOKS)]
          (.addTaskHook ^TopologyContext (:user-context task-data) (-> klass Class/forName .newInstance)))
    )
    public class TopologyContext extends WorkerTopologyContext implements IMetricsContext {
        private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();
        
        public void addTaskHook(ITaskHook hook) {
            hook.prepare(_stormConf, this);
            _hooks.add(hook);
        }
        
        public Collection<ITaskHook> getHooks() {
            return _hooks;
        }
    }

     

    2. apply hook

    当发生相应的事件时, 调用事先注册的hooks

    下面的例子是在emit时, 调用相应的hooks
    apply-hooks宏实现也很简单, 从topology context中取出hooks列表, 对每个hook调用emit(EmitInfo)

    (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))

    (defmacro apply-hooks [topology-context method-sym info-form]
      (let [hook-sym (with-meta (gensym "hook") {:tag 'backtype.storm.hooks.ITaskHook})]
        `(let [hooks# (get-context-hooks ~topology-context)]
           (when-not (hooks-empty? hooks#)
             (let [info# ~info-form]
               (fast-list-iter [~hook-sym hooks#]
                 (~method-sym ~hook-sym info#)
                 ))))))
  • 相关阅读:
    在energia中添加新的库
    KEIL3中出现的字符不对齐的情况解决办法
    VHDL硬件描述语言实现数字钟
    51单片机软件I2C驱动中的CY
    自问自答:在VB中如何实现像C++一样printf的功能
    [转][译] 分分钟学会一门语言之 Python 篇
    杂谈PID控制算法——最终篇:C语言实现51单片机中的PID算法
    Eclipse 安装与配置
    win10 环境安装 jdk 11.0.2
    解决网络问题神奇工具
  • 原文地址:https://www.cnblogs.com/fxjwind/p/3225287.html
Copyright © 2011-2022 走看看