zoukankan      html  css  js  c++  java
  • Twitter Storm中Bolt消息传递路径之源码解读

    本文初次发表于storm-cn的google groups中,现以blog的方式再次发表,表明本人徽沪一郎确实读过这些代码,:).

    Bolt作为task被executor执行,而executor是一个个的线程,所以executor必须存在于具体的process之中,而这个process就是worker。至于worker是如何被supervisor创建,尔后worker又如何创建executor线程,这些暂且按下不表。

     
    假设同属于一个Topology的Spout与Bolt分别处于不同的JVM,即不同的worker中,不同的JVM可能处于同一台物理机器,也可能处于不同的物理机器中。为了让情景简单,认为JVM处于不同的物理机器中。
     
    Spout的输出消息到达Bolt,作为Bolt的输入会经过这么几个阶段。
     
    1. spout的输出通过该spout所处worker的消息输出线程,将tuple输入到Bolt所属的worker。它们之间的通路是socket连接,用ZeroMQ实现。
    2. bolt所处的worker有一个专门处理socket消息的receive thread 接收到spout发送来的tuple
    3. receive thread将接收到的消息传送给对应的bolt所在的executor。 在worker内部(即同一process内部),消息传递使用的是Lmax Disruptor pattern.
    4. executor接收到tuple之后,由event-handler进行处理
     
    下面是具体的源码
    1. worker创建消息接收线程 
     
    worker.clj
     
    (defn launch-receive-thread [worker]
      (log-message "Launching receive-thread for " (:assignment-id worker) ":" (:port worker))
      (msg-loader/launch-receive-thread!
        (:mq-context worker)
        (:storm-id worker)
        (:port worker)
        (:transfer-local-fn worker)
        (-> worker :storm-conf (get TOPOLOGY-RECEIVER-BUFFER-SIZE))
        :kill-fn (fn [t] (halt-process! 11))))
     
    注意加亮的行会将storm.yaml中配置使用ZMQ或其它
    storm.messaging.transport:"backtype.storm.messaging.zmq"
     
    2. worker从socket接收到新消息
    vthread (async-loop
                     (fn []
                       (let [socket (.bind ^IContext context storm-id port)]
                         (fn []
                           (let [batched (ArrayList.)
                                 init (.recv ^IConnection socket 0)]
                             (loop [packet init]
                               (let [task (if packet (.task ^TaskMessage packet))
                                     message (if packet (.message ^TaskMessage packet))]
                                 (if (= task -1)
                                   (do (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
                                     (.close socket)
                                     nil )
                                   (do
                                     (when packet (.add batched [task message]))
                                     (if (and packet (< (.size batched) max-buffer-size))
                                       (recur (.recv ^IConnection socket 1))
                                       (do (transfer-local-fn batched)
                                         0 ))))))))))
     
    加亮行使用的transfer-local-fn会将接收的TaskMessage传递给相应的executor
     
    3. transfer-local-fn
     
    (defn mk-transfer-local-fn [worker]
      (let [short-executor-receive-queue-map (:short-executor-receive-queue-map worker)
            task->short-executor (:task->short-executor worker)
            task-getter (comp #(get task->short-executor %) fast-first)]
        (fn [tuple-batch]
          (let [grouped (fast-group-by task-getter tuple-batch)]
            (fast-map-iter [[short-executor pairs] grouped]
              (let [q (short-executor-receive-queue-map short-executor)]
                (if q
                  (disruptor/publish q pairs)
                  (log-warn "Received invalid messages for unknown tasks. Dropping... ")
                  )))))))
     
    用disruptor在线程之间进行消息传递。
     
    多费一句话,mk-transfer-local-fn表示将外部世界的消息传递给本进程内的线程。而mk-transfer-fn则刚好在方向上反过来。
     
    4. 消息被executor处理
     
    executor.clj
    ==========================================================
    (defn mk-task-receiver [executor-data tuple-action-fn]
      (let [^KryoTupleDeserializer deserializer (:deserializer executor-data)
            task-ids (:task-ids executor-data)
            debug? (= true (-> executor-data :storm-conf (get TOPOLOGY-DEBUG)))
            ]
        (disruptor/clojure-handler
          (fn [tuple-batch sequence-id end-of-batch?]
            (fast-list-iter [[task-id msg] tuple-batch]
              (let [^TupleImpl tuple (if (instance? Tuple msg) msg (.deserialize deserializer msg))]
                (when debug? (log-message "Processing received message " tuple))
                (if task-id
                  (tuple-action-fn task-id tuple)
                  ;; null task ids are broadcast tuples
                  (fast-list-iter [task-id task-ids]
                    (tuple-action-fn task-id tuple)
                    ))
                ))))))
     
    加亮行中tuple-action-fn定义于mk-threads(源文件executor.clj)中。因为当前以Bolt为例,所以会调用的tuple-action-fn定义于defmethod mk-threads :bolt [executor-data task-datas]
     
    那么mk-task-receiver是如何与disruptor关联起来的呢,可以见定义于mk-threads中的下述代码
    (let [receive-queue (:receive-queue executor-data)
                  event-handler (mk-task-receiver executor-data tuple-action-fn)]
              (disruptor/consumer-started! receive-queue)
              (fn []            
                (disruptor/consume-batch-when-available receive-queue event-handler)
                0)))
     
    到了这里,消息的发送与接收处理路径打通。
  • 相关阅读:
    Java Spring MVC框架搭建(一)
    LeetCode 239. Sliding Window Maximum(Hard)
    LeetCode 238. Product of Array Except Self
    LeetCode 237 Delete Node in a Linked List
    LeetCode 236. Lowest Common Ancestor of a Binary Tree
    LeetCode 235 Lowest Common Ancestor of a Binary Search Tree
    LeetCode 234. Palindrome Linked List
    LeetCode 232. Implement Queue using Stacks
    LeetCode 231. Power of Two
    LeetCode 230. Kth Smallest Element in a BST
  • 原文地址:https://www.cnblogs.com/hseagle/p/3333768.html
Copyright © 2011-2022 走看看