zoukankan      html  css  js  c++  java
  • Storm 中什么是-acker,acker工作流程介绍

    概述

    我们知道storm一个很重要的特性是它能够保证你发出的每条消息都会被完整处理, 完整处理的意思是指:

    一个tuple被完全处理的意思是: 这个tuple以及由这个tuple所导致的所有的tuple都被成功处理。而一个tuple会被认为处理失败了如果这个消息在timeout所指定的时间内没有成功处理。

    也就是说对于任何一个spout-tuple以及它的所有子孙到底处理成功失败与否我们都会得到通知。关于如果做到这一点的原理,可以看看Twitter Storm如何保证消息不丢失这篇文章。从那篇文章里面我们可以知道,storm里面有个专门的acker来跟踪所有tuple的完成情况。这篇文章就来讨论acker的详细工作流程。

    源代码列表

    这篇文章涉及到的源代码主要包括:

    1. backtype.storm.daemon.acker
    2. backtype.storm.daemon.task
    3. backtype.storm.task.OutputCollectorImpl
    算法简介

    acker对于tuple的跟踪算法是storm的主要突破之一, 这个算法使得对于任意大的一个tuple树, 它只需要恒定的20字节就可以进行跟踪了。原理很简单:acker 对于每个spout-tuple保存一个ack-val的校验值,它的初始值是0, 然后每发射一个tuple/ack一个tuple,那么tuple的id都要跟这个校验值异或一下,并且把得到的值更新为ack-val的新值。那么假设 每个发射出去的tuple都被ack了, 那么最后ack-val一定是0(因为一个数字跟自己异或得到的值是0)。

    进入正题

    那么下面我们从源代码层面来看看哪些组件在哪些时候会给acker发送什么样的消息来共同完成这个算法的。acker对消息进行处理的主要是下面这块代码:

     
    (let [id (.getValue tuple 0)
       ^TimeCacheMap pending @pending
       curr (.get pending id)
       curr (condp = (.getSourceStreamId tuple)
            ACKER-INIT-STREAM-ID (-> curr
                   (update-ack id)
                   (assoc :spout-task (.getValue tuple 1)))
            ACKER-ACK-STREAM-ID (update-ack
                             curr (.getValue tuple 1))
            ACKER-FAIL-STREAM-ID (assoc curr :failed true))]
                ...)
     
    Spout创建一个新的tuple的时候给acker发送消息

    消息格式(看上面代码的第1行和第7行对于tuple.getValue()的调用)

     
    (spout-tuple-id, task-id)
     
     
    消息的streamId是__ack_init(ACKER-INIT-STREAM-ID)
     
    这 是告诉acker, 一个新的spout-tuple出来了, 你跟踪一下,它是由id为task-id的task创建的(这个task-id在后面会用来通知这个task:你的tuple处理成功了/失败了)。处理 完这个消息之后, acker会在它的pending这个map(类型为TimeCacheMap)里面添加这样一条记录:
     
     
     
    {spout-tuple-id {:spout-tasktask-id :valack-val)}
     

    这就是acker对spout-tuple进行跟踪的核心数据结构, 对于每个spout-tuple所产生的tuple树的跟踪都只需要保存上面这条记录。acker后面会检查:val什么时候变成0,变成0, 说明这个spout-tuple产生的tuple都处理完成了。

    Bolt发射一个新tuple的时候会给acker发送消息么?

    任何一个bolt在发射一个新的tuple的时候,是不会直接通知acker的,如果这样做的话那么每发射一个消息会有三条消息了:

    1. Bolt创建这个tuple的时候,把它发给下一个bolt的消息
    2. Bolt创建这个tuple的时候,发送给acker的消息
    3. ack tuple的时候发送的ack消息

    事 实上storm里面只有第一条和第三条消息,它把第二条消息省掉了, 怎么做到的呢?storm这点做得挺巧妙的,bolt在发射一个新的bolt的时候会把这个新tuple跟它的父tuple的关系保存起来。然后在ack 每个tuple的时候,storm会把要ack的tuple的id, 以及这个tuple新创建的所有的tuple的id的异或值发送给acker。这样就给每个tuple省掉了一个消息(具体看下一节)。

    Tuple被ack的时候给acker发送消息

    每个tuple在被ack的时候,会给acker发送一个消息,消息格式是:

     
    (spout-tuple-id, tmp-ack-val)
     
    消息的streamId是__ack_ack(ACKER-ACK-STREAM-ID)
     
    注意,这里的tmp-ack-val是要ack的tuple的id与由它新创建的所有的tuple的id异或的结果:
     
     
    tuple-id ^ (child-tuple-id1 ^ child-tuple-id2 ... )
     
    我们可以从task.clj里面的send-ack方法看出这一点:
     
     
    (defn- send-ack [^TopologyContext topology-context
                              ^Tuple input-tuple
                              ^List generated-ids send-fn]
      (let [ack-val (bit-xor-vals generated-ids)]
        (doseq [
          [anchor id] (.. input-tuple
                          getMessageId
                          getAnchorsToIds)]
          (send-fn (Tuple. topology-context
                     [anchor (bit-xor ack-val id)]
                     (.getThisTaskId topology-context)
                     ACKER-ACK-STREAM-ID))
          )))
     
    这里面的generated-ids参数就是这个input-tuple的所有子tuple的id, 从代码可以看出storm会给这个tuple的每一个spout-tuple发送一个ack消息。
     
    为什么说这里的generated-ids是input-tuple的子tuple呢? 这个send-ack是被OutputCollectorImpl里面的ack方法调用的:
     
    public void ack(Tuple input) {
        List generated = getExistingOutput(input);
        // don't just do this directly in case
        // there was no output
        _pendingAcks.remove(input);
        _collector.ack(input, generated);
    }
    generated是由getExistingOutput(input)方法计算出来的, 我们再来看看这个方法的定义:
     
     
    private List getExistingOutput(Tuple anchor) {
        if(_pendingAcks.containsKey(anchor)) {
            return _pendingAcks.get(anchor);
        } else {
            List ret = new ArrayList();
            _pendingAcks.put(anchor, ret);
            return ret;
        }
    }
     
    _pendingAcks里面存的是什么东西呢?
     
    private Tuple anchorTuple(Collection< Tuple > anchors,
                                    String streamId,
                                    List< Object > tuple) {
        // The simple algorithm in this function is the key
        // to Storm. It is what enables Storm to guarantee
        // message processing.
        // 这个map存的东西是 spout-tuple-id到ack-val的映射
        Map< Long, Long > anchorsToIds
                           = new HashMap<Long, Long>();
        // anchors 其实就是它的所有父亲:spout-tuple
        if(anchors!=null) {
            for(Tuple anchor: anchors) {
                long newId = MessageId.generateId();
                // 告诉每一个父亲,你们又多了一个儿子了。
                getExistingOutput(anchor).add(newId);
                for(long root: anchor.getMessageId()
                              .getAnchorsToIds().keySet()) {
                    Long curr = anchorsToIds.get(root);
                    if(curr == null) curr = 0L;
     
                    // 更新spout-tuple-id的ack-val
                    anchorsToIds.put(root, curr ^ newId);
                }
            }
        }
        return new Tuple(_context, tuple,
                        _context.getThisTaskId(),
                        streamId,
                        MessageId.makeId(anchorsToIds));
    }
     
    从上面代码里面的红色部分我们可以看出, _pendingAcks里面维护的其实就是tuple到自己儿子的对应关系。
     
    Tuple处理失败的时候会给acker发送失败消息
     
    acker会忽略这种消息的消息内容(消息的streamId为ACKER-FAIL-STREAM-ID), 直接将对应的spout-tuple标记为失败(最上面代码第9行)
     
    最后Acker发消息通知spout-tuple对应的Worker
     
    最后, acker会根据上面这些消息的处理结果来通知这个spout-tuple对应的task:
     
     
     
    (when (and curr
              (:spout-task curr))
     (cond (= 0 (:val curr))
           ;; ack-val == 0 说明这个tuple的所有子孙都
           ;; 处理成功了(都发送ack消息了)
           ;; 那么发送成功消息通知创建这个spout-tuple的task.
           (do
             (.remove pending id)
             (acker-emit-direct @output-collector
                                (:spout-task curr)
                                ACKER-ACK-STREAM-ID
                                [id]
                                ))
           ;; 如果这个spout-tuple处理失败了
           ;; 发送失败消息给创建这个spout-tuple的task
           (:failed curr)
           (do
             (.remove pending id)
             (acker-emit-direct @output-collector
                                (:spout-task curr)
                                ACKER-FAIL-STREAM-ID
                                [id]
                                ))
           ))
     
     
     
  • 相关阅读:
    POJ 1321:棋盘问题
    POJ 2251:Dungeon Master
    POJ 3438:Look and Say
    POJ 1094:Sorting It All Out拓扑排序之我在这里挖了一个大大的坑
    杭电1285--确定比赛名次(拓扑排序)
    南阳67--三角形面积
    南阳38--布线问题
    杭电1050--Moving Tables(区间覆盖)
    杭电1217--Arbitrage(Spfa)
    杭电1719--Friend(找规律)
  • 原文地址:https://www.cnblogs.com/wuxiang/p/5630721.html
Copyright © 2011-2022 走看看