    Twitter Storm Source Code Reading, Part 2 -- Analyzing the Tuple Sending Path

    Reposting is welcome; please credit the original author, 徽沪一郎. This article analyzes what actually happens when a tuple is sent. Another post on this blog, 《bolt消息传递路径之源码解读》, looks at the same problem from the message-receiving side; the two articles complement each other.

    A panoramic view of message receiving and processing inside the worker process

    First, a figure sketching the whole flow a worker process goes through after it receives a tuple message.

    Establishing and using IConnection

    In the implementation of the :bolt branch of mk-threads there is the following piece of code, whose main job is to implement the emit of a tuple.

    bolt-emit (fn [stream anchors values task]
                (let [out-tasks (if task
                                  (tasks-fn task stream values)
                                  (tasks-fn stream values))]
                  (fast-list-iter [t out-tasks]
                    (let [anchors-to-ids (HashMap.)]
                      (fast-list-iter [^TupleImpl a anchors]
                        (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
                          (when (pos? (count root-ids))
                            (let [edge-id (MessageId/generateId rand)]
                              (.updateAckVal a edge-id)
                              (fast-list-iter [root-id root-ids]
                                (put-xor! anchors-to-ids root-id edge-id))
                              ))))
                      (transfer-fn t
                                   (TupleImpl. worker-context
                                               values
                                               task-id
                                               stream
                                               (MessageId/makeId anchors-to-ids)))))
                  (or out-tasks [])))
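
    The interesting part of the anchoring logic is the XOR bookkeeping: for every anchor a fresh edge-id is generated, XOR-ed into the anchor's own ack value with .updateAckVal, and XOR-ed into anchors-to-ids under each root id with put-xor!. The acker later relies on the fact that XOR-ing every created and every acked edge id for a root yields zero exactly when the tuple tree is complete. Below is a minimal, self-contained sketch of that bookkeeping in plain Clojure; generate-id and put-xor are stand-ins for MessageId/generateId and put-xor!, not Storm's actual code.

    (import 'java.util.Random)

    (defn generate-id
      "Mimics MessageId/generateId: draw a random 64-bit edge id."
      [^Random rand]
      (.nextLong rand))

    (defn put-xor
      "Mimics put-xor!: XOR a new edge id into the entry kept for root-id."
      [m root-id edge-id]
      (assoc m root-id (bit-xor (get m root-id 0) edge-id)))

    ;; A tuple tree rooted at root-id is considered fully processed when the XOR
    ;; of every edge id ever created for it and every edge id acked for it is 0.
    (let [rand    (Random.)
          root-id (generate-id rand)
          edge-1  (generate-id rand)
          edge-2  (generate-id rand)
          ;; two child tuples are emitted, both anchored on root-id
          pending (-> {} (put-xor root-id edge-1) (put-xor root-id edge-2))
          ;; later both children are acked with the same edge ids
          acked   (-> pending (put-xor root-id edge-1) (put-xor root-id edge-2))]
      (println "after emits:" (get pending root-id))   ;; almost surely non-zero
      (println "after acks :" (get acked root-id)))    ;; 0 -> tree complete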

    The key part here is the call to transfer-fn, which is what actually sends the tuple onward. So where is transfer-fn defined? In the let bindings of mk-threads you can find the following line:

    :transfer-fn (mk-executor-transfer-fn batch-transfer->worker)

    Before drilling into each of these functions, let's be clear about the goal of this pass through the code. Storm uses Disruptor queues for communication between threads within a process, and zeromq or netty for communication between processes, so the task is to trace from transfer-fn down to the point where the zeromq or netty API is actually called.

    Next, the implementation of mk-executor-transfer-fn:

    (defn mk-executor-transfer-fn [batch-transfer->worker]
      (fn this
        ([task tuple block? ^List overflow-buffer]
          (if (and overflow-buffer (not (.isEmpty overflow-buffer)))
            (.add overflow-buffer [task tuple])
            (try-cause
              (disruptor/publish batch-transfer->worker [task tuple] block?)
              (catch InsufficientCapacityException e
                (if overflow-buffer
                  (.add overflow-buffer [task tuple])
                  (throw e))
                ))))
        ([task tuple overflow-buffer]
          (this task tuple (nil? overflow-buffer) overflow-buffer))
        ([task tuple]
          (this task tuple nil)
          )))
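
    Before looking at the Disruptor details, note the shape of this function: it tries to publish [task tuple] onto the executor's batch queue and, when the queue has no capacity (InsufficientCapacityException), either parks the pair in the caller-supplied overflow-buffer or rethrows. Here is a small sketch of the same "bounded queue with overflow fallback" pattern, using a plain ArrayBlockingQueue in place of the Disruptor queue; the names are illustrative only.

    (import '(java.util.concurrent ArrayBlockingQueue)
            '(java.util ArrayList List))

    (defn mk-transfer-fn
      "Returns a fn that tries to enqueue [task tuple]; when the bounded queue
       is full it falls back to the overflow buffer, or throws if none is given,
       mirroring the publish / InsufficientCapacityException branches above."
      [^ArrayBlockingQueue queue]
      (fn [task tuple ^List overflow-buffer]
        (if (and overflow-buffer (not (.isEmpty overflow-buffer)))
          ;; once tuples have started overflowing, keep appending to preserve order
          (.add overflow-buffer [task tuple])
          (when-not (.offer queue [task tuple])      ;; non-blocking "publish"
            (if overflow-buffer
              (.add overflow-buffer [task tuple])
              (throw (IllegalStateException. "queue full and no overflow buffer")))))))

    (let [queue    (ArrayBlockingQueue. 2)
          overflow (ArrayList.)
          transfer (mk-transfer-fn queue)]
      (dotimes [i 5]
        (transfer i {:value i} overflow))
      (println "queued:" (vec queue) "overflowed:" (vec overflow)))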

    disruptor/publish sends the message out of the current thread; as for who receives it, read on.

    Inside the worker process there is a receiver-thread dedicated to receiving messages from other processes; its counterpart, the transfer-thread, sends this process's messages to other processes. So the messages published above with disruptor/publish should end up being picked up by the transfer-thread.

    In transfer-thread you can find the following line of code:

    transfer-thread (disruptor/consume-loop* (:transfer-queue worker) transfer-tuples)

    Messages coming from other threads in the same process are handled by transfer-tuples, which is created by mk-transfer-tuples-handler. So the question becomes: does mk-transfer-tuples-handler lead us to zeromq or netty?

    (defn mk-transfer-tuples-handler [worker]
      (let [^DisruptorQueue transfer-queue (:transfer-queue worker)
            drainer (ArrayList.)
            node+port->socket (:cached-node+port->socket worker)
            task->node+port (:cached-task->node+port worker)
            endpoint-socket-lock (:endpoint-socket-lock worker)
            ]
        (disruptor/clojure-handler
          (fn [packets _ batch-end?]
            (.addAll drainer packets)
            (when batch-end?
              (read-locked endpoint-socket-lock
                (let [node+port->socket @node+port->socket
                      task->node+port @task->node+port]
                  ;; consider doing some automatic batching here (would need to not be serialized at this point to remove per-tuple overhead)
                  ;; try using multipart messages ... first sort the tuples by the target node (without changing the local ordering)
                  (fast-list-iter [[task ser-tuple] drainer]
                    ;; TODO: consider write a batch of tuples here to every target worker
                    ;; group by node+port, do multipart send
                    (let [node-port (get task->node+port task)]
                      (when node-port
                        (.send ^IConnection (get node+port->socket node-port) task ser-tuple))
                      ))))
              (.clear drainer))))))
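
    The handler drains a whole batch into drainer and, when batch-end? is true, resolves each task to a node+port and then to the cached connection before calling .send. A stripped-down sketch of that two-step routing, with plain maps and a function standing in for the IConnection (all names here are illustrative):

    (defn send-drained
      "For every [task ser-tuple] in the drained batch, look up the task's
       node+port and then the connection for that node+port; tuples whose task
       currently has no assignment are dropped, as (when node-port ...) does."
      [drainer task->node+port node+port->send-fn]
      (doseq [[task ser-tuple] drainer]
        (when-let [node-port (task->node+port task)]
          (when-let [send! (node+port->send-fn node-port)]
            (send! task ser-tuple)))))

    (send-drained [[1 "tuple-a"] [2 "tuple-b"] [7 "tuple-c"]]   ;; task 7 unassigned
                  {1 "node-a/6700", 2 "node-b/6701"}
                  {"node-a/6700" #(println "send via node-a:" %1 %2)
                   "node-b/6701" #(println "send via node-b:" %1 %2)})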

    The line calling .send on an ^IConnection is the first place in this code that may be connected to zeromq.

    Why do we claim that this IConnection line has anything to do with zeromq? That takes a little explaining, starting from the configuration file.

    In storm.yaml there is the following configuration item:

    storm.messaging.transport: "backtype.storm.messaging.zmq"
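
    For comparison, since Storm 0.9 the same key can also select the netty-based transport instead of zeromq (shown only for contrast; the rest of this walkthrough follows the zmq path):

    storm.messaging.transport: "backtype.storm.messaging.netty.Context"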

    The storm.messaging.transport setting corresponds to the mq-context in the worker, so by following mq-context through the worker code we can find, step by step, where the IConnection implementation comes from. The connections are created in mk-refresh-connections:

    refresh-connections (mk-refresh-connections worker)

    The part of mk-refresh-connections that involves mq-context looks like this:

    (swap! (:cached-node+port->socket worker)
           #(HashMap. (merge (into {} %1) %2))
           (into {}
                 (dofor [endpoint-str new-connections
                         :let [[node port] (string->endpoint endpoint-str)]]
                   [endpoint-str
                    (.connect
                      ^IContext (:mq-context worker)
                      storm-id
                      ((:node->host assignment) node)
                      port)
                    ])))
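
    What the swap! above does is atomically merge the freshly created connections into the worker's cached node+port->socket map, so a refresh never throws away connections established earlier. A minimal sketch of the same pattern on an atom holding an ordinary map; the endpoint strings and the make-connection stub are made up for the example:

    (def cached-node+port->socket (atom {}))

    (defn make-connection
      "Stand-in for (.connect ^IContext mq-context storm-id host port)."
      [endpoint-str]
      (str "connection-to-" endpoint-str))

    (defn refresh-connections!
      "Merge connections for the new endpoints into the cached map, the same
       shape as the swap! in mk-refresh-connections."
      [new-endpoints]
      (swap! cached-node+port->socket
             merge
             (into {} (for [endpoint-str new-endpoints]
                        [endpoint-str (make-connection endpoint-str)]))))

    (refresh-connections! ["node-a/6700" "node-b/6701"])
    (refresh-connections! ["node-c/6702"])
    (println @cached-node+port->socket)   ;; all three endpoints stay cached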

    Note the .connect call: it is the connect function of mq-context that creates the IConnection. Opening zmq.clj confirms this guess:

    (^IConnection connect [this ^String storm-id ^String host ^int port]
      (require 'backtype.storm.messaging.zmq)
      (-> context
          (mq/socket mq/push)
          (mq/set-hwm hwm)
          (mq/set-linger linger-ms)
          (mq/connect (get-connect-zmq-url local? host port))
          mk-connection))

    At this point the mystery of when the IConnection is established is solved, and the whole path is clear: a message travels from the bolt or spout thread to the transfer-thread, and from there zeromq sends the tuple on to the next hop.

    Tuple dispatch strategy: grouping

    A tuple produced by one bolt may have several downstream bolts subscribed to it, so which one does it actually get sent to? That is a dispatch-strategy question. In Twitter Storm there are really two levels of dispatch: one at the task level, already touched on when discussing topology submission, and the other, discussed here, at the tuple level.

    Let's go back to bolt-emit once more, this time focusing on how the variable t is produced and used.

    (let [out-tasks (if task
                      (tasks-fn task stream values)
                      (tasks-fn stream values))]
      (fast-list-iter [t out-tasks]
        (let [anchors-to-ids (HashMap.)]
          (fast-list-iter [^TupleImpl a anchors]
            (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)]
              (when (pos? (count root-ids))
                (let [edge-id (MessageId/generateId rand)]
                  (.updateAckVal a edge-id)
                  (fast-list-iter [root-id root-ids]
                    (put-xor! anchors-to-ids root-id edge-id))
                  ))))
          (transfer-fn t
                       (TupleImpl. worker-context
                                   values
                                   task-id
                                   stream
                                   (MessageId/makeId anchors-to-ids)))))

    The code above shows that t comes from out-tasks, and out-tasks is the return value of tasks-fn:

        tasks-fn (:tasks-fn task-data)

    Once tasks-fn comes up, task.clj, a file we had not touched before, enters the picture: task-data is created by task/mk-task. Skipping the intermediate steps, the call chain is as follows.

    • mk-task
    • mk-task-data
    • mk-tasks-fn

    tasks-fn is where the grouping is applied; the relevant code is shown below (this is the direct-emit arity, which takes an explicit out-task-id).

    fn ([^Integer out-task-id ^String stream ^List values]
              (when debug?
                (log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
              (let [target-component (.getComponentId worker-context out-task-id)
                    component->grouping (get stream->component->grouper stream)
                    grouping (get component->grouping target-component)
                    out-task-id (if grouping out-task-id)]
                (when (and (not-nil? grouping) (not= :direct grouping))
                  (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))                          
                (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
                (when (emit-sampler)
                  (builtin-metrics/emitted-tuple! (:builtin-metrics task-data) executor-stats stream)
                  (stats/emitted-tuple! executor-stats stream)
                  (if out-task-id
                    (stats/transferred-tuples! executor-stats stream 1)
                    (builtin-metrics/transferred-tuple! (:builtin-metrics task-data) executor-stats stream 1)))
                (if out-task-id [out-task-id])
                ))
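
    In the non-direct case (the other arity of tasks-fn, not shown above), it is the grouper registered for the (stream, target component) pair that decides which of the target component's tasks receive the tuple. To make the tuple-level dispatch concrete, here is a toy version of the two most common groupers, shuffle grouping and fields grouping; the task ids and field positions are invented for the example and this is not Storm's actual grouper code.

    (defn mk-shuffle-grouper
      "Grouper that sends each tuple to one pseudo-randomly chosen target task."
      [target-tasks]
      (fn [values]
        [(rand-nth target-tasks)]))

    (defn mk-fields-grouper
      "Grouper that hashes the grouping fields so that tuples with equal field
       values always go to the same target task."
      [field-indexes target-tasks]
      (fn [values]
        (let [group-key (mapv #(nth values %) field-indexes)]
          [(nth target-tasks (mod (hash group-key) (count target-tasks)))])))

    (let [shuffle-g (mk-shuffle-grouper [3 4 5])
          fields-g  (mk-fields-grouper [0] [3 4 5])]
      (println "shuffle grouping:" (shuffle-g ["storm" 1]))
      ;; the same word always maps to the same task id
      (println "fields grouping :" (fields-g ["storm" 1]) (fields-g ["storm" 2])))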

    And how does each executor learn the grouping strategies declared in the topology? That story starts at the other end, with executor-data.

    In mk-executor-data there is the following line of code:

    :stream->component->grouper (outbound-components worker-context component-id)

    outbound-components is defined as follows:

    (defn outbound-components
      "Returns map of stream id to component id to grouper"
      [^WorkerTopologyContext worker-context component-id]
      (->> (.getTargets worker-context component-id)
           clojurify-structure
           (map (fn [[stream-id component->grouping]]
            [stream-id
             (outbound-groupings
              worker-context
              component-id
              stream-id
              (.getComponentOutputFields worker-context component-id stream-id)
              component->grouping)]))
           (into {})
           (HashMap.)))
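
    So the value bound to :stream->component->grouper is a nested map: stream id -> downstream component id -> grouper function, and the non-direct arity of tasks-fn simply indexes into it and concatenates what each grouper returns. A small sketch of that lookup shape; the stream name, component names and grouper stubs below are illustrative:

    (def stream->component->grouper
      {"default" {"count-bolt" (fn [values] [4])          ;; e.g. fields grouping picked task 4
                  "log-bolt"   (fn [values] [7 8])}})     ;; e.g. all grouping -> every task

    (defn out-tasks-for
      "Fan a tuple out to every component subscribed to the stream, letting each
       component's grouper choose the concrete target tasks."
      [stream values]
      (mapcat (fn [[component grouper]] (grouper values))
              (get stream->component->grouper stream)))

    (println (out-tasks-for "default" ["storm" 1]))   ;; => (4 7 8)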

    Original article: https://www.cnblogs.com/hseagle/p/3436304.html