zoukankan      html  css  js  c++  java
  • storm 消息确认机制及可靠性

    worker进程死掉

    在一个节点 kill work进程 比方 kill 2509  对work没有影响 由于会在其它节点又一次启动进程运行topology任务

    supervisor进程死掉

    supervisor进程kill掉 对work进程没有影响  由于他们是互相独立的!

    nimbus进程死掉(存在HA的问题)

    nimbus假设死掉 整个任务挂掉 存在单点故障问题!(hadoop2有ha!。!!。!

    storm没有ha高可用)

    节点宕机(和supervisor是一样的)


    ack/fail消息确认机制

    spout发送过来的数据  blot要确认数据是否收到及反馈给spout 以下给一个样例:



    import java.util.Map;
    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.generated.AlreadyAliveException;
    import backtype.storm.generated.InvalidTopologyException;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import backtype.storm.utils.Utils;


    public class ClusterStormTopologyAck {

    public static class DataSourceSpout extends BaseRichSpout{
    private Map conf;
    private TopologyContext context;
    private SpoutOutputCollector collector;

    /**
    * 在本实例执行的时候被调用一次
    */
    public void open(Map conf, TopologyContext context,
    SpoutOutputCollector collector) {
    this.conf = conf;
    this.context = context;
    this.collector = collector;
    }
    /**
    * 死循环调用 心跳
    */
    int i=0;
    public void nextTuple() {
    System.err.println("spout :"+i);
    //values 就是value的list列表

    //(new Values(i++),i-1);发送的值及key一一相应
    this.collector.emit(new Values(i++),i-1);
    Utils.sleep(1000);
    }
    /**
    * 声明字段名称
    */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    //fields就是field的列表
    declarer.declare(new Fields("num"));
    }
    @Override
    public void ack(Object msgId) {
    System.out.println("运行ACK:"+msgId);
    }
    @Override
    public void fail(Object msgId) {
    System.out.println("运行FAIL:"+msgId);
    //TODO--
    //this.collector.emit(tuple);
    }



    }

    public static class SumBolt extends BaseRichBolt{

    private Map stormConf;
    private TopologyContext context;
    private OutputCollector collector;
    /**
    * 仅仅会被调用一次
    */
    public void prepare(Map stormConf, TopologyContext context,
    OutputCollector collector) {
    this.stormConf = stormConf;
    this.context = context;
    this.collector = collector;
    }
    /**
    * 死循环,循环的获取上一级发送过来的数据(spout/bolt)
    */
    int sum = 0;
    public void execute(Tuple input) {
    //input.getInteger(0);
    Integer count = input.getIntegerByField("num");

    try{
    //--------

    this.collector.ack(input);
    }catch(Exception e){
    this.collector.fail(input);
    }
    /*if(count>10 && count<20){
    this.collector.fail(input);
    }{
    this.collector.ack(input);
    }*/
    }


    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
    }


    public static void main(String[] args) {
    TopologyBuilder builder = new TopologyBuilder();
    String SPOUT_NAME = DataSourceSpout.class.getSimpleName();
    String BOLT_NAME = SumBolt.class.getSimpleName();
    builder.setSpout(SPOUT_NAME, new DataSourceSpout());
    builder.setBolt(BOLT_NAME, new SumBolt()).shuffleGrouping(SPOUT_NAME);
    Config config = new Config();
    try {
    StormSubmitter.submitTopology(ClusterStormTopologyAck.class.getSimpleName(), config, builder.createTopology());
    } catch (AlreadyAliveException e) {
    e.printStackTrace();
    } catch (InvalidTopologyException e) {
    e.printStackTrace();
    }


    }


    }

  • 相关阅读:
    UVA
    把.apk传到站点server下载
    2014腾讯实习生面试经历(重庆站)
    Phalcon 訪问控制列表 ACL(Access Control Lists ACL)
    Regular Expression Matching
    改动Androidproject的名称(非Eclipse重命名)
    Cocos2d-x3.2 LayerMultiplex使用说明
    BAT解密:互联网技术发展之路(5)- 开发层技术剖析
    使用xftp连接VirtualBox中的centos6.5
    【C语言】二维数组中的查找,杨氏矩阵
  • 原文地址:https://www.cnblogs.com/gccbuaa/p/7039671.html
Copyright © 2011-2022 走看看