Worker通过worker-data方法定义了一个包含很多共享数据的映射集合,Worker中很多方法都依赖它
mk-worker
功能:
创建对应的计时器、Executor、接收线程接收消息
方法原型:
1 | (defserverfn mk-worker [conf shared-mq-context storm-id assignment-id port worker-id]) |
Worker内部数据处理流程图:
Worker心跳
功能:
该心态信息被写入到本地文件系统,Supervisor读取这些心跳信息来判断Worker状态,然后决定是否重启Worker.
实现代码:
1 | (defn do-heartbeat [worker]) |
2 | (let [conf (:conf worker) |
3 | hb (WorkerHearbeat. |
4 | (current-time-secs) |
5 | (:storm-id worker) |
6 | (:executors worker) |
7 | (:port worker))] |
8 | (.put (worker-state conf (:worker-id worker)) |
9 | LS_WORKER_HEARTBEAT |
10 | hb))) |
代码说明:
- Worker心跳信息主要包括current-time-sec(当前时间),storm-id(topologyId),executors(Worker中包含的Executor列表),port(Worker对应的端口).
- 通过worker-state创建的LocalState对象,将hb心跳信息保存到STORM-LOCAL-DIR/workers/<workerId>/heartbeats(LS-WORKER_HEARTBEAT)本地文件夹中.
发送Worker心跳信息
功能:
Woker的心跳信息使用heartbeat-timer计时器进行持续发送,发送间隔默认为1秒(WORKER-HEARTBEAT-FREQUENCY-SECS)
调用方式:
1 | _(schedule-recurring (:hearbeat-timer worker) 0 (conf WORKER-HEARTBEAT-FREQUENCY-SECS) heartbeat-fn) |
Executor心跳
功能:
Worker心跳信息保存到本地文件夹,Executor心态保存到zookeeper中。
do-executor-hearbeats函数用来发送一次心跳。
方法原型:
1 | (defnk do-executor-heartbeats [worker :executors nil]) |
代码说明:
- 通过executor/render-stats方法获取Executor的运行统计信息。
- Executor心跳信息包括:topologyId、executor-stats(Worker中Executor的运行统计)、uptime(Worker的启动时间)、time-secs(当前时间).
- 调用storm-cluster-state的worker-hearbeat!方法存储心跳信息,在Zookeeper中存储路径为/storm/workerbeats/<storm-id>/<node-port>.
发送Executor心跳信息
功能:
Worker使用:executor-hearbeat-timer计时器线程来发送Executor心跳信息,默认三秒钟发送一次。
发送过程:
1 | _(schedule-recurring (:executor-heartbeat-timer worker) 0 (conf TASK_HEARBEAT-FREQUENCY-SECS) #(do-executor-hearbeats worker :executors @executors)) |