zoukankan      html  css  js  c++  java
  • storm

    一 可靠性简单介绍

               
           Storm的可靠性是指Storm会告知用户每个消息单元是否在一个指定的时间(timeout)内被全然处理。

    全然处理的意思是该MessageId绑定的源Tuple以及由该源Tuple衍生的全部Tuple都经过了Topology中每个应该到达的Bolt的处理。


    注: timetout 能够通过Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS 来指定

           Storm中的每个Topology中都包括有一个Acker组件。Acker组件的任务就是跟踪从某个task中的Spout流出的每个messageId所绑定的Tuple树中的全部Tuple的处理情况。假设在用户设置的最大超时时间内这些Tuple没有被全然处理,那么Acker会告诉Spout该消息处理失败。相反则会告知Spout该消息处理成功,它会分别调用Spout中的fail和ack方法。
          Storm同意用户在Spout中发射一个新的源Tuple时为其指定一个MessageId,这个MessageId能够是随意的Object对象。多个源Tuple能够共用同一个MessageId,表示这多个源Tuple对用户来说是同一个消息单元,它们会被放到同一棵tuple树中。例如以下图所看到的:

                                
                          
                                                                             Tuple 树
           在Spout中由message 1绑定的tuple1和tuple2分别经过bolt1和bolt2的处理。然后生成了两个新的Tuple,并终于流向了bolt3。当bolt3处理完之后,称message 1被全然处理了。

    二 Acker 原理分析

           storm里面有一类特殊的task称为acker(acker bolt), 负责跟踪spout发出的每个tuple的tuple树。当acker发现一个tuple树已经处理完毕了。它会发送一个消息给产生这个tuple的那个task。你能够通过Config.TOPOLOGY_ACKERS来设置一个topology里面的acker的数量, 默认值是1。 假设你的topology里面的tuple比較多的话, 那么把acker的数量设置多一点。效率会高一点。


           理解storm的可靠性的最好的方法是来看看tuple和tuple树的生命周期, 当一个tuple被创建。 无论是spout还是bolt创建的。 它会被赋予一个64位的id,而acker就是利用这个id去跟踪全部的tuple的。

    每一个tuple知道它的祖宗的id(从spout发出来的那个tuple的id), 每当你新发射一个tuple, 它的祖宗id都会传给这个新的tuple。

    所以当一个tuple被ack的时候,它会发一个消息给acker。告诉它这个tuple树发生了怎么样的变化。详细来说就是它告诉acker:  我已经完毕了, 我有这些儿子tuple, 你跟踪一下他们吧。

                                      (spout-tuple-id, tmp-ack-val)
                     tmp-ark-val =  tuple-id ^ (child-tuple-id1 ^ child-tuple-id2 ... )
            tmp-ack-val是要ack的tuple的id与由它新创建的全部的tuple的id异或的结果

           当一个tuple须要ack的时候。它究竟选择哪个acker来发送这个信息呢?
            storm使用一致性哈希来把一个spout-tuple-id相应到acker, 由于每个tuple知道它全部的祖宗的tuple-id。 所以它自然能够算出要通知哪个acker来ack。
            注:一个tuple可能存在于多个tuple树,全部可能存在多个祖宗的tuple-id

            acker是怎么知道每个spout tuple应该交给哪个task来处理?
           当一个spout发射一个新的tuple。 它会简单的发一个消息给一个合适的acker,而且告诉acker它自己的id(taskid), 这样storm就有了taskid-tupleid的相应关系。 当acker发现一个树完毕处理了, 它知道给哪个task发送成功的消息。

    Acker的高效性
             acker task并不显式的跟踪tuple树。

    对于那些有成千上万个节点的tuple树,把这么多的tuple信息都跟踪起来会耗费太多的内存。相反, acker用了一种不同的方式, 使得对于每一个spout tuple所须要的内存量是恒定的(20 bytes) .  这个跟踪算法是storm怎样工作的关键,而且也是它的主要突破。


            一个acker task存储了一个spout-tuple-id到一对值的一个mapping。

    这个对子的第一个值是创建这个tuple的taskid, 这个是用来在完毕处理tuple的时候发送消息用的。 第二个值是一个64位的数字称作:ack val, ack val是整个tuple树的状态的一个表示,无论这棵树多大。

    它仅仅是简单地把这棵树上的全部创建的tupleid/ack的tupleid一起异或(XOR)。

              
              当一个acker task 发现一个 ack val变成0了。 它知道这棵树已经处理完毕了。

     比例如以下图是一个简单的Topology。

                            
     
                                                                            个简单的 Topology

            ack_val的初值为0,varl_x表示新产生的tuple id ,它们经过Spout,Bolt1,Bolt2,Bolt3 处理,并与arv_val异或,终于arv_val变为0,表示tuple1被成功处理。


       以下看一个略微复杂一点的样例:
                               

                                             注:红色虚线框表示的是Acker组件。ack_val表示acker value的值,它的初值为0

            msg1绑定了两个源tuple,它们的id分别为1001和1010.在经过Bolt1处理后新生成了tuple id为1110,新生成的tuple与传入的tuple 1001进行异或得到的值为0111,然后Bolt1通过spout-tuple-id映射到指定的Acker组件,向它发送消息。Acker组件将Bolt1传过来的值与ack_val异或,更新ack_val的值变为了0100。与此同样经过Bolt2处理后,ack_val的值变为0001。最后经Bolt3处理后ack_val的值变为了0。说明此时由msg1标识的Tuple处理成功,此时Acker组件会通过事先绑定的task id映射找到相应的Spout,然后调用该Spout的ack方法。

                其流程例如以下图所看到的:
                                                
                                            
        注:1. Acker (ack bolt)组件由系统自己主动产生。一般来说一个topology仅仅有一个ack bolt(当然能够通过配置參数指定多个),当bolt处理并下发完tuple给下一跳的bolt时,会发送一个ack给ack bolt。

    ack bolt通过简单的异或原理(即同一个数与自己异或结果为零)来判定从spout发出的某一个Tuple是否已经被全然处理完成。

    假设结果为真,ack bolt发送消息给spout,spout中的ack函数被调用并运行。

    假设超时,则发送fail消息给spout,spout中的fail函数被调用并运行。spout中的ack和fail的处理逻辑由用户自行填写。


             2. Acker对于每一个Spout-tuple保存一个ack-val的校验值,它的初始值是0, 然后每发射一个tuple 就ack一个tuple。那么tuple的id都要跟这个校验值异或一下,而且把得到的值更新为ack-val的新值。

    那么如果每一个发射出去的tuple都被ack了, 那么最后ack-val一定是0(由于一个数字跟自己异或得到的值是0)。

              A xor A = 0.
              A xor B…xor B xor A = 0。当中每个操作数出现且仅出现两次。

            3. tupleid是随机的64位数字, ack val碰巧变成0(比如:ark_val = 1 ^ 2  ^ 3 = 0)而不是由于全部创建的tuple都完毕了,这种概率极小。算一下就知道了, 就算每秒发生10000个ack, 那么须要50000000万年才可能碰到一个错误。并且就算碰到了一个错误, 也仅仅有在这个tuple失败的时候才会造成数据丢失。 


          看看storm在每种异常情况下是怎么避免数据丢失的:
             1. 因为相应的task挂掉了,一个tuple没有被ack: storm的超时机制在超时之后会把这个tuple标记为失败,从而能够又一次处理。
             2. Acker挂掉了: 这样的情况下由这个acker所跟踪的全部spout tuple都会超时,也就会被又一次处理。
             3. Spout挂掉了: 在这样的情况下给spout发送消息的消息源负责又一次发送这些消息。比方Kestrel和RabbitMQ在一个client断开之后会把全部”处理中“的消息放回队列。

    就像你看到的那样。 storm的可靠性机制是全然分布式的, 可伸缩的而且是高度容错的。


    三 Acker 编程接口

           在Spout中,Storm系统会为用户指定的MessageId生成一个相应的64位的整数。作为整个Tuple Tree的RootId。RootId会被传递给Acker以及兴许的Bolt来作为该消息单元的唯一标识。同一时候,不管Spout还是Bolt每次新生成一个Tuple时,都会赋予该Tuple一个唯一的64位整数的Id。

          当Spout发射完某个MessageId相应的源Tuple之后,它会告诉Acker自己发射的RootId以及生成的那些源Tuple的Id。

    而当Bolt处理完一个输入Tuple并产生出新的Tuple时,也会告知Acker自己处理的输入Tuple的Id以及新生成的那些Tuple的Id。Acker仅仅须要对这些Id进行异或运算,就能推断出该RootId相应的消息单元是否成功处理完毕了。

       以下这个是spout要实现的接口:

         public interface ISpout extends Serializable {
         void open(Map conf, TopologyContext context,
                  SpoutOutputCollector collector);
          void close();
          void nextTuple();
          void ack(Object msgId);
          void fail(Object msgId); 
        }

           首先storm通过调用spout的nextTuple方法来获取下一个tuple, Spout通过open方法參数里面提供的SpoutOutputCollector来发射新tuple到它的当中一个输出消息流, 发射tuple的时候spout会提供一个message-id, 后面通过这个message-id来追踪这个tuple。

             this.collector.emit(new Values("hello world"),msgId);
             注:msgId是提供给Acker组件使用的,Acker组件使用msgId来跟踪Tuple树

           接下来。 这个发射的tuple被传送到消息处理者bolt那里, storm会跟踪由此所产生的这课tuple树。假设storm检測到一个tuple被全然处理了, 那么storm会以最開始的那个message-id作为參数去调用消息源的ack方法;反之storm会调用spout的fail方法。

    值得注意的是。 storm调用ack或者fail的task始终是产生这个tuple的那个task。所以假设一个spout被分成非常多个task来运行。 消息运行的成功失败与否始终会通知最開始发出tuple的那个task。

           作为storm的使用者。有两件事情要做以更好的利用storm的可靠性特征。

    首先。在你生成一个新的tuple的时候要通知storm; 其次,完毕处理一个tuple之后要通知storm。 这样storm就能够检測整个tuple树有没有完毕处理。而且通知源spout处理结果。storm提供了一些简洁的api来做这些事情。

            由一个tuple产生一个新的tuple称为:anchoring。你发射一个新tuple的同一时候也就完毕了一次anchoring。看以下这个样例: 这个bolt把一个包括一个句子的tuple切割成每一个单词一个tuple。
        
      public class SplitSentence implements IRichBolt {
            OutputCollector _collector;
     
            public void prepare(Map conf,
                             TopologyContext context,
                             OutputCollector collector) {
                  _collector = collector;
            }
     
            public void execute(Tuple tuple) {
                    String sentence = tuple.getString(0);
                    for(String word: sentence.split(" ")) {
                          _collector.emit(tuple,new Values(word));
                    }
                 _collector.ack(tuple);
            }
     
           publicvoid cleanup() {}
           publicvoid declareOutputFields(OutputFieldsDeclarer declarer) {
                      declarer.declare(newFields("word"));
           }
       }

            看一下这个execute方法, emit的第一个參数是输入tuple。 第二个參数则是输出tuple, 这事实上就是通过输入tuple anchoring了一个新的输出tuple。由于这个“单词tuple”被anchoring在“句子tuple”一起, 假设当中一个单词处理出错,那么这整个句子会被又一次处理。作为对照, 我们看看假设通过以下这行代码来发射一个新的tuple的话会有什么结果。
            _collector.emit(new Values(word));
            用这样的方法发射会导致新发射的这个tuple脱离原来的tuple树(unanchoring), 假设这个tuple处理失败了, 整个句子不会被又一次处理。一个输出tuple能够被anchoring到多个输入tuple。这样的方式在stream合并或者stream聚合的时候非常实用。一个多入口tuple处理失败的话,那么它相应的全部输入tuple都要又一次运行。

    看看以下演示怎么指定多个输入tuple:

     List<Tuple> anchors = new ArrayList<Tuple>();
    anchors.add(tuple1);
    anchors.add(tuple2);
    _collector.emit(anchors,new Values(1,2,3));
             我们通过anchoring来构造这个tuple树,最后一件要做的事情是在你处理完这个tuple的时候告诉storm,  通过OutputCollector类的ack和fail方法来做,假设你回过头来看看SplitSentence的样例, 你能够看到“句子tuple”在全部“单词tuple”被发出之后调用了ack。

           你能够调用OutputCollector 的fail方法去立即将从消息源头发出的那个tuple标记为fail。 比方你查询了数据库,发现一个错误,你能够立即fail那个输入tuple。 这样能够让这个tuple被高速的又一次处理, 由于你不须要等那个timeout时间来让它自己主动fail。

          每一个你处理的tuple, 必须被ack或者fail。由于storm追踪每个tuple要占用内存。所以假设你不ack/fail每个tuple。 那么终于你会看到OutOfMemory错误。

           大多数Bolt遵循这种规律:读取一个tuple;发射一些新的tuple。在execute的结束的时候ack这个tuple。这些Bolt往往是一些过滤器或者简单函数。Storm为这类规律封装了一个BasicBolt类。假设用BasicBolt来做, 上面那个SplitSentence能够改写成这样:

         
           <pre name="code" class="java"> publicclass SplitSentence implements IBasicBolt {
            public void prepare(Map conf,
                                TopologyContext context) {
               }
     
            public void execute(Tuple tuple,
                                BasicOutputCollector collector) {
                      String sentence = tuple.getString(0);
                      for(String word: sentence.split(" ")) {
                        collector.emit(newValues(word));
                      }
           }
     
            publicvoid cleanup() {}
     
            publicvoid declareOutputFields(
                            OutputFieldsDeclarer declarer) {
                declarer.declare(newFields("word"));
            }
        }
    

    
    
           这个实现比之前的实现简单多了。 可是功能上是一样的,发送到BasicOutputCollector的tuple会自己主动和输入tuple相关联,而在execute方法结束的时候那个输入tuple会被自己主动ack的。


            作为对照,处理聚合和合并的bolt往往要处理一大堆的tuple之后才干被ack, 而这类tuple通常都是多输入的tuple。 所以这个已经不是IBasicBolt能够罩得住的了。

     注:当一个Tuple处理失败的时候,storm不会自己主动的重发该tuple,须要用户自己来编写逻辑又一次处理fail掉的Tuple,能够将其放入一个列表中。在nextTuple()中获取这些失败的tuple,又一次发射。

    四 调整可靠性 

           acker task是非常轻量级的, 所以一个topology里面不须要非常多acker。你能够通过Strom UI(id: -1)来跟踪它的性能。 假设它的吞吐量看起来不正常,那么你就须要多加点acker了。
           假设可靠性对你来说不是那么重要 — 你不太在意在一些失败的情况下损失一些数据。 那么你能够通过不跟踪这些tuple树来获取更好的性能。不去跟踪消息的话会使得系统里面的消息数量降低一半。 由于对于每个tuple都要发送一个ack消息。

    而且它须要更少的id来保存下游的tuple, 降低带宽占用。


     有三种方法能够去掉可靠性:
         
           第一是把Config.TOPOLOGY_ACKERS 设置成 0. 在这样的情况下。 storm会在spout发射一个tuple之后立即调用spout的ack方法。也就是说这个tuple树不会被跟踪。
          第二个方法是在tuple层面去掉可靠性。 你能够在发射tuple的时候不指定messageid来达到不跟踪某个特定的spout tuple的目的。

          最后一个方法是假设你对于一个tuple树里面的某一部分究竟成不成功不是非常关心,那么能够在发射这些tuple的时候unanchor它们。

    这样这些tuple就不在tuple树里面。 也就不会被跟踪了。


    五 小结

          在分布式系统中实现对数据的可靠处理是一件繁琐的事情,storm将事实上现的非常优雅,其Arcker不仅使得对数据的可靠处理变得简单并且还非常高效,这个非常值得学习和借鉴。





  • 相关阅读:
    XML错误信息Referenced file contains errors (http://www.springframework.org/schema/beans/spring-beans-4.0.xsd). For more information, right click on the message in the Problems View ...
    Description Resource Path Location Type Cannot change version of project facet Dynamic Web Module to 2.3.
    maven创建web报错Cannot read lifecycle mapping metadata for artifact org.apache.maven.plugins:maven-compiler-plugin:maven-compiler-plugin:3.5.1:runtime Cause: error in opening zip file
    AJAX跨域
    JavaWeb学习总结(转载)
    JDBC学习笔记
    Java动态代理之JDK实现和CGlib实现
    (转)看懂UML类图
    spring boot配置使用fastjson
    python3下django连接mysql数据库
  • 原文地址:https://www.cnblogs.com/gavanwanggw/p/6788815.html
Copyright © 2011-2022 走看看