zoukankan      html  css  js  c++  java
  • storm的可靠性

    消息确认机制:

    在数据发送的过程中,数据可能会丢失,导致下游没有接收到。spout有一个超时时间(默认是30秒),如果超过30秒仍然没有收到该消息的确认(ack),也会认为这条消息处理失败。

    运行结果都是处理成功

    参考代码StormTopologyAcker.java

    package yehua.storm;
    
    import java.util.Map;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    
    public class StormTopologyAcker {
        
        /**
         * Demo spout that emits an ever-increasing integer roughly once per
         * second, attaching a message id to every tuple so that the
         * {@link #ack(Object)} / {@link #fail(Object)} callbacks are invoked
         * for it.
         */
        public static class MySpout extends BaseRichSpout{
            // Handles received in open(); only the collector is used afterwards.
            private Map conf;
            private TopologyContext context;
            private SpoutOutputCollector collector;

            // Counter that is both the emitted value and the message id.
            int num = 0;

            /**
             * Stores the configuration, context and collector supplied to this spout.
             *
             * @param conf      configuration map passed in by the caller
             * @param context   topology context passed in by the caller
             * @param collector collector used by {@link #nextTuple()} to emit tuples
             */
            @Override
            public void open(Map conf, TopologyContext context,
                    SpoutOutputCollector collector) {
                this.collector = collector;
                this.context = context;
                this.conf = conf;
            }

            /**
             * Increments the counter, prints it, emits it as a one-field tuple
             * tagged with a message id, then sleeps for one second.
             */
            @Override
            public void nextTuple() {
                num += 1;
                final int current = num;
                System.out.println("spout:" + current);
                // Passing a message id alongside the tuple is what switches on the
                // acknowledgement mechanism. The id plays the role of a primary key:
                // it must map to exactly one tuple, and keeping that mapping is the
                // application's responsibility.
                final int messageId = current;
                collector.emit(new Values(current), messageId);
                Utils.sleep(1000);
            }

            /**
             * Declares the single output field {@code "num"}.
             *
             * @param declarer declarer on which the output fields are registered
             */
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                final Fields outputFields = new Fields("num");
                declarer.declare(outputFields);
            }

            /**
             * Success callback: prints the id of the acknowledged message.
             *
             * @param msgId the message id that was passed to {@code emit}
             */
            @Override
            public void ack(Object msgId) {
                System.out.println("处理成功!"+msgId);
            }

            /**
             * Failure callback: prints the id of the failed message.
             *
             * @param msgId the message id that was passed to {@code emit}
             */
            @Override
            public void fail(Object msgId) {
                System.out.println("处理失败!"+msgId);
                // TODO: failed messages could be recorded separately here.
            }

        }
        
        
        
        /**
         * Demo bolt that keeps a running sum of the {@code "num"} field of every
         * tuple it receives and explicitly acks or fails each tuple.
         */
        public static class MyBolt extends BaseRichBolt{

            private Map stormConf;
            private TopologyContext context;
            private OutputCollector collector;

            /**
             * Stores the configuration, context and collector supplied to this bolt.
             *
             * @param stormConf configuration map passed in by the caller
             * @param context   topology context passed in by the caller
             * @param collector collector used by {@link #execute(Tuple)} to ack/fail tuples
             */
            @Override
            public void prepare(Map stormConf, TopologyContext context,
                    OutputCollector collector) {
                this.stormConf = stormConf;
                this.context = context;
                this.collector = collector;
            }

            // Running total of all "num" values processed so far.
            int sum = 0;

            /**
             * Adds the tuple's {@code "num"} value to the running sum, prints the
             * sum and acks the tuple. If anything in that sequence throws, the
             * failure is reported on {@code System.err} and the tuple is failed.
             *
             * @param input the incoming tuple; expected to carry an integer field {@code "num"}
             */
            @Override
            public void execute(Tuple input) {
                try{
                    Integer num = input.getIntegerByField("num");
                    sum += num;
                    System.out.println("sum="+sum);
                    this.collector.ack(input);
                }catch(Exception e){
                    // Report the cause before failing the tuple; otherwise the only
                    // trace of the problem is the spout's fail() callback, which
                    // carries no information about what went wrong.
                    System.err.println("MyBolt failed to process tuple " + input + ": " + e);
                    this.collector.fail(input);
                }
            }

            /**
             * Declares no output fields; this bolt does not emit tuples.
             *
             * @param declarer unused
             */
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {

            }

        }
        
        
        
        /**
         * Wires {@link MySpout} to {@link MyBolt} with a shuffle grouping and
         * submits the topology: to an in-process {@link LocalCluster} when no
         * arguments are given, otherwise through {@link StormSubmitter}.
         *
         * @param args any argument at all selects cluster submission; the
         *             argument values themselves are not read
         */
        public static void main(String[] args) {
            // Component and topology names are derived from the class names.
            final String spoutId = MySpout.class.getSimpleName();
            final String boltId = MyBolt.class.getSimpleName();
            final String topologyName = StormTopologyAcker.class.getSimpleName();

            final TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout(spoutId, new MySpout());
            builder.setBolt(boltId, new MyBolt()).shuffleGrouping(spoutId);

            final Config conf = new Config();
            // Per the original author's note: this setting only takes effect
            // when the acker mechanism is enabled.
            conf.setMaxSpoutPending(1000);

            if (args.length != 0) {
                // Cluster mode.
                try {
                    StormSubmitter.submitTopology(topologyName, conf, builder.createTopology());
                } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                    e.printStackTrace();
                }
                return;
            }

            // Local mode.
            final LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(topologyName, conf, builder.createTopology());
        }
    
    }
  • 相关阅读:
    重启sqlserver服务命令
    k8s学习
    collection包1.1.0都升级了什么功能
    Golang项目的测试实践
    一个让业务开发效率提高10倍的golang库
    GopherChina第二天小结
    GopherChina第一天小结
    slice是什么时候决定要扩张?
    史上最快的后台搭建框架
    gorm的日志模块源码解析
  • 原文地址:https://www.cnblogs.com/braveym/p/6964978.html
Copyright © 2011-2022 走看看