zoukankan      html  css  js  c++  java
  • Storm系列(十六)架构分析之Executor-Bolt

    准备消息循环的数据

    函数原型:

    let[executor-sampler (mk-stats-sampler (:storm-conf executor-data))]

    主要功能:

    定义tuple-action-fn函数,该函数会根据TaskId获得对应的Bolt对象并调用其executor方法.

    Bolt输入处理函数

    函数原型:

    tuple-action-fn (fn [task-id ^TupleImpl tuple])

    主要功能:

    获得Bolt对应的bolt-obj,调用executor回调方法。

    Bolt的消息发送函数

    函数原型:

    bolt-emit (fn [stream anchors values task])

    主要功能:

    1. 调用tasks-fn获取消息接收端的TaskId集合。
    2. 调用transfer-fn函数发送消息,该函数与Spout中实现类似(唯一的区别不使用overflow-buffer缓存).

    Bolt对象的初始化

    调用Bolt的prepare函数
    初始化过程:

    1. 获取Bolt对象并定义相关方法。bolt-emit方法用于向Executor的消息发送队列中发送消息。
    2. 调用Bolt对象的prepare方法,同时实例化Bolt对象的OutputCollector对象作为prepare方法的传入参数,OutputCollector的emit方法将调用bolt-emit函数来发送消息,ack及fail方法则用来对消息进行跟踪。
    3. 调用mk-task-receiver函数来获得接收队列的处理函数(tuple-action-fn)。

    消息循环

    调用阻塞方式的 consume-batch-when-available函数对接收队列中的消息进行处理。

    (fn [] (distruptor/comsume-batch-when-available receive-queuu event-handler))

    创建Executor

    创建Executor函数mk-executor

    函数原型:

    (defn mk-executor [worker executor-id])

    方法说明:

    1. 调用mk-executor-data创建Executor的数据。
    2. 调用mk-task创建Executor中的每个Task对应的数据。
    3. 调用start-batch-transfer->worker-handler!方法启动Executor的数据发送线程。
    4. 调用mk-threads方法获得Executor的主循环线程,并通过with-error-reaction宏对mk-threads进行包装。当异常发生时调用report-error-and-die方法记录错误并退出。
    5. 实例化RunningExecutor对象用来操作executor.
    6. 实例化 Shutdownable用于退出Executor并清理相关资源,具体操作包括:
      a)结束DisruptorQueue的消息循环。
      b)结束Executor中的启动线程
      c)清理用户钩子的数据
      d)断开与Zookeeper的连接
      e)依次调用Executor中Spout或Bolt的close方法.

    获取分组函数

    函数原型:

    (defn outbound-components [^WorkerTopologyContext worker-context component-id])

    功能描述:

    获取从组件到某一个流的分组函数,task-fn函数通过调用该分组函数可获得消息的目标Task集合。

    函数说明:

    1. 调用WorkerTopologyContext对象的getTargets方法得到一个哈希表,该哈希表的键为当前组件所对应的流,值为一个哈希表,用于记录目标组件以何种方式从该流接收数据。
    2. 调用outbound-groupings函数获得分组函数。

    outbound-groupings
    函数的定义

    (defn- outbound-groupings [^WorkerTopologyContext worker-context this-component-id stream-id out-fields component->grouping])

    方法说明:

    1. 对目标组件进行过滤,若组件对应的TaskId集合为空,怎被过滤掉。
    2. 用map函数对组件及其分组方式进行处理,调用mk-grouper函数来产生分组函数,并最终返回一个保存有从组件到分组函数的映射关系的哈希表.

    mk-grouper

    返回一个函数,该函数返回一个TaskId集合,代表消息发送的目的Task集合

    函数原型:

    (defn mk-grouper [^WorkerTopologyContext context component-id stream-id ^Fields out-fields thrift-grouping ^List target-tasks])

    函数说明:

    1. 获取与目标组件对应的Task的数目及排列后的列表,它们将作为计算目标Task的函数输入。某些分组方式只需要目标组件的Task数目,如:ShuffleGrouping操作.
    2. 针对Thrift类型,不同分组方式分别构建分组函数。

    触发系统Ticks

    setup-ticks!函数定期向Executor的接收消息队列发送Tick消息.Executor在收到Tick消息之后,就会执行发送队列的超时操作。setup-ticks!主要用于对Spout节点发送出去的消息进行操作操作。

    函数原型:

    (defn setup-ticks [worker executor-data])

    函数说明:

    1. 配置项TOPOLOGY-TICK-TUPLE-FREQ-SECS用来控制向__system流及__tick流发送消息的频率,tick-time-secs用来保存该频率值,receive-queue为Executor对应的接收Disruptor Queue,context为WorkerTopologyContext对象,Tick消息只发送到本地Worker,并不能被其它Worker的Executor收到。
    2. 判断tick-time-secs是否设置,若已进行设置则开始设置系统的Tick消息.
    3. 若该节点为Spout节点且未设置消息超时,则打印消息退出。参数TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS用于调试模式,由于超时的消息会给系统调试带来额外的复杂性,因此可在调试过程中暂时关闭消息的超时操作。当Spout收到Tick消息时,可对缓存在pending对象中的数据进行超时操作。
    4. 利用Worker定义的用户计时器tick-time-secs为间隔设置计时器,并定义计时器回调函数,向receive-queue中发送一条消息,该消息对应的TaskId为nil,表示该Executor中所有的Task都会收到该消息,消息的内容为tick-time-secs,-1表示系统TaskId,最后一项表示该消息会被发送到SYSTEM-TICK_STREAM.
  • 相关阅读:
    Java提高篇(三四)-----fail-fast机制
    opencv提取surf特征点出现的错误
    Android开发_Gson解析
    关于权限管理设计文章整理,希望对大家有所帮助
    URAL
    JAVA编程心得-多态设计初步
    paip. mysql如何临时 暂时 禁用 关闭 触发器
    Citrix服务器虚拟化之三十 XenApp 6.5发布流式应用程序
    在TextView使用部分颜色文字
    微软面试题:求整数随机数构成的数组中找到长度大于=3的最长的等差数列
  • 原文地址:https://www.cnblogs.com/jianyuan/p/4891955.html
Copyright © 2011-2022 走看看