Twitter Storm: A Source-Code Walkthrough of the Bolt Message-Passing Path

    This article was first posted to the storm-cn Google Group and is republished here as a blog post, as proof that I, 徽沪一郎, have really read this code. :)

    A Bolt runs as a task executed by an executor, and an executor is a thread, so executors must live inside a concrete process; that process is the worker. How the supervisor creates the worker, and how the worker in turn creates its executor threads, is set aside for now.

     
    Assume that the Spout and the Bolt of the same topology sit in different JVMs, that is, in different workers. The two JVMs may be on the same physical machine or on different ones; to keep the scenario simple, assume they are on different physical machines.
     
    A message emitted by the Spout goes through the following stages before it arrives as input at the Bolt (see the sketch after this list).
     
    1. The spout's output is pushed by the message transfer thread of the spout's worker to the worker that hosts the bolt. The channel between them is a socket connection, implemented with ZeroMQ.
    2. The bolt's worker has a dedicated receive thread for socket messages, which receives the tuples sent by the spout.
    3. The receive thread hands the received messages to the executor that runs the corresponding bolt. Inside a worker (that is, within the same process), message passing uses the LMAX Disruptor pattern.
    4. After the executor receives a tuple, it is processed by the event handler.
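    For orientation, here is a minimal sketch of a Spout-to-Bolt topology written with Storm's Clojure DSL (backtype.storm.clojure). The namespace, component names and the trivial spout/bolt bodies are made up for this example; the point is only that with topology.workers greater than 1, the spout and bolt tasks can land in different worker JVMs, which is exactly the cross-worker path traced below.
     
    (ns demo.wordpath
      (:use [backtype.storm clojure config])
      (:import [backtype.storm StormSubmitter]))
     
    ;; A throwaway spout that emits one random word every 100 ms.
    (defspout word-spout ["word"]
      [conf context collector]
      (spout
       (nextTuple []
         (Thread/sleep 100)
         (emit-spout! collector [(rand-nth ["storm" "bolt" "tuple"])]))
       (ack [id])))
     
    ;; A throwaway bolt that re-emits and acks each word it receives.
    (defbolt echo-bolt ["word"] [tuple collector]
      (emit-bolt! collector [(.getString tuple 0)] :anchor tuple)
      (ack! collector tuple))
     
    (defn demo-topology []
      (topology
       {"words"  (spout-spec word-spout)}
       {"echoer" (bolt-spec {"words" :shuffle} echo-bolt :p 2)}))
     
    ;; With more than one worker, tuples from word-spout to echo-bolt have to
    ;; cross worker boundaries over the socket path described in this post:
    ;; (StormSubmitter/submitTopology "demo" {TOPOLOGY-WORKERS 2} (demo-topology))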
     
    The concrete source code follows.
    1. The worker creates the message receive thread
     
    worker.clj
     
    (defn launch-receive-thread [worker]
      (log-message "Launching receive-thread for " (:assignment-id worker) ":" (:port worker))
      (msg-loader/launch-receive-thread!
        (:mq-context worker)
        (:storm-id worker)
        (:port worker)
        (:transfer-local-fn worker)
        (-> worker :storm-conf (get TOPOLOGY-RECEIVER-BUFFER-SIZE))
        :kill-fn (fn [t] (halt-process! 11))))
     
    Note the (:mq-context worker) argument: the messaging context it supplies is built from whatever transport storm.yaml configures, ZeroMQ or another implementation:
    storm.messaging.transport: "backtype.storm.messaging.zmq"
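     
    How such a string becomes a live messaging context can be pictured roughly as follows. This is only an illustrative sketch, not Storm's actual loader: it assumes the configured value names a Clojure namespace exposing a constructor function, hypothetically called mk-context, which returns an object implementing the IContext interface seen in the next snippet.
     
    ;; Hypothetical sketch of pluggable-transport loading; Storm's real
    ;; resolution logic lives in its messaging code and differs in detail.
    (defn make-transport-context
      "Require the namespace named by storm.messaging.transport and call
       its (assumed) mk-context constructor with the topology configuration."
      [storm-conf]
      (let [ns-sym (symbol (get storm-conf "storm.messaging.transport"))]
        (require ns-sym)
        ((ns-resolve ns-sym 'mk-context) storm-conf)))
     
    ;; e.g. (make-transport-context {"storm.messaging.transport" "my.custom.transport"})
    ;; assuming my.custom.transport defines a mk-context function.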
     
    2. The worker receives new messages from the socket
     
    loader.clj (backtype.storm.messaging.loader), an excerpt from launch-receive-thread!:
    vthread (async-loop
             (fn []
               (let [socket (.bind ^IContext context storm-id port)]
                 (fn []
                   (let [batched (ArrayList.)
                         init (.recv ^IConnection socket 0)]
                     (loop [packet init]
                       (let [task (if packet (.task ^TaskMessage packet))
                             message (if packet (.message ^TaskMessage packet))]
                         (if (= task -1)
                           ;; a task id of -1 is the shutdown notice
                           (do (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
                             (.close socket)
                             nil )
                           (do
                             (when packet (.add batched [task message]))
                             ;; keep reading until the batch is full, then hand the whole
                             ;; batch to the executors in one go via transfer-local-fn
                             (if (and packet (< (.size batched) max-buffer-size))
                               (recur (.recv ^IConnection socket 1))
                               (do (transfer-local-fn batched)
                                 0 ))))))))))
     
    The call to transfer-local-fn here passes the received TaskMessages on to the corresponding executor.
     
    3. transfer-local-fn
     
    (defn mk-transfer-local-fn [worker]
      (let [short-executor-receive-queue-map (:short-executor-receive-queue-map worker)
            task->short-executor (:task->short-executor worker)
            task-getter (comp #(get task->short-executor %) fast-first)]
        (fn [tuple-batch]
          (let [grouped (fast-group-by task-getter tuple-batch)]
            (fast-map-iter [[short-executor pairs] grouped]
              (let [q (short-executor-receive-queue-map short-executor)]
                (if q
                  (disruptor/publish q pairs)
                  (log-warn "Received invalid messages for unknown tasks. Dropping... ")
                  )))))))
     
    Disruptor is used for message passing between threads.
     
    One more remark: mk-transfer-local-fn moves messages from the outside world to the threads inside this process, whereas mk-transfer-fn goes in exactly the opposite direction.
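     
    To make the shape of mk-transfer-local-fn concrete without Storm's fast-* helpers, here is a stand-alone sketch: plain group-by replaces fast-group-by, and a java.util.concurrent.LinkedBlockingQueue per executor stands in for the Disruptor receive queue. The executor ids, task mapping and queue type are all made up for the example.
     
    (import '[java.util.concurrent LinkedBlockingQueue])
     
    ;; One stand-in "receive queue" per executor (a Disruptor queue in real Storm).
    (def executor-queues {1 (LinkedBlockingQueue.)
                          2 (LinkedBlockingQueue.)})
     
    ;; Maps a task id to the executor that runs it.
    (def task->executor {3 1, 4 1, 5 2})
     
    (defn transfer-local
      "Group [task-id message] pairs by target executor and publish each
       group onto that executor's queue, dropping pairs for unknown tasks."
      [tuple-batch]
      (doseq [[executor pairs] (group-by (comp task->executor first) tuple-batch)]
        (if-let [^LinkedBlockingQueue q (executor-queues executor)]
          (.put q pairs)
          (println "Received invalid messages for unknown tasks. Dropping..."))))
     
    ;; Tuples for tasks 3 and 4 land on executor 1's queue as one batch,
    ;; the tuple for task 5 on executor 2's queue:
    (transfer-local [[3 "msg-a"] [4 "msg-b"] [5 "msg-c"]])
     
    In the real worker, disruptor/publish plays the role of .put here, and the keys come from task->short-executor rather than a hard-coded map.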
     
    4. The message is processed by the executor
     
    executor.clj
    (defn mk-task-receiver [executor-data tuple-action-fn]
      (let [^KryoTupleDeserializer deserializer (:deserializer executor-data)
            task-ids (:task-ids executor-data)
            debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
            ]
        (disruptor/clojure-handler
          (fn [tuple-batch sequence-id end-of-batch?]
            (fast-list-iter [[task-id msg] tuple-batch]
              (let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))]
                (when debug? (log-message "Processing received message " tuple))
                (if task-id
                  (tuple-action-fn task-id tuple)
                  ;; null task ids are broadcast tuples
                  (fast-list-iter [task-id task-ids]
                    (tuple-action-fn task-id tuple)
                    ))
                ))))))
     
    The tuple-action-fn invoked here is defined in mk-threads (source file executor.clj). Since we are taking a Bolt as our example, the tuple-action-fn that actually gets called is the one defined in (defmethod mk-threads :bolt [executor-data task-datas] ...).
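     
    Its essential shape can be pictured as below; this is a hypothetical, heavily stripped-down reconstruction for illustration only, since the real tuple-action-fn also handles tick tuples, system streams and metrics.
     
    ;; Hypothetical, simplified shape of the :bolt tuple-action-fn:
    ;; look up the bolt object owned by the target task and hand it the tuple.
    (defn bolt-tuple-action-fn [task-datas]
      (fn [task-id tuple]
        (let [bolt-obj (:object (get task-datas task-id))]
          ;; IBolt.execute is where the user's bolt code finally runs
          (.execute bolt-obj tuple))))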
     
    So how does mk-task-receiver get hooked up to the Disruptor queue? See the following code, also defined in mk-threads:
    (let [receive-queue (:receive-queue executor-data)
                  event-handler (mk-task-receiver executor-data tuple-action-fn)]
              (disruptor/consumer-started! receive-queue)
              (fn []            
                (disruptor/consume-batch-when-available receive-queue event-handler)
                0)))
     
    At this point, the whole path from sending a message to receiving and processing it is connected.
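     
    As a recap of the consuming side, the sketch below imitates that loop with a LinkedBlockingQueue standing in for the Disruptor receive queue: a handler in the spirit of mk-task-receiver drains one published batch at a time and dispatches each [task-id message] pair, broadcasting when the task id is nil. All names and signatures here are illustrative, not Storm's.
     
    (import '[java.util.concurrent LinkedBlockingQueue TimeUnit])
     
    (defn make-task-receiver
      "Return a batch handler: dispatch each [task-id msg] pair, and
       broadcast to every task id when a pair carries a nil task id."
      [task-ids tuple-action-fn]
      (fn [tuple-batch]
        (doseq [[task-id msg] tuple-batch]
          (if task-id
            (tuple-action-fn task-id msg)
            (doseq [tid task-ids]           ; nil task id => broadcast tuple
              (tuple-action-fn tid msg))))))
     
    (defn consume-loop
      "Stand-in for the executor loop: repeatedly take one published batch
       from the receive queue (if any) and run it through the handler."
      [^LinkedBlockingQueue receive-queue event-handler running?]
      (while @running?
        (when-let [batch (.poll receive-queue 100 TimeUnit/MILLISECONDS)]
          (event-handler batch))))
     
    ;; Usage sketch:
    ;; (def q (LinkedBlockingQueue.))
    ;; (def running? (atom true))
    ;; (future (consume-loop q (make-task-receiver [3 4] #(println "task" %1 "got" %2)) running?))
    ;; (.put q [[3 "hello"] [nil "everyone"]])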
Original post: https://www.cnblogs.com/hseagle/p/3333768.html