zoukankan      html  css  js  c++  java
  • Storm系列(十三)架构分析之Worker-维护ZMQ连接

    Worker根据Topology的定义及分配到自身的任务情况,计算出发出的消息被那些Task接收,由于Worker上分配的任务可能被调整,因此Worker需要定时的更新这些连接信息。

    ZMQ连接信息更新

    函数:mk-refresh-connections

    方法原型:

    (defn mk-refresh-connection [worker])

    方法说明:

    1. 调用worker-outbound-tasks返回从worker参数接收数据的TaskId集合(outbound-tasks).
    2. 定义this函数及相应的回调函数,并将自己注册到定时器:refresh-connections-timer中。
    3. 回调函数带有callback参数用于监听Zookeeper对应节点变化并同步。
    4. 调用storm-cluster-state的assignment函数获取与storm-id对应的Topology的任务分配。
    5. 调用:executor->node+port存储Executor到node+port的映射关系,再调用to-task->node+port根据Executor中的TaskId集合构建TaskId到node+port的映射关系,利用select-keys函数和outbound-tasks集合进行过滤,得到从该Worker接收消息的TaskId到node+port的映射关系,最后调用map-val和endpoint->string函数获取node+port的字符串表示,最终结果为一个TaskId到node+port的哈希表(my-assignment).
    6. 对my-assingmnet进行过滤,移除所有存在该worker的TaskId结果保存到needed-assignment。
    7. 从needed-assignment中获取目标Worker集合(node_port)存储到needed-connections,从needed-assignment中获取目标节点上的TaskId集合存储到needed-tasks中。
    8. 获取worker节点中缓存的从node+port到ZMQ Socket的哈希表的所有键列表,存储到current-connections中。
    9. 判断哪些连接需要新建,哪些连接可以关闭,分别保存到new-connections和remove-connections中。
    10. 调用msg/connect方法根据new-connections中的node+poer创建新的连接,并更新到cached-node+port。
    11. 将cached-task->node+port更新为my-assignment。
    12. 调用需要删除的Socket的close方法,将这些Socket从:cached-node+port中移除。

    应用过程

    :refresh-connections-timer(mk-halting-timer)
    refresh-connections (mk-refresh-connections worker)
    _(refresh-connections nil)
    (schedule-recurring (:refresh-connections-timer worker) 0 (conf TASK-REGRESH-POLL-SECS) refresh-connections).

    代码说明:

    1. 调用mk-halting-timer创建计时器,该代码在创建Worker数据时调用。
    2. 创建一个用于更新连接的函数,然后立即执行refresh-connections函数更新ZMQ,然后不断执行该函数,执行间隔为TASK-REFRESH-SECS默认为10秒,在mk-worker函数中调用.

    从Zookeeper中获取Topology活跃情况

    refresh-storm-active函数获取Topology的状态信息.

    方法原型:

    (defn refresh-storm-active ([worker])…)

    方法说明:

    1. 提供匿名函数做为callback参数,在相应Zookeeper节点变化时回调。
    2. 通过:refresh-active-timer计时器完成定期调用,默认为10秒。
      (shcedule-recurring(:refresh-active-timer worker) 0 (conf TASK-FEFRESH-POLL_SECS) (partial refresh-active worker)).
    3. 调用:storm-cluster-state的storm-base方法获取Topology的基础信息。
    4. 判断该Topology是否处于活跃状态(:active),并将判断结果存储于:storm-active-atom变量中。

    Worker中接收函数

    Mk-transfer-local-fn函数用于产生并发送消息到Executor的接收队列,同一Worker内部的Executor之间通过该函数传递消息。

    方法原型:

    (defn mk-transfer-local-fn [worker])

    方法说明:

    1. 调用:short-executor-receive-queue-map返回Executor中第一个Task的TaskId到该Executor对应的接收队列的映射关系,保存到short-executor-queue-map变量。
    2. 调用task->short-executor返回从该Worker中的TaskId到Executor中第一个Task的TaskId的映射关系。
    3. 定义返回的匿名函数,该函数输入为ZMQ收到的一组消息tuple-batch,按照与消息TaskId对应的Executor中第一个Task的TaskId对消息进行分组,变量grouped对应的键为Executor中第一个Task的TaskId,值为属于该Executor的一组消息。
    4. 通过short-executor-receive-queue-map得到TaskId与Executor相对应的接收消息队列q。
    5. 调用disruptor/publish方法将收到的消息发送到队列q.

    Woker中的发送函数

    Mk-transfer-fn用于Executor的数据发送,分别有以下两种情况:

    1. 目标TaskId与发送TaskId属于同一个Worker,此时不需要跨进程传输消息,可将消息通过mk-tansfer-local-fn发送至接收端Executor的接收队列。
    2. 消息的目标TaskId跟发送TaskId属于不同的Worker中,此时则将消息序列化(KryoTupleSerializer)后发送至Worker的发送队列,由Worker(mk-tranfer-tuples-handler)负责将队列的消息通过ZMQ发送出去.

    方法原型:

    defn mk-transfer [worker]

    该方法返回一个参数为系列化器serializer和一组消息的函数。

    不同Worker间的通信

    Worker中有一个额外的线程对transfer-queue(worker对应的消息发送队列)进行监听,Mk-transfer-tuples-handler用于创建对应的消息处理器。

    方法原型:

    defn mk-transfer-tuples-handler [worker]
    1. 调用cached-node+port获取Worker中与目标node+port相对应的ZMQ Socket连接,保存到node+port->socket.
    2. 调用worker-data的cached-task->node+port获取TaskId到node+port的映射,保存到task->node+port.
    3. 定义一个clojure-handler,对应的函数定义为fn [packets _ batch-end?],第一个参数为一组消息packets,第二个忽略,第三个为为结束标记。
    4. 调用msg/seng函数将消息发送出去。

    发送监听线程的启动:

    transfer-tuples (mk-transfer-tuples-handler worker)
    transfer-thread (disruptor /consume-loop* (:transfer-queue worker) transfer-tuples)
  • 相关阅读:
    详解prototype、proto和constructor的关系
    BFC
    transition(动画属性)
    React组件生命周期
    正则限制input只能输入大于0的数字
    msbuild编译
    python 在头文件添加 #include "stdafx.h"
    2017年感想
    chVsprintf
    富文本测试
  • 原文地址:https://www.cnblogs.com/jianyuan/p/4866089.html
Copyright © 2011-2022 走看看