zoukankan      html  css  js  c++  java
  • Storm入门(七)可靠性机制代码示例

    一、关联代码

    使用maven,代码如下。

    pom.xml  参考 http://www.cnblogs.com/hd3013779515/p/6970551.html

    MessageTopology.java

    package cn.ljh.storm.reliability;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.utils.Utils;
    
    public class MessageTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
    
            builder.setSpout("MessageSpout", new MessageSpout(), 1);
            builder.setBolt("SpilterBolt", new SpliterBolt(), 5).shuffleGrouping("MessageSpout");
            builder.setBolt("WriterBolt", new WriterBolt(), 1).shuffleGrouping("SpilterBolt");
    
            Config conf = new Config();
            conf.setDebug(false);
    
    
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("messagetest", conf, builder.createTopology());
            Utils.sleep(20000);
            cluster.killTopology("messagetest");
            cluster.shutdown();
        }
    }

    MessageSpou.java

    package cn.ljh.storm.reliability;
    
    import org.apache.storm.topology.OutputFieldsDeclarer;
    
    import java.util.Map;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class MessageSpout extends BaseRichSpout {
       public static Logger LOG = LoggerFactory.getLogger(MessageSpout.class);
       private SpoutOutputCollector _collector;
       
       private int index = 0;
       private String[] subjects = new String[]{
               "Java,Python",
               "Storm,Kafka",
               "Spring,Solr",
               "Zookeeper,FastDFS",
               "Dubbox,Redis"
       };
           
       public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
           _collector = collector;
       }
       
       public void nextTuple() {
           
           if(index < subjects.length){
               String sub = subjects[index];
               //使用messageid参数,使可靠性机制生效
               _collector.emit(new Values(sub), index);
               index++;
           }
       }
       
       public void declareOutputFields(OutputFieldsDeclarer declarer) {
           declarer.declare(new Fields("subjects"));
       }
       
       @Override
       public void ack(Object msgId) {
           LOG.info("【消息发送成功!】(msgId = " + msgId + ")");
       }
    
       @Override
       public void fail(Object msgId) {
           LOG.info("【消息发送失败!】(msgId = " + msgId + ")");
           LOG.info("【重发进行中。。。】");
           _collector.emit(new Values(subjects[(Integer)msgId]), msgId);
           LOG.info("【重发成功!】");
       }
       
    }

    SpliterBolt.java

    package cn.ljh.storm.reliability;
    
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    public class SpliterBolt extends BaseRichBolt {
        OutputCollector _collector;
        private boolean flag = false;
    
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
          _collector = collector;
        }
    
        public void execute(Tuple tuple) {
            
            try{
                String subjects = tuple.getStringByField("subjects");
                
    //            if(!flag && subjects.equals("Spring,Solr")){
    //                flag = true;
    //                int a = 1/0;
    //            }
                
                String[] words = subjects.split(",");
                for(String word : words){
                    //注意:要携带tuple对象,用于处理异常时重发策略。
                    _collector.emit(tuple, new Values(word));
                }
                
                //对tuple进行ack
                _collector.ack(tuple);
            }catch(Exception ex){
                ex.printStackTrace();
                //对tuple进行fail,使重发。
                _collector.fail(tuple);
            }
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
          declarer.declare(new Fields("word"));
        }
    
      }

    WriterBolt.java

    package cn.ljh.storm.reliability;
    
    import java.io.FileWriter;
    import java.io.IOException;
    import java.util.Map;
    
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class WriterBolt extends BaseRichBolt {
            private static Logger LOG = LoggerFactory.getLogger(WriterBolt.class);
            OutputCollector _collector;
            
            private FileWriter fileWriter;
            private boolean flag = false;
    
            public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
              _collector = collector;
    
                if(fileWriter == null){
                    try {
                        fileWriter = new FileWriter("D:\test\"+"words.txt");
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
              
            }
    
            public void execute(Tuple tuple) {
                try {
                      String word = tuple.getStringByField("word");
                    
    //                if(!flag && word.equals("Kafka")){
    //                    flag = true;
    //                    int a = 1/0;
    //                }
                    fileWriter.write(word + "
    ");
                    fileWriter.flush();
                } catch (Exception e) {
                    e.printStackTrace();
                    //对tuple进行fail,使重发。
                    _collector.fail(tuple);
                }
                //对tuple进行ack
                _collector.ack(tuple);
            }
    
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
            }
    }

    二、执行效果

    1、代码要点说明

    MessageSpout.java

    (1)发射tuple时要设置messageId来使可靠性机制生效

    _collector.emit(new Values(sub), index);

    (2)重写ack和fail方法

    @Override
       public void ack(Object msgId) {
           LOG.info("【消息发送成功!】(msgId = " + msgId + ")");
       }
    
       @Override
       public void fail(Object msgId) {
           LOG.info("【消息发送失败!】(msgId = " + msgId + ")");
           LOG.info("【重发进行中。。。】");
           _collector.emit(new Values(subjects[(Integer)msgId]), msgId);
           LOG.info("【重发成功!】");
       }

    SpliterBolt.java

    (1)发射新tuple时设置输入tuple参数,以使新tuple和输入tuple为一个整体

    _collector.emit(tuple, new Values(word));

    (2)完成处理后进行ack,失败时进行fail

    _collector.ack(tuple);
    
    _collector.fail(tuple);

    WriterBolt.java

    (1)完成处理后进行ack,失败时进行fail

    _collector.ack(tuple);
    
    _collector.fail(tuple);

    2、正常处理结果

    image

    image

    3、放开SpliterBolt 的错误代码

    结果显示能够正确的重发。

    image

    image

    4、放开SpliterBolt 的错误代码

    能够正确进行重发,但是文件中storm字符串出现了两次。

    image

    image

    5、总结

    通过以上测试,如果在第一个bolt处理时出现异常,可以让整个数据进行重发,如果第二个bolt处理时出现异常,也可以让整个数据进行重发,但是同时出现了重复处理的事务性问题,需要进行特殊的处理。

    (1)如果数据入库的话,可以把messageId也进行入库保存。此messageId可以用来判断是否重复处理。

    (2)事务性tuple尽量不要拆分。

    (3)使用storm的Trident框架。

  • 相关阅读:
    服务器状态码
    QuerySet中添加Extra进行SQL查询
    django配置一个网站建设
    MySQL数据库查询中的特殊命令
    125. Valid Palindrome
    121. Best Time to Buy and Sell Stock
    117. Populating Next Right Pointers in Each Node II
    98. Validate Binary Search Tree
    91. Decode Ways
    90. Subsets II
  • 原文地址:https://www.cnblogs.com/hd3013779515/p/6972525.html
Copyright © 2011-2022 走看看