zoukankan      html  css  js  c++  java
  • Storm的acker确认机制

     Storm的acker消息确认机制...

           ack/fail消息确认机制(确保一个tuple被完全处理)

           在spout中发射tuple的时候需要同时发送messageid,这样才相当于开启了消息确认机制

            如果你的topology里面的tuple比较多的话, 那么把acker的数量设置多一点,效率会高一点。

           通过config.setNumAckers(num)来设置一个topology里面的acker的数量,默认值是1。

           注意: acker用了特殊的算法,使得对于追踪每个spout tuple的状态所需要的内存量是恒定的(20 bytes)

           注意:如果一个tuple在指定的timeout(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS默认值为30秒)时间内没有被成功处理,那么这个tuple会被认为处理失败了。

    下面代码中Bolt的execute中模拟消息的正常和失败.

      1 import java.util.Map;
      2 
      3 import backtype.storm.Config;
      4 import backtype.storm.LocalCluster;
      5 import backtype.storm.spout.SpoutOutputCollector;
      6 import backtype.storm.task.OutputCollector;
      7 import backtype.storm.task.TopologyContext;
      8 import backtype.storm.topology.OutputFieldsDeclarer;
      9 import backtype.storm.topology.TopologyBuilder;
     10 import backtype.storm.topology.base.BaseRichBolt;
     11 import backtype.storm.topology.base.BaseRichSpout;
     12 import backtype.storm.tuple.Fields;
     13 import backtype.storm.tuple.Tuple;
     14 import backtype.storm.tuple.Values;
     15 import backtype.storm.utils.Utils;
     16 
     17 /**
     18  * 数字累加求和
     19  * 先添加storm依赖
     20  * 
     21  * @author Administrator
     22  *
     23  */
     24 public class LocalTopologySumAcker {
     25     
     26     
     27     /**
     28      * spout需要继承baserichspout,实现未实现的方法
     29      * @author Administrator
     30      *
     31      */
     32     public static class MySpout extends BaseRichSpout{
     33         private Map conf;
     34         private TopologyContext context;
     35         private SpoutOutputCollector collector;
     36         
     37         /**
     38          * 初始化方法,只会执行一次
     39          * 在这里面可以写一个初始化的代码
     40          * Map conf:其实里面保存的是topology的一些配置信息
     41          * TopologyContext context:topology的上下文,类似于servletcontext
     42          * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
     43          */
     44         @Override
     45         public void open(Map conf, TopologyContext context,
     46                 SpoutOutputCollector collector) {
     47             this.conf = conf;
     48             this.context = context;
     49             this.collector = collector;
     50         }
     51 
     52         int num = 1;
     53         /**
     54          * 这个方法是spout中最重要的方法,
     55          * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
     56          * 每调用一次,会向外发射一条数据
     57          */
     58         @Override
     59         public void nextTuple() {
     60             System.out.println("spout发射:"+num);
     61             //把数据封装到values中,称为一个tuple,发射出去
     62             //messageid:和tuple需要是一一对应的,可以把messageid认为是数据的主键id,而tuple中的内容就是这个数据.
     63             //messageid和tuple中的消息是一一对应的. 它们之间的关系是需要我们程序员来维护的.
     64             //this.collector.emit(new Values(num++));
     65             this.collector.emit(new Values(num++),num-1);//传递messageid(num-1)参数就表示开启了消息确认机制.
     66             Utils.sleep(1000);
     67         }
     68         
     69         @Override
     70         public void ack(Object msgId) {
     71             System.out.println("处理成功");
     72         }
     73         
     74         @Override
     75         public void fail(Object msgId) {
     76             System.out.println("处理失败....."+msgId);
     77             //TODO--可以选择把失败的数据重发,或者单独存储后期进行分析
     78             //重发的方法...this.collector.emit(tuple);//这个tuple可以根据参数msgId来获得...
     79         }
     80         
     81         /**
     82          * 声明输出字段
     83          */
     84         @Override
     85         public void declareOutputFields(OutputFieldsDeclarer declarer) {
     86             //给values中的数据起个名字,方便后面的bolt从这个values中取数据
     87             //fields中定义的参数和values中传递的数值是一一对应的
     88             declarer.declare(new Fields("num"));
     89         }
     90         
     91     }
     92     
     93     
     94     /**
     95      * 自定义bolt需要实现baserichbolt
     96      * @author Administrator
     97      *
     98      */
     99     public static class MyBolt extends BaseRichBolt{
    100         private Map stormConf; 
    101         private TopologyContext context;
    102         private OutputCollector collector;
    103         
    104         /**
    105          * 和spout中的open方法意义一样
    106          */
    107         @Override
    108         public void prepare(Map stormConf, TopologyContext context,
    109                 OutputCollector collector) {
    110             this.stormConf = stormConf;
    111             this.context = context;
    112             this.collector = collector;
    113         }
    114 
    115         int sum = 0;
    116         /**
    117          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
    118          */
    119         @Override
    120         public void execute(Tuple input) {
    121             try{
    122                 //input.getInteger(0);//也可以根据角标获取tuple中的数据
    123                 Integer value = input.getIntegerByField("num");
    124                 if(value == 3){
    125                     throw new Exception("value=3异常.....");
    126                 }
    127                 sum+=value;
    128                 System.out.println("和:"+sum);
    129                 this.collector.ack(input);//这个表示确认消息处理成功,spout中的ack方法会被调用
    130             }catch(Exception e) {
    131                 this.collector.fail(input);//这个表示确认消息处理失败,spout中的fail方法会被调用
    132                 e.printStackTrace();
    133             }
    134         }
    135         
    136         /**
    137          * 声明输出字段
    138          */
    139         @Override
    140         public void declareOutputFields(OutputFieldsDeclarer declarer) {
    141             //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
    142             //如果nextTuple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
    143         }
    144         
    145     }
    146     /**
    147      * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
    148      * @param args
    149      */
    150     public static void main(String[] args) {
    151         //组装topology
    152         TopologyBuilder topologyBuilder = new TopologyBuilder();
    153         topologyBuilder.setSpout("spout1", new MySpout());
    154         //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
    155         topologyBuilder.setBolt("bolt1", new MyBolt()).shuffleGrouping("spout1");
    156         
    157         //创建本地storm集群
    158         LocalCluster localCluster = new LocalCluster();
    159         Config config = new Config();
    160         localCluster.submitTopology("sumTopology", config, topologyBuilder.createTopology());
    161     }
    162 
    163 }

     运行结果:

    从结果可以看到Bolt1执行execute成功了通过ack  调用Spout1中的ack方法....   失败了就通过fail 调用Spout1中的fail 方法 来达到对消息处理成功与否的追踪.

    //=============================================================================================

    上面的例子是一个Spout 和一个Bolt.....如果对应有1个Spout和2个Bolt 会是什么情况.....

    改造上面的代码.....

      1 /**
      2  * 数字累加求和
      3  * 先添加storm依赖
      4  * 
      5  * @author Administrator
      6  *
      7  */
      8 public class LocalTopologySumAcker2 {
      9     
     10     
     11     /**
     12      * spout需要继承baserichspout,实现未实现的方法
     13      * @author Administrator
     14      *
     15      */
     16     public static class MySpout extends BaseRichSpout{
     17         private Map conf;
     18         private TopologyContext context;
     19         private SpoutOutputCollector collector;
     20         
     21         /**
     22          * 初始化方法,只会执行一次
     23          * 在这里面可以写一个初始化的代码
     24          * Map conf:其实里面保存的是topology的一些配置信息
     25          * TopologyContext context:topology的上下文,类似于servletcontext
     26          * SpoutOutputCollector collector:发射器,负责向外发射数据(tuple)
     27          */
     28         @Override
     29         public void open(Map conf, TopologyContext context,
     30                 SpoutOutputCollector collector) {
     31             this.conf = conf;
     32             this.context = context;
     33             this.collector = collector;
     34         }
     35 
     36         int num = 1;
     37         /**
     38          * 这个方法是spout中最重要的方法,
     39          * 这个方法会被storm框架循环调用,可以理解为这个方法是在一个while循环之内
     40          * 每调用一次,会向外发射一条数据
     41          */
     42         @Override
     43         public void nextTuple() {
     44             System.out.println("spout发射:"+num);
     45             //把数据封装到values中,称为一个tuple,发射出去
     46             //messageid:和tuple需要是一一对应的,可以把messageid认为是数据的主键id,而tuple中的内容就是这个数据
     47             //messageid和tuple之间的关系是需要我们程序员维护的
     48             this.collector.emit(new Values(num++),num-1);//传递messageid参数就表示开启了消息确认机制
     49             Utils.sleep(1000);
     50         }
     51         
     52         /**
     53          * 声明输出字段
     54          */
     55         @Override
     56         public void declareOutputFields(OutputFieldsDeclarer declarer) {
     57             //给values中的数据起个名字,方便后面的bolt从这个values中取数据
     58             //fields中定义的参数和values中传递的数值是一一对应的
     59             declarer.declare(new Fields("num"));
     60         }
     61 
     62         @Override
     63         public void ack(Object msgId) {
     64             System.out.println("处理成功");
     65         }
     66 
     67         @Override
     68         public void fail(Object msgId) {
     69             System.out.println("处理失败。。"+msgId);
     70             //TODO--可以选择吧失败的数据重发,或者单独存储后期分析
     71         }
     72     }
     73     
     74     
     75     /**
     76      * 自定义bolt需要实现baserichbolt
     77      * @author Administrator
     78      *
     79      */
     80     public static class MyBolt1 extends BaseRichBolt{
     81         private Map stormConf; 
     82         private TopologyContext context;
     83         private OutputCollector collector;
     84         
     85         /**
     86          * 和spout中的open方法意义一样
     87          */
     88         @Override
     89         public void prepare(Map stormConf, TopologyContext context,
     90                 OutputCollector collector) {
     91             this.stormConf = stormConf;
     92             this.context = context;
     93             this.collector = collector;
     94         }
     95 
     96         int sum = 0;
     97         /**
     98          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
     99          */
    100         @Override
    101         public void execute(Tuple input) {
    102             try {
    103                 //input.getInteger(0);//也可以根据角标获取tuple中的数据
    104                 Integer value = input.getIntegerByField("num");
    105                 this.collector.emit(new Values(value+"_1"));
    106                 //this.collector.emit(input,new Values(value+"_1"));//新的tuple是new Values(value+"_1")  老的tuple是input
    107                 this.collector.ack(input);//确认数据处理成功,spout中的ack方法会被调用
    108             } catch (Exception e) {
    109                 this.collector.fail(input);//确认数据处理失败,spout中的fail方法会被调用
    110                 e.printStackTrace();
    111             }
    112         }
    113         
    114         /**
    115          * 声明输出字段
    116          */
    117         @Override
    118         public void declareOutputFields(OutputFieldsDeclarer declarer) {
    119             //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
    120             //如果nextT|uple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
    121             declarer.declare(new Fields("num_1"));
    122         }
    123         
    124     }
    125     
    126     public static class MyBolt2 extends BaseRichBolt{
    127         private Map stormConf; 
    128         private TopologyContext context;
    129         private OutputCollector collector;
    130         
    131         /**
    132          * 和spout中的open方法意义一样
    133          */
    134         @Override
    135         public void prepare(Map stormConf, TopologyContext context,
    136                 OutputCollector collector) {
    137             this.stormConf = stormConf;
    138             this.context = context;
    139             this.collector = collector;
    140         }
    141 
    142         int sum = 0;
    143         /**
    144          * 是bolt中最重要的方法,当spout发射一个tuple出来,execute也会被调用,需要对spout发射出来的tuple进行处理
    145          */
    146         @Override
    147         public void execute(Tuple input) {
    148             try {
    149                 //input.getInteger(0);//也可以根据角标获取tuple中的数据
    150                 String value = input.getStringByField("num_1");
    151                 System.out.println(value);
    152                 this.collector.fail(input);//确认数据处理成功,spout中的ack方法会被调用
    153                 //this.collector.ack(input);//确认数据处理成功,spout中的ack方法会被调用
    154             } catch (Exception e) {
    155                 //this.collector.fail(input);//确认数据处理失败,spout中的fail方法会被调用
    156                 e.printStackTrace();
    157             }
    158         }
    159         
    160         /**
    161          * 声明输出字段
    162          */
    163         @Override
    164         public void declareOutputFields(OutputFieldsDeclarer declarer) {
    165             //在这没必要定义了,因为execute方法中没有向外发射tuple,所以就不需要声明了。
    166             //如果nextT|uple或者execute方法中向外发射了tuple,那么declareOutputFields必须要声明,否则不需要声明
    167             declarer.declare(new Fields("num_1"));
    168         }
    169         
    170     }
    171     
    172     /**
    173      * 注意:在组装topology的时候,组件的id在定义的时候,名称不能以__开头。__是系统保留的
    174      * @param args
    175      */
    176     public static void main(String[] args) {
    177         //组装topology
    178         TopologyBuilder topologyBuilder = new TopologyBuilder();
    179         topologyBuilder.setSpout("spout1", new MySpout());
    180         //.shuffleGrouping("spout1"); 表示让MyBolt接收MySpout发射出来的tuple
    181         topologyBuilder.setBolt("bolt1", new MyBolt1()).shuffleGrouping("spout1");
    182         topologyBuilder.setBolt("bolt2", new MyBolt2()).shuffleGrouping("bolt1");
    183         
    184         //创建本地storm集群
    185         LocalCluster localCluster = new LocalCluster();
    186         localCluster.submitTopology("sumTopology", new Config(), topologyBuilder.createTopology());
    187     }
    188 }

    上面的代码的大体意思是 Bolt1接收Spout1的输出,接收之后在数据后面加上"_1",然后发送给Bolt2,Bolt2接收到之后直接打印.

    在spout2中的execute()方法不管成功还是失败 都调用   this.collector.fail(input);  方法....也就是Spout1发射的数据在Bolt1中处理都成功了,在Bolt2中的处理都失败了.

    看Spout1中的哪个方法会被执行.....也就是Spout2中调用的ack或者是fail对tuple的处理状态结果是否有影响.

    运行看结果:

    可以看出都是成功的...这就说明tuple的处理状态和Bolt2中ack或者是fail是没有任何的关系的......只要Bolt1中处理tuple成功了,我们就认为是处理成功了...

    如果Bolt1处理失败了就认为是处理失败了.. ...现在Bolt1中发射出去的tuple是无法追踪的.....

    能不能在Bolt1发射的数据中也加上一个messageid...这个在Bolt中的   this.collector.emit(new Values(value+"_1"));  emit方法中是不支持传入一个messageid的.

    但是这样有一种场景是有问题的.  单词计数的例子:

    这个Spout后面有两个Bolt  一个SplitBolt 一个CountBolt    SplitBolt 切割成一个个的单词  然后再CountBolt中进行汇总....

    按照上面在SplitBolt中切割成功了,就算处理成功了...但是有可能切割之后 在CountBolt中有一些Bolt没有收到. 这样最后其实是没有成功的...

    而且SpiltBolt中处理的tuple和CountBolt中的tuple之间是有关联的. 后者是在前者之上切割出来的小tuple....

    我们想达到两个Bolt都处理成功了才认为是处理成功的...如何做?

    上面的代码中已经包括......这里再说明一下:

    Spout1中 的   this.collector.emit(input,new Values(value+"_1"));   ---->    this.collector.emit(input,new Values(value+"_1"));//新的tuple是new Values(value+"_1")  老的tuple是input
    在Spout2中还是不管是否异常都调用.. this.collector.fail(input);

    看运行结果:

    
    

    运行都失败了........

    这样就达到了上面的"完全处理"的要求....

    完全处理:保证一个tuple以及这个tuple衍生的所有tuple都被成功处理.

    在storm里面一个tuple被完全处理的意思是: 这个tuple以及由这个tuple所衍生的所有的tuple都被成功处理。

    如果把Bolt2的正常对应改为  this.collector.ack(input);  失败对应 this.collector.fail(input);就回复正常了.....

    如果Spout2后面还有Spout3  同样把老的tuple在emit上带上.........

  • 相关阅读:
    优雅地从Python入门到入土*序与目录
    【NOI2008】假面舞会
    【HNOI2009】梦幻布丁
    【题解】前k大子段和
    【NOIP2017】宝藏
    【NOIP2014】飞扬的小鸟
    【NOIP2014】解方程
    【NOIP2012】开车旅行
    【模板】线性同余方程组
    java实现省市区三级联动
  • 原文地址:https://www.cnblogs.com/DreamDrive/p/6671194.html
Copyright © 2011-2022 走看看