zoukankan      html  css  js  c++  java
  • storm的可靠性

    消息确认机制:

    在数据发送的过程中可能会数据丢失导致没能接收到,spout有个超时时间(默认是30S),如果30S过去了还是没有接收到数据,也认为是处理失败。

    运行结果都是处理成功

    参考代码StormTopologyAcker.java

    package yehua.storm;
    
    import java.util.Map;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.generated.AlreadyAliveException;
    import org.apache.storm.generated.AuthorizationException;
    import org.apache.storm.generated.InvalidTopologyException;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    
    
    public class StormTopologyAcker {
        
        public static class MySpout extends BaseRichSpout{
            private Map conf;
            private TopologyContext context;
            private SpoutOutputCollector collector;
            @Override
            public void open(Map conf, TopologyContext context,
                    SpoutOutputCollector collector) {
                this.conf = conf;
                this.collector = collector;
                this.context = context;
            }
    
            int num = 0; 
            @Override
            public void nextTuple() {
                num++;
                System.out.println("spout:"+num);
                int messageid = num;
                //开启消息确认机制,就是在发送数据的时候发送一个messageid,一般情况下,messageid可以理解为mysql数据里面的主键id字段
                //要保证messageid和tuple之间有一个唯一的对应关系,这个关系需要程序员自己维护
                this.collector.emit(new Values(num),messageid);
                Utils.sleep(1000);
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("num"));
            }
    
            @Override
            public void ack(Object msgId) {
                System.out.println("处理成功!"+msgId);
            }
    
            @Override
            public void fail(Object msgId) {
                System.out.println("处理失败!"+msgId);
                //TODO  可以吧这个数据单独记录下来
            }
            
        }
        
        
        
        public static class MyBolt extends BaseRichBolt{
            
            private Map stormConf;
            private TopologyContext context;
            private OutputCollector collector;
            @Override
            public void prepare(Map stormConf, TopologyContext context,
                    OutputCollector collector) {
                this.stormConf = stormConf;
                this.context = context;
                this.collector = collector;
            }
            
            int sum = 0;
            @Override
            public void execute(Tuple input) {
                try{
                    Integer num = input.getIntegerByField("num");
                    sum += num;
                    System.out.println("sum="+sum);
                    this.collector.ack(input);
                }catch(Exception e){
                    this.collector.fail(input);
                }
            }
    
            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                
            }
            
        }
        
        
        
        public static void main(String[] args) {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            String spout_id = MySpout.class.getSimpleName();
            String bolt_id = MyBolt.class.getSimpleName();
            
            topologyBuilder.setSpout(spout_id, new MySpout());
            topologyBuilder.setBolt(bolt_id, new MyBolt()).shuffleGrouping(spout_id);
            
            
            Config config = new Config();
            config.setMaxSpoutPending(1000);//如果设置了这个参数,必须要保证开启了acker机制才有效
            String topology_name = StormTopologyAcker.class.getSimpleName();
            if(args.length==0){
                //在本地运行
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology(topology_name, config, topologyBuilder.createTopology());
            }else{
                //在集群运行
                try {
                    StormSubmitter.submitTopology(topology_name, config, topologyBuilder.createTopology());
                } catch (AlreadyAliveException e) {
                    e.printStackTrace();
                } catch (InvalidTopologyException e) {
                    e.printStackTrace();
                } catch (AuthorizationException e) {
                    e.printStackTrace();
                }
            }
            
        }
    
    }
  • 相关阅读:
    VMware虚拟机中常见的问题汇总
    Windows10下安装VMware虚拟机并搭建CentOS系统环境
    myeclipse2017使用总结
    mybatis如何通过接口查找对应的mapper.xml及方法执行详解
    (转)将SVN从一台服务器迁移到另一台服务器(Windows Server VisualSVN Server)
    (转)Maven中的库(repository)详解 ---repository配置查找构件(如.jar)的远程库
    Git知识讲解
    (转)MyEclipse中使用git
    在SpringBoot中添加Logback日志处理
    (转)Spring Boot干货系列:(七)默认日志logback配置解析
  • 原文地址:https://www.cnblogs.com/braveym/p/6964978.html
Copyright © 2011-2022 走看看