zoukankan      html  css  js  c++  java
  • storm的可靠性

    消息确认机制:

    在数据发送的过程中可能会数据丢失导致没能接收到,spout有个超时时间(默认是30S),如果30S过去了还是没有接收到数据,也认为是处理失败。

    运行结果都是处理成功

    参考代码StormTopologyAcker.java

    package yehua.storm;
    
    import java.util.Map;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    
    public class StormTopologyAcker {
        
        public static class MySpout extends BaseRichSpout{
            private Map conf;
            private TopologyContext context;
            private SpoutOutputCollector collector;
            @Override
            public void open(Map conf, TopologyContext context,
                    SpoutOutputCollector collector) {
                this.conf = conf;
                this.collector = collector;
                this.context = context;
            }
    
            int num = 0; 
            @Override
            public void nextTuple() {
                num++;
                System.out.println("spout:"+num);
                int messageid = num;
                //开启消息确认机制,就是在发送数据的时候发送一个messageid,一般情况下,messageid可以理解为mysql数据里面的主键id字段
                //要保证messageid和tuple之间有一个唯一的对应关系,这个关系需要程序员自己维护
                this.collector.emit(new Values(num),messageid);
                Utils.sleep(1000);
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("num"));
            }
    
            @Override
            public void ack(Object msgId) {
                System.out.println("处理成功!"+msgId);
            }
    
            @Override
            public void fail(Object msgId) {
                System.out.println("处理失败!"+msgId);
                //TODO  可以吧这个数据单独记录下来
            }
            
        }
        
        
        
        public static class MyBolt extends BaseRichBolt{
            
            private Map stormConf;
            private TopologyContext context;
            private OutputCollector collector;
            @Override
            public void prepare(Map stormConf, TopologyContext context,
                    OutputCollector collector) {
                this.stormConf = stormConf;
                this.context = context;
                this.collector = collector;
            }
            
            int sum = 0;
            @Override
            public void execute(Tuple input) {
                try{
                    Integer num = input.getIntegerByField("num");
                    sum += num;
                    System.out.println("sum="+sum);
                    this.collector.ack(input);
                }catch(Exception e){
                    this.collector.fail(input);
                }
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                
            }
            
        }
        
        
        
        public static void main(String[] args) {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            String spout_id = MySpout.class.getSimpleName();
            String bolt_id = MyBolt.class.getSimpleName();
            
            topologyBuilder.setSpout(spout_id, new MySpout());
            topologyBuilder.setBolt(bolt_id, new MyBolt()).shuffleGrouping(spout_id);
            
            
            Config config = new Config();
            config.setMaxSpoutPending(1000);//如果设置了这个参数,必须要保证开启了acker机制才有效
            String topology_name = StormTopologyAcker.class.getSimpleName();
            if(args.length==0){
                //在本地运行
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology(topology_name, config, topologyBuilder.createTopology());
            }else{
                //在集群运行
                try {
                    StormSubmitter.submitTopology(topology_name, config, topologyBuilder.createTopology());
                } catch (AlreadyAliveException e) {
                    e.printStackTrace();
                } catch (InvalidTopologyException e) {
                    e.printStackTrace();
                } catch (AuthorizationException e) {
                    e.printStackTrace();
                }
            }
            
        }
    
    }
  • 相关阅读:
    setTimeout中0毫秒延时
    javascript中call和apply方法
    javascript闭包
    apns 服务
    新的开始,新的起点
    心情笔记
    如何解决控件附件上传时超大附件无法上传的问题
    BPM实例分享——日期自动计算
    BPM实例分享——金额规则大写
    分享一个程序猿在流程数据查看权限问题的总结
  • 原文地址:https://www.cnblogs.com/braveym/p/6964978.html
Copyright © 2011-2022 走看看