zoukankan      html  css  js  c++  java
  • 统计Metric

    package com.example.mail;
    
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.metric.LoggingMetricsConsumer;
    import com.example.mail.TestWordSpout;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.utils.Utils;
    
    /**
     * Builds a small topology (a {@code TestWordSpout} feeding an {@code ExclamationBolt})
     * with a {@link LoggingMetricsConsumer} registered, and runs it either on a real
     * cluster or in an in-process {@link LocalCluster}.
     */
    public class ExclamationTopology {
      /**
       * Submits the topology.
       *
       * <p>When at least one argument is given, {@code args[0]} is used as the topology
       * name and the topology is submitted through {@link StormSubmitter} with three
       * workers. Otherwise the topology is run in a {@link LocalCluster} under the name
       * {@code "test"}, killed after {@code Utils.sleep(100000)} returns, and the local
       * cluster is shut down.
       *
       * @param args optional; {@code args[0]} is the topology name for cluster submission
       * @throws Exception if submission or the local run fails
       */
      public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word", new TestWordSpout());
        builder.setBolt("exclaim1", new ExclamationBolt()).shuffleGrouping("word");
        // Optional second exclamation stage, currently disabled:
        //builder.setBolt("exclaim2", new ExclamationBolt()).shuffleGrouping("exclaim1");

        Config conf = new Config();
        conf.setDebug(true);
        // Write the collected metric values to the log file.
        conf.registerMetricsConsumer(LoggingMetricsConsumer.class);

        if (args != null && args.length > 0) {
          conf.setNumWorkers(3);
          StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
        else {
          LocalCluster cluster = new LocalCluster();
          try {
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(100000);
            cluster.killTopology("test");
          } finally {
            // Always release the in-process cluster, even if the run above failed.
            cluster.shutdown();
          }
        }
      }
    }
    package com.example.mail;
    
    import org.apache.storm.Config;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import java.util.Map;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;
    import java.util.HashMap;
    import java.util.Random;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    
    /**
     * Spout that, on every {@link #nextTuple()} call, sleeps via {@code Utils.sleep(1000)}
     * and then emits a single-field tuple ({@code "word"}) holding one of the letters
     * {@code a}..{@code e}, chosen at random.
     *
     * <p>NOTE(review): the {@code isDistributed} constructor flag is stored but has no
     * effect while {@code getComponentConfiguration} below remains commented out.
     */
    public class TestWordSpout extends BaseRichSpout {
        public static final Logger LOG = LoggerFactory.getLogger(TestWordSpout.class);

        /** Candidate words; allocated once instead of on every nextTuple() call. */
        private static final String[] WORDS = {"a", "b", "c", "d", "e"};

        boolean _isDistributed;
        SpoutOutputCollector _collector;
        /** Random source, created in open() so one instance is reused across calls. */
        private transient Random _rand;

        /** Creates a spout with {@code isDistributed} set to {@code true}. */
        public TestWordSpout() {
            this(true);
        }

        /**
         * @param isDistributed stored in {@code _isDistributed}; currently not read by
         *                      any active code in this class
         */
        public TestWordSpout(boolean isDistributed) {
            _isDistributed = isDistributed;
        }

        /**
         * Stores the collector used for emitting and creates the random source.
         *
         * @param conf      topology configuration (unused here)
         * @param context   topology context (unused here)
         * @param collector collector that {@link #nextTuple()} emits through
         */
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            _rand = new Random();
        }

        /** No resources to release. */
        public void close() {

        }

        /** Pauses via {@code Utils.sleep(1000)}, then emits one randomly chosen word. */
        public void nextTuple() {
            Utils.sleep(1000);
            final String word = WORDS[_rand.nextInt(WORDS.length)];
            _collector.emit(new Values(word));
        }

        /** Intentionally empty: tuples are emitted without a message id. */
        public void ack(Object msgId) {

        }

        /** Intentionally empty: tuples are emitted without a message id. */
        public void fail(Object msgId) {

        }

        /** Declares the single output field {@code "word"}. */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        // Disabled: would cap task parallelism at 1 when the spout is not distributed.
       /* @Override
        public Map<String, Object> getComponentConfiguration() {
            if(!_isDistributed) {
                Map<String, Object> ret = new HashMap<String, Object>();
                ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
                return ret;
            } else {
                return null;
            }
        }    */
    }
    package com.example.mail;
    
    import java.util.Map;
    
    
    import org.apache.storm.metric.api.CountMetric;
    import org.apache.storm.metric.api.MeanReducer;
    import org.apache.storm.metric.api.MultiCountMetric;
    import org.apache.storm.metric.api.ReducedMetric;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    
    /**
     * Bolt that re-emits the first field of each incoming tuple with {@code "!!!"}
     * appended, acks the input, and maintains two metrics: a total execute count and a
     * per-word count.
     */
    public class ExclamationBolt extends BaseRichBolt {
      OutputCollector _collector;
      // Metric objects; assigned in initMetrics().
      transient CountMetric _countMetric;
      transient MultiCountMetric _wordCountMetric;
      //transient ReducedMetric _wordLengthMeanMetric;

      /** Stores the collector and registers this bolt's metrics with the context. */
      @Override
      public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
        initMetrics(context);
      }

      /** Emits the exclaimed word anchored to the input, acks it, then updates metrics. */
      @Override
      public void execute(Tuple tuple) {
        final String word = tuple.getString(0);
        _collector.emit(tuple, new Values(word + "!!!"));
        _collector.ack(tuple);
        updateMetrics(word);
      }

      /** Declares the single output field {@code "word"}. */
      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
      }

      /**
       * Creates the counters and registers them as {@code "execute_count"} (time bucket
       * argument 5) and {@code "word_count"} (time bucket argument 10).
       */
      void initMetrics(TopologyContext context) {
        _countMetric = new CountMetric();
        context.registerMetric("execute_count", _countMetric, 5);

        _wordCountMetric = new MultiCountMetric();
        context.registerMetric("word_count", _wordCountMetric, 10);

        //_wordLengthMeanMetric = new ReducedMetric(new MeanReducer());
        //context.registerMetric("word_length", _wordLengthMeanMetric, 60);
      }

      /** Increments the total counter and the counter scoped to {@code word}. */
      void updateMetrics(String word) {
        _countMetric.incr();
        _wordCountMetric.scope(word).incr();
        //_wordLengthMeanMetric.update(word.length());
      }
    }
  • 相关阅读:
    ==和equals
    java 多重继承
    java单例模式
    基础小知识
    print流之错误日志
    print流
    实现读文本文件(IOl流)
    缓冲流(数据的复制粘贴)IO流
    力扣20题、1047(括号合法性,删除字符串中的所有相邻重复项)
    力扣232题、225题(栈实现队列,队列实现栈)
  • 原文地址:https://www.cnblogs.com/tonggc1668/p/9041560.html
Copyright © 2011-2022 走看看