zoukankan      html  css  js  c++  java
  • Storm可靠性实例解析——ack机制

    对于Storm,它有一个很重要的特性:“Guarantee no data loss” ——可靠性

    很显然,要做到这个特性,必须要track每个data的去向和结果。Storm是如何做到的呢——acker机制

    先概括下acker所参与的工作流程:

    1. Spout创建一个新的Tuple时,会发一个消息通知acker去跟踪;
    2. Bolt在处理Tuple成功或失败后,也会发一个消息通知acker;
    3. acker会找到发射该Tuple的Spout,回调其ack或fail方法。

    我们说RichBolt和BasicBolt的区别是后者会自动ack。那么是不是我们只要实现了Spout的ack或fail方法就能看到反馈了呢?

    试试在RandomSpout(extends BaseRichSpout )中加入如下代码:

     1 public class RandomSpout extends BaseRichSpout {  
     2   
     3     private SpoutOutputCollector collector;  
     4   
     5     private Random rand;  
     6       
     7     private static String[] sentences = new String[] {"edi:I'm happy", "marry:I'm angry", "john:I'm sad", "ted:I'm excited", "laden:I'm dangerous"};  
     8       
     9     @Override  
    10     public void open(Map conf, TopologyContext context,  
    11             SpoutOutputCollector collector) {  
    12         this.collector = collector;  
    13         this.rand = new Random();  
    14     }  
    15   
    16     @Override  
    17     public void nextTuple() {  
    18         String toSay = sentences[rand.nextInt(sentences.length)];  
    19         this.collector.emit(new Values(toSay));  
    20     }  
    21   
    22     @Override  
    23     public void declareOutputFields(OutputFieldsDeclarer declarer) {  
    24         declarer.declare(new Fields("sentence"));  
    25     }  
    26   
    27 }  
    public class RandomSpout extends BaseRichSpout
     1 @Override  
     2 
     3     public void ack(Object msgId) {  
     4 
     5         System.err.println("ack " + msgId);  
     6 
     7     }  
     8 
     9     @Override  
    10 
    11     public void fail(Object msgId) {  
    12 
    13         System.err.println("fail " + msgId);  
    14 
    15     } 

    疑问:重新运行ExclaimBasicTopo,看下结果。并没有任何的ack 和 fail 出现?

    分析:原因是,Storm要求如果要track一个Tuple,必须要指定其messageId,也就是回调回ack和fail方法的参数。如果我们不指定,Storm是不会去track该tuple的,即不保证消息丢失!

    探讨:我们改下Spout代码,为每个消息加入一个唯一Id。同时,为了方便看结果,加入更多的打印,并且靠sleep减慢发送速度。(只是为了演示!)

     1 public class RandomSpout extends BaseRichSpout {  
     2 
     3     private SpoutOutputCollector collector;  
     4 
     5     private Random rand;  
     6 
     7     private AtomicInteger counter;     
     8 
     9     private static String[] sentences = new String[] {"edi:I'm happy", "marry:I'm angry", "john:I'm sad", "ted:I'm excited", "laden:I'm dangerous"};  
    10 
    11     @Override  
    12 
    13     public void open(Map conf, TopologyContext context,  
    14 
    15             SpoutOutputCollector collector) {  
    16 
    17         this.collector = collector;  
    18 
    19         this.rand = new Random();  
    20 
    21         counter = new AtomicInteger();  
    22 
    23     }  
    24 
    25     @Override  
    26 
    27     public void nextTuple() {  
    28 
    29         Utils.sleep(5000);  
    30 
    31         String toSay = sentences[rand.nextInt(sentences.length)];  
    32 
    33         int msgId = this.counter.getAndIncrement();  
    34 
    35         toSay = "["+ msgId + "]"+ toSay;  
    36 
    37         PrintHelper.print("Send " + toSay );  
    38          
    39         this.collector.emit(new Values(toSay), msgId);  
    40 
    41     }  
    42 
    43     @Override  
    44 
    45     public void declareOutputFields(OutputFieldsDeclarer declarer) {  
    46 
    47         declarer.declare(new Fields("sentence"));  
    48 
    49     }   
    50 
    51     @Override  
    52 
    53     public void ack(Object msgId) {  
    54 
    55         PrintHelper.print("ack " + msgId);  
    56 
    57     }  
    58 
    59     @Override  
    60 
    61     public void fail(Object msgId) {  
    62 
    63         PrintHelper.print("fail " + msgId);  
    64 
    65     }  
    66 
    67 }  

    PrintHelper类:

     1 public class PrintHelper {  
     2 
     3     private static SimpleDateFormat sf = new SimpleDateFormat("mm:ss:SSS");    
     4 
     5     public static void print(String out){  
     6 
     7         System.err.println(sf.format(new Date()) + " [" + Thread.currentThread().getName() + "] " + out);  
     8 
     9     }      
    10 
    11 }  

    同时把PrintBolt里面打印也换成PrintHelper.print打印

    看下打印结果:

     1 53:33:891 [Thread-26-spout] Send [0]ted:I'm excited  
     2 53:33:896 [Thread-20-print] Bolt[0] String recieved: [0]ted:I'm excited!  
     3 53:38:895 [Thread-26-spout] Send [1]edi:I'm happy  
     4 53:38:895 [Thread-22-print] Bolt[1] String recieved: [1]edi:I'm happy!  
     5 53:38:895 [Thread-26-spout] ack 0  
     6 53:43:896 [Thread-26-spout] Send [2]edi:I'm happy  
     7 53:43:896 [Thread-22-print] Bolt[1] String recieved: [2]edi:I'm happy!  
     8 53:43:896 [Thread-26-spout] ack 1  
     9 53:48:896 [Thread-26-spout] Send [3]edi:I'm happy  
    10 53:48:896 [Thread-26-spout] ack 2  
    11 53:48:896 [Thread-24-print] Bolt[2] String recieved: [3]edi:I'm happy!  
    12 53:53:896 [Thread-26-spout] Send [4]ted:I'm excited  
    13 53:53:896 [Thread-26-spout] ack 3  
    14 53:53:896 [Thread-20-print] Bolt[0] String recieved: [4]ted:I'm excited!  
    15 53:58:897 [Thread-26-spout] Send [5]laden:I'm dangerous  
    16 53:58:897 [Thread-26-spout] ack 4  
    17 53:58:898 [Thread-24-print] Bolt[2] String recieved: [5]laden:I'm dangerous! 

    很明显看到:

    1. 并发度为1的Spout确实是一个线程,并发度为3的Bolt确实是三个线程;
    2. 消息完全处理完成后,确实回调了ack(Object msgId)方法,而且msgId的值,即为我们emit的msgId;
    3. 虽然我们在topology中定义了两个bolt,但实际上ack对于每个tuple只调用了一次;
    4. spout发出tuple后,Bolt很快就完成了,但是ack直到5秒后spout醒来才打印。

    Tuple树

      对于Spout创建的Tuple,在topology定义的流水线中经过Bolt处理时,可能会产生一个或多个新的Tuple。源Tuple+新产生的Tuple构成了一个Tuple树。当整棵树被处理完成,才算一个Tuple被完全处理,其中任何一个节点的Tuple处理失败或超时,则整棵树失败。

      超时的值,可以通过定义topology时,conf.setMessageTimeoutSecs方法指定。


    Anchor

    在我们例子中ExclaimRichBolt用

    附注:
     1 public class ExclaimBasicBolt extends BaseBasicBolt {  
     2   
     3     @Override  
     4     public void execute(Tuple tuple, BasicOutputCollector collector) {  
     5         //String sentence = tuple.getString(0);  
     6         String sentence = (String) tuple.getValue(0);  
     7         String out = sentence + "!";  
     8         collector.emit(new Values(out));  
     9     }  
    10   
    11     @Override  
    12     public void declareOutputFields(OutputFieldsDeclarer declarer) {  
    13         declarer.declare(new Fields("excl_sentence"));  
    14     }  
    15   
    16 }  
    ExclaimBasicBolt 原实现方式

    collector.emit(inputTule, new Values(newTupleValue));

    发射一个新的tuple。

    第一个参数是传入Bolt的tuple,第二个参数是新产生的tuple的value,这种emit的方式,在Storm中称为: "anchor"。


    Tuple的ack

      前面我们一直提到acker,看到这里,你应该能猜出acker其实就是Storm里面track一个Tuple保证其一定被处理的功能。acker也是一个component

    我们来看看acker的工作流程

    1. Spout在初始化时会产生一个tasksId

    2. Spout中创建新的Tuple,其id是一个64位的随机数;

    3. Spout将新建的Tuple发送出去(给出了messageId来开启Tuple的追踪), 同时会发送一个消息到某个acker,要求acker进行追踪。该消息包含两部分:

    • Spout的taskId:用户acker在整个tuple树被完全处理后找到原始的Spout进行回调ack或fail
    • 一个64位的ack val值: 标志该tuple是否被完全处理。初始值为0。

    4. 一个Bolt在处理完Tuple后,如果发射了一个新的anchor tuple,Storm会维护anchor tuple的列表;

    5. 该Bolt调用OutputCollector.ack()时,Storm会做如下操作:

    • 将anchor tuple列表中每个已经ack过的和新创建的Tuple的id做异或(XOR)。假定Spout发出的TupleID是tuple-id-0,该Bolt新生成的TupleID为tuple-id-1,那么,tuple-id-0XORtuple-id-0XOR tuple-id-1
    • Storm根据该原始TupleID进行一致性hash算法,找到最开始Spout发送的那个acker,然后把上面异或后得出的ack val值发送给acker

    6. acker收到新的ack val值后,与保存的原始的Tuple的id进行异或,如果为0,表示该Tuple已被完全处理,则根据其taskId找到原始的Spout,回调其ack()方法。

    fail的机制类似,在发现fail后直接回调Spout的fail方法。

    ——Storm就是通过这个acker的机制来保证数据不丢失。

      回头再看看上面的打印结果,b、c两条得到很好的解释了。那d是为什么呢?

      在最开始时,我曾经提到过,Storm的设计模型中,Spout是源源不断的产生数据的,所以其nextTuple()方法在任何时候不应该被打断。ack,fail 和 nextTuple是在同一个线程中完成的。

      所以,虽然acker发现一个Tuple已经完全处理完成,但是由于Spout线程在Sleep,无法回调。

      在设计中,我们应尽量避免在Spout、Bolt中去Sleep。如果确实需要控制,最好用异步线程来做,例如用异步线程读取数据到队列,再由Spout去取队列中数据。异步线程可以随意控制速度等。

    另外,

    Storm是否会自动重发失败的Tuple?

    这里答案已经很明显了。fail方法如何实现取决于你自己。只有在fail中做了重发机制,才有重发。

    注:Trident除外。这是Storm提供的特殊的事务性API,它确实会帮你自动重发的。


    Unanchor

      如果我们在Bolt中用OutputCollector.emit()发射一个新的Tuple时,并没有指定输入的Tuple(IBasicBolt的实现类用的是BasicOutPutCollector,其emit方法实际上还是调用OutputCollector.emit(),只不过内部会帮你填上输入的Tuple),那么行为称之为“Unanchor”。

      是否用Unanchor方式取决于你的实现。


    整可靠性

      在某些特定的情况下,你或许想调整Storm的可靠性。例如,你并不关心数据是否丢失,或者你想看看后面是否有某个Bolt拖慢了Spout的速度?
    那么,有三种方法可以实现:
    1. 在build topology时,设置acker数目为0,即conf.setNumAckers(0);
    2. 在Spout中,不指定messageId,使得Storm无法追踪;
    3. 在Bolt中,使用Unanchor方式发射新的Tuple。

    本文转自Edison徐storm应用系列之——可靠性与ack机制

  • 相关阅读:
    自我学习——javascript——基本技巧
    Edge Code CC卡死原因
    Django中ORM介绍和字段及字段参数
    Web框架本质及第一个Django实例
    Python连接MySQL数据库之pymysql模块使用
    Bootstrap框架
    jQuery快速入门
    前端基础之BOM和DOM
    前端基础之JavaScript
    前端基础之CSS
  • 原文地址:https://www.cnblogs.com/xymqx/p/4438672.html
Copyright © 2011-2022 走看看