zoukankan      html  css  js  c++  java
  • 大数据处理框架之Strom:容错机制

    1、集群节点宕机
    Nimbus服务器
      单点故障,大部分时间是闲置的,在supervisor挂掉时会影响,所以宕机影响不大,重启即可
    非Nimbus服务器
      故障时,该节点上所有Task任务都会超时,Nimbus会将这些Task任务重新分配到其他服务器上运行

    2、进程挂掉
    Worker
      挂掉时,Supervisor会重新启动这个进程。如果启动过程中仍然一直失败,并且无法向Nimbus发送心跳,Nimbus会将该Worker重新分配到其他服务器上
    Supervisor
      无状态(所有的状态信息都存放在Zookeeper中来管理)
      快速失败(每当遇到任何异常情况,都会自动毁灭)
    Nimbus
      无状态(所有的状态信息都存放在Zookeeper中来管理)
      快速失败(每当遇到任何异常情况,都会自动毁灭)

    3、消息的完整性
    从Spout中发出的Tuple,以及基于他所产生Tuple,由这些消息就构成了一棵tuple树,当这棵tuple树发送完成,并且树当中每一条消息都被正确处理,就表明spout发送消息被“完整处理”,即消息的完整性,storm使用Acker确保消息完整性,Acker是拓扑当中特殊的一些任务,负责跟踪每个Spout发出的Tuple的DAG(有向无环图)
    Acker分为ack确认机制和fail失败处理机制,Spout作为数据源,当拓扑中bolt处理失败时该怎么办?Acker机制可以重发数据到bolt进行重新处理。

    看下面的例子:

    MessageSpout  ---->   split-bolt  ---->    write-bolt

    MessageTopology
    package bhz.topology;
    
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;
    import bhz.bolt.SpliterBolt;
    import bhz.bolt.WriterBolt;
    import bhz.spout.MessageSpout;
    
    public class MessageTopology {
        
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new MessageSpout());
            builder.setBolt("split-bolt", new SpliterBolt()).shuffleGrouping("spout");
            builder.setBolt("write-bolt", new WriterBolt()).shuffleGrouping("split-bolt");
            //本地配置
            Config config = new Config();
            config.setDebug(false);
            LocalCluster cluster = new LocalCluster();
            System.out.println(cluster);
            cluster.submitTopology("message", config, builder.createTopology());
            Thread.sleep(10000);
            cluster.killTopology("message");
            cluster.shutdown();
        }
    }

    MessageSpout

    package bhz.spout;
    
    import java.util.Map;
    
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichSpout;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    
    public class MessageSpout implements IRichSpout {
    
        private static final long serialVersionUID = 1L;
    
        private int index = 0;
        
        private String[] subjects = new String[]{
                "groovy,oeacnbase",
                "openfire,restful",
                "flume,activiti",
                "hadoop,hbase",
                "spark,sqoop"        
        };
        
        private SpoutOutputCollector collector;
        
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }
        
        @Override
        public void nextTuple() {
            if(index < subjects.length){
                String sub = subjects[index];
                //发送信息参数1 为数值, 参数2为msgId
                collector.emit(new Values(sub), index);
                index++;
            }
        }
        
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("subjects"));
        }
        //当bolt 处理成功  ack确认 spout执行ack方法
        @Override
        public void ack(Object msgId) {
            System.out.println("【消息发送成功!!!】 (msgId = " + msgId +")");
        }
        //当bolt处理失败时,spout调用fail方法,进行重发处理
        @Override
        public void fail(Object msgId) {
            System.out.println("【消息发送失败!!!】  (msgId = " + msgId +")");
            System.out.println("【重发进行中...】");
            collector.emit(new Values(subjects[(Integer) msgId]), msgId);
            System.out.println("【重发成功!!!】");
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void activate() {
    
        }
    
        @Override
        public void deactivate() {
    
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    
    }

    SpliterBolt

    package bhz.bolt;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    public class SpliterBolt implements IRichBolt {
    
        private static final long serialVersionUID = 1L;
    
        private OutputCollector collector;
        
        @Override
        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }
        
        
        private boolean flag = false;
        
        @Override
        public void execute(Tuple tuple) {
            try {
                String subjects = tuple.getStringByField("subjects");
                
                if(!flag && subjects.equals("flume,activiti")){
                    flag = true;
                    int a = 1/0;
                }
                
                String[] words = subjects.split(",");
                //List<String> list = new ArrayList<String>();
                //int index = 0; 
                for (String word : words) {
                    //注意这里循环发送消息,要携带tuple对象,用于处理异常时重发策略
                    collector.emit(tuple, new Values(word));
                    //list.add(word);
                    //index ++;
                }
                //collector.emit(tuple, new Values(list));
                collector.ack(tuple);//通知spout处理成功
            } catch (Exception e) {
                e.printStackTrace();
                collector.fail(tuple);//通知spout 处理失败
            }
        }
        
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
        
        @Override
        public void cleanup() {
    
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    
    }

    WriterBolt

    package bhz.bolt;
    
    import java.io.FileWriter;
    import java.io.IOException;
    import java.util.List;
    import java.util.Map;
    
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    
    public class WriterBolt implements IRichBolt {
    
        private static final long serialVersionUID = 1L;
    
        private FileWriter writer;
    
        private OutputCollector collector;
    
        @Override
        public void prepare(Map config, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            try {
                writer = new FileWriter("d://message.txt");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        private boolean flag = false;
        
        @Override
        public void execute(Tuple tuple) {
            String word = tuple.getString(0);
    //        List<String> list = (List<String>)tuple.getValueByField("word");
    //        System.out.println("======================" + list);
            try {
                if(!flag && word.equals("hadoop")){
                    flag = true;
                    int a = 1/0;
                }
                writer.write(word);
                writer.write("
    ");
                writer.flush();
            } catch (Exception e) {
                e.printStackTrace();
                collector.fail(tuple);//通知spout处理失败
            }
            collector.emit(tuple, new Values(word));
            collector.ack(tuple);//通知spout处理成功
        }
    
        @Override
        public void cleanup() {
    
        }
    
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    
        }
    
        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    
    }

    spout重发机制会带来一个问题:数据重复消费,看上面的例子当WriterBolt执行失败的时候,spout 将hadoop,hbase重发,那么hbase会被WriterBolt再执行一次,目前storm对此没有保障机制,按照业务设计的通用做法就是使用幂等性(比如使用唯一性ID),防止重复消费数据。

  • 相关阅读:
    docker 安装nginx 并部署 配置本地化
    vue安装tinyMCE
    gitignore文件不生效的问题解决
    docker安装Mysql挂载数据卷 实现容器配置本地化
    淘宝网店经营场所证明如何下载
    leetcode 100.相同的树
    深度优先搜索和广度优先搜索
    leetcode 329 矩阵中的最长递增路径
    leetcode 410 分割数组的最大值
    leetcode 95 不同的二叉搜索树II
  • 原文地址:https://www.cnblogs.com/cac2020/p/9857697.html
Copyright © 2011-2022 走看看