  • Storm Getting Started (7): Reliability Mechanism Code Example

    I. The Code

    The project uses Maven; the code is shown below.

    For pom.xml, see http://www.cnblogs.com/hd3013779515/p/6970551.html
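
    If you don't want to follow the link, a minimal sketch of the Storm dependency is shown below. The version is an assumption (the org.apache.storm imports used in this post indicate Storm 1.x); use whatever matches your cluster. For a local-mode run like this one, the dependency must be on the compile classpath rather than provided scope.

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.0.1</version>
        </dependency>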

    MessageTopology.java

    package cn.ljh.storm.reliability;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.utils.Utils;
    
    public class MessageTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
    
            builder.setSpout("MessageSpout", new MessageSpout(), 1);
            builder.setBolt("SpilterBolt", new SpliterBolt(), 5).shuffleGrouping("MessageSpout");
            builder.setBolt("WriterBolt", new WriterBolt(), 1).shuffleGrouping("SpilterBolt");
    
            Config conf = new Config();
            conf.setDebug(false);
    
    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("messagetest", conf, builder.createTopology());
            Utils.sleep(20000);
            cluster.killTopology("messagetest");
            cluster.shutdown();
        }
    }

    MessageSpout.java

    package cn.ljh.storm.reliability;
    
    import org.apache.storm.topology.OutputFieldsDeclarer;
    
    import java.util.Map;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class MessageSpout extends BaseRichSpout {
       public static Logger LOG = LoggerFactory.getLogger(MessageSpout.class);
       private SpoutOutputCollector _collector;
       
       private int index = 0;
       private String[] subjects = new String[]{
               "Java,Python",
               "Storm,Kafka",
               "Spring,Solr",
               "Zookeeper,FastDFS",
               "Dubbox,Redis"
       };
           
       public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
           _collector = collector;
       }
       
       public void nextTuple() {
           
           if(index < subjects.length){
               String sub = subjects[index];
               // Emit with a messageId argument so the reliability mechanism takes effect
               _collector.emit(new Values(sub), index);
               index++;
           }
       }
       
       public void declareOutputFields(OutputFieldsDeclarer declarer) {
           declarer.declare(new Fields("subjects"));
       }
       
       @Override
       public void ack(Object msgId) {
           LOG.info("【消息发送成功!】(msgId = " + msgId + ")");
       }
    
       @Override
       public void fail(Object msgId) {
           LOG.info("【消息发送失败!】(msgId = " + msgId + ")");
           LOG.info("【重发进行中。。。】");
           _collector.emit(new Values(subjects[(Integer)msgId]), msgId);
           LOG.info("【重发成功!】");
       }
       
    }

    SpliterBolt.java

    package cn.ljh.storm.reliability;
    
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    public class SpliterBolt extends BaseRichBolt {
        OutputCollector _collector;
        private boolean flag = false;
    
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
          _collector = collector;
        }
    
        public void execute(Tuple tuple) {
            
            try{
                String subjects = tuple.getStringByField("subjects");
                
    //            if(!flag && subjects.equals("Spring,Solr")){
    //                flag = true;
    //                int a = 1/0;
    //            }
                
                String[] words = subjects.split(",");
                for(String word : words){
                    // Note: anchor the output to the input tuple so that a downstream failure triggers a replay.
                    _collector.emit(tuple, new Values(word));
                }
                
                // ack the input tuple after successful processing
                _collector.ack(tuple);
            }catch(Exception ex){
                ex.printStackTrace();
                // fail the input tuple so it gets replayed
                _collector.fail(tuple);
            }
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("word"));
        }
    
      }

    WriterBolt.java

    package cn.ljh.storm.reliability;
    
    import java.io.FileWriter;
    import java.io.IOException;
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class WriterBolt extends BaseRichBolt {
            private static Logger LOG = LoggerFactory.getLogger(WriterBolt.class);
            OutputCollector _collector;
            
            private FileWriter fileWriter;
            private boolean flag = false;
    
            public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
              _collector = collector;
    
                if(fileWriter == null){
                    try {
                        fileWriter = new FileWriter("D:\\test\\words.txt");
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
              
            }
    
            public void execute(Tuple tuple) {
                try {
                      String word = tuple.getStringByField("word");
                    
    //                if(!flag && word.equals("Kafka")){
    //                    flag = true;
    //                    int a = 1/0;
    //                }
                    fileWriter.write(word + "
    ");
                    fileWriter.flush();
                } catch (Exception e) {
                    e.printStackTrace();
                    // fail the tuple so it gets replayed
                    _collector.fail(tuple);
                    // return so the failed tuple is not also acked below
                    return;
                }
                // ack the tuple after a successful write
                _collector.ack(tuple);
            }
    
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
            }
    }

    II. Execution Results

    1. Key points of the code

    MessageSpout.java

    (1) When emitting a tuple, set a messageId so that the reliability mechanism takes effect:

    _collector.emit(new Values(sub), index);

    (2) Override the ack and fail methods:

    @Override
       public void ack(Object msgId) {
           LOG.info("[Message sent successfully!] (msgId = " + msgId + ")");
       }
    
       @Override
       public void fail(Object msgId) {
           LOG.info("[Message send failed!] (msgId = " + msgId + ")");
           LOG.info("[Resending...]");
           _collector.emit(new Values(subjects[(Integer)msgId]), msgId);
           LOG.info("[Resent!]");
       }
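
    Note that fail() above re-emits unconditionally, so a message that always fails would be replayed forever. Below is a minimal sketch of capping the number of retries; the retryCounts map and MAX_RETRIES constant are illustrative additions, not part of the original code.

       // Assumed additions to MessageSpout (java.util.Map is already imported;
       // java.util.HashMap is also needed).
       private Map<Integer, Integer> retryCounts = new HashMap<Integer, Integer>();
       private static final int MAX_RETRIES = 3;

       @Override
       public void fail(Object msgId) {
           int idx = (Integer) msgId;
           Integer attempts = retryCounts.get(idx);
           if (attempts == null) attempts = 0;
           if (attempts < MAX_RETRIES) {
               // replay the original tuple with the same messageId
               retryCounts.put(idx, attempts + 1);
               _collector.emit(new Values(subjects[idx]), msgId);
           } else {
               // give up; a real system might route the message to dead-letter storage
               LOG.info("[Giving up!] (msgId = " + msgId + ", retries = " + MAX_RETRIES + ")");
           }
       }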

    SpliterBolt.java

    (1) When emitting a new tuple, pass the input tuple as the anchor, so that the new tuple and the input tuple are tracked as one tuple tree:

    _collector.emit(tuple, new Values(word));
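
    For contrast, the two emit overloads behave very differently here; a short sketch:

        // Anchored emit: the new tuple joins the input tuple's tree, so a failure
        // anywhere downstream reaches the spout's fail() method.
        _collector.emit(tuple, new Values(word));

        // Unanchored emit: the new tuple is not tracked; a downstream failure
        // would be invisible to the spout and nothing would be replayed.
        _collector.emit(new Values(word));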

    (2) Ack the tuple after processing completes; fail it on error:

    _collector.ack(tuple);
    
    _collector.fail(tuple);

    WriterBolt.java

    (1) Ack the tuple after processing completes; fail it on error:

    _collector.ack(tuple);
    
    _collector.fail(tuple);

    2. Normal processing result

    [screenshots omitted]

    3. Uncommenting the error-injection code in SpliterBolt

    The results show that the message is correctly resent.

    [screenshots omitted]

    4. Uncommenting the error-injection code in WriterBolt

    The message is correctly resent, but the string "Storm" now appears twice in the file: the spout replays the entire "Storm,Kafka" tuple, so the "Storm" that was already written before "Kafka" failed gets written a second time.

    [screenshots omitted]

    5. Summary

    These tests show that an exception in the first bolt causes the entire message to be resent from the spout, and an exception in the second bolt does the same. However, the replay also introduces a duplicate-processing problem (a transactionality issue) that needs special handling:

    (1) If the data is written to a database, store the messageId along with it; the messageId can then be used to detect duplicate processing (see the sketch after this list).

    (2) Avoid splitting a transactional tuple into multiple tuples where possible.

    (3) Use Storm's Trident framework.
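
    A minimal sketch of idea (1) follows. A bolt does not see the spout's messageId automatically, so the spout would also have to emit the id as a regular tuple field (here named "msgId") and SpliterBolt would have to forward it; the in-memory Set below stands in for a database table, and all of these names are illustrative assumptions. Also note that because one subject is split into several words, the deduplication key must include the word as well as the id; otherwise the legitimately replayed "Kafka" would be skipped along with the duplicate "Storm".

        // Assumed change in MessageSpout: carry the id as a field.
        //   declarer.declare(new Fields("subjects", "msgId"));
        //   _collector.emit(new Values(sub, index), index);

        // Assumed deduplication in WriterBolt (needs java.util.Set / java.util.HashSet):
        private Set<String> processedKeys = new HashSet<String>();

        public void execute(Tuple tuple) {
            String word = tuple.getStringByField("word");
            Integer msgId = tuple.getIntegerByField("msgId");
            String key = msgId + ":" + word;      // fine-grained key: id plus word
            if (processedKeys.contains(key)) {
                _collector.ack(tuple);            // duplicate from a replay: ack without writing
                return;
            }
            try {
                fileWriter.write(word + "\n");    // with a database, persist the key in the same transaction
                fileWriter.flush();
                processedKeys.add(key);
            } catch (Exception e) {
                _collector.fail(tuple);
                return;
            }
            _collector.ack(tuple);
        }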
