  • Storm source code analysis: metric

    First, a set of metric-related interfaces is defined: IMetric, IReducer, ICombiner (backtype.storm.metric.api).

    Each task creates a set of builtin metrics (backtype.storm.daemon.builtin-metrics) and registers them in the topology context.

    The task keeps updating these builtin metrics through functions such as spout-acked-tuple!.

    The task periodically sends the statistics held in the builtin metrics over METRICS-STREAM to the metric bolt (backtype.storm.metric.MetricsConsumerBolt; this bolt creates an object that implements backtype.storm.metric.api.IMetricsConsumer, which processes the metrics).
    So how are these metrics used?
    Because they are builtin metrics, they are not meant to be used directly by outside code.
    How they are handled depends on _metricsConsumer.handleDataPoints, where _metricsConsumer is chosen through the topology's configuration.
    For example, with backtype.storm.metric.LoggingMetricsConsumer the metrics are written to the log.
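To make the configuration step more concrete, here is a minimal sketch that builds a consumer registration as plain Java collections. It assumes that the key behind Config.TOPOLOGY_METRICS_CONSUMER_REGISTER is the string "topology.metrics.consumer.register" and that each entry is a map with the keys "class", "argument" and "parallelism.hint" (the keys that metrics-consumer-bolt-specs reads in section 4.1). ConsumerRegistrationSketch and register are hypothetical names used only for illustration; a real topology would normally go through the helper methods on backtype.storm.Config instead of building the map by hand.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ConsumerRegistrationSketch {
    // Assumed string value of Config.TOPOLOGY_METRICS_CONSUMER_REGISTER.
    static final String REGISTER_KEY = "topology.metrics.consumer.register";

    // Appends one consumer registration to the topology configuration map.
    @SuppressWarnings("unchecked")
    static void register(Map<String, Object> conf, String className,
                         Object argument, long parallelismHint) {
        List<Map<String, Object>> entries =
                (List<Map<String, Object>>) conf.get(REGISTER_KEY);
        if (entries == null) {
            entries = new ArrayList<>();
            conf.put(REGISTER_KEY, entries);
        }
        Map<String, Object> entry = new HashMap<>();
        entry.put("class", className);               // consumer implementation class
        entry.put("argument", argument);             // passed to prepare() as registrationArgument
        entry.put("parallelism.hint", parallelismHint);
        entries.add(entry);
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        register(conf, "backtype.storm.metric.LoggingMetricsConsumer", null, 1L);
        System.out.println(conf);
    }
}
```

Each entry in the list becomes one MetricsConsumerBolt, so registering two consumers would yield two metric bolts that both receive the same METRICS-STREAM.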

     

    1. backtype.storm.metric.api

    IMetric

    package backtype.storm.metric.api;
    public interface IMetric {
        public Object getValueAndReset(); // returns the current value and restores the initial state
    }

    CountMetric: a counter, set back to zero on reset
    AssignableMetric: holds an assigned value, which reset does not clear
    MultiCountMetric: keeps several counts in a hashmap; on reset it calls getValueAndReset on each count object

    public class CountMetric implements IMetric {
        long _value = 0;
    
        public CountMetric() {
        }
        
        public void incr() {
            _value++;
        }
    
        public void incrBy(long incrementBy) {
            _value += incrementBy;
        }
    
        public Object getValueAndReset() {
            long ret = _value;
            _value = 0;
            return ret;
        }
    }
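The MultiCountMetric behaviour described above can be sketched roughly as follows. This is an illustration written for this article, not the Storm source: IMetric and CountMetric are local stand-ins with the same shape as the code shown above, and MultiCountMetricSketch and MultiCountDemo are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Local stand-in for backtype.storm.metric.api.IMetric.
interface IMetric {
    Object getValueAndReset();
}

// Same shape as the CountMetric shown above.
class CountMetric implements IMetric {
    long _value = 0;

    public void incr() { _value++; }

    public void incrBy(long incrementBy) { _value += incrementBy; }

    public Object getValueAndReset() {
        long ret = _value;
        _value = 0;
        return ret;
    }
}

// Sketch of a multi-count metric: one CountMetric per scope key.
class MultiCountMetricSketch implements IMetric {
    private final Map<String, CountMetric> _value = new HashMap<>();

    // Returns the counter for the key, creating it on first use.
    public CountMetric scope(String key) {
        CountMetric val = _value.get(key);
        if (val == null) {
            val = new CountMetric();
            _value.put(key, val);
        }
        return val;
    }

    // Reports every counter's value and resets each counter to zero.
    public Object getValueAndReset() {
        Map<String, Object> ret = new HashMap<>();
        for (Map.Entry<String, CountMetric> e : _value.entrySet()) {
            ret.put(e.getKey(), e.getValue().getValueAndReset());
        }
        return ret;
    }
}

class MultiCountDemo {
    public static void main(String[] args) {
        MultiCountMetricSketch acks = new MultiCountMetricSketch();
        acks.scope("default").incr();
        acks.scope("default").incrBy(4);
        acks.scope("errors").incr();
        System.out.println(acks.getValueAndReset()); // per-stream counts for this bucket
        System.out.println(acks.getValueAndReset()); // every counter is back at zero
    }
}
```

Keying the counters by scope is what lets the builtin metrics in section 2 keep a separate count per stream.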

     

    ICombiner

    public interface ICombiner<T> {
        public T identity();
        public T combine(T a, T b);
    }

    CombinedMetric combines ICombiner with IMetric

    public class CombinedMetric implements IMetric {
        private final ICombiner _combiner;
        private Object _value;
    
        public CombinedMetric(ICombiner combiner) {
            _combiner = combiner;
            _value = _combiner.identity();
        }
        
        public void update(Object value) {
            _value = _combiner.combine(_value, value);
        }
    
        public Object getValueAndReset() {
            Object ret = _value;
            _value = _combiner.identity();
            return ret;
        }
    }
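As a small usage sketch, a combiner that keeps the largest value seen in a bucket could be plugged into CombinedMetric like this. MaxLongCombiner and CombinedDemo are hypothetical names, and the interfaces and CombinedMetric are repeated as local stand-ins so the example compiles without Storm on the classpath.

```java
// Local stand-ins for the interfaces and class shown above.
interface IMetric {
    Object getValueAndReset();
}

interface ICombiner<T> {
    T identity();
    T combine(T a, T b);
}

@SuppressWarnings({"rawtypes", "unchecked"})
class CombinedMetric implements IMetric {
    private final ICombiner _combiner;
    private Object _value;

    public CombinedMetric(ICombiner combiner) {
        _combiner = combiner;
        _value = _combiner.identity();
    }

    public void update(Object value) {
        _value = _combiner.combine(_value, value);
    }

    public Object getValueAndReset() {
        Object ret = _value;
        _value = _combiner.identity();
        return ret;
    }
}

// Hypothetical combiner: keeps the largest Long seen since the last reset.
class MaxLongCombiner implements ICombiner<Long> {
    public Long identity() { return Long.MIN_VALUE; }

    public Long combine(Long a, Long b) { return Math.max(a, b); }
}

class CombinedDemo {
    public static void main(String[] args) {
        CombinedMetric maxLatency = new CombinedMetric(new MaxLongCombiner());
        maxLatency.update(3L);
        maxLatency.update(9L);
        maxLatency.update(5L);
        System.out.println(maxLatency.getValueAndReset()); // 9
        System.out.println(maxLatency.getValueAndReset()); // identity again: Long.MIN_VALUE
    }
}
```

Because CombinedMetric uses the raw ICombiner type, the values passed to update must really be Long here; passing an Integer would fail with a ClassCastException at runtime.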

     

    IReducer

    public interface IReducer<T> {
        T init();
        T reduce(T accumulator, Object input);
        Object extractResult(T accumulator);
    }

    MeanReducer implements the IReducer interface to compute a mean: reduce accumulates the sum and the count, and extractResult divides the sum by the count

    class MeanReducerState {
        public int count = 0;
        public double sum = 0.0;
    }
    
    public class MeanReducer implements IReducer<MeanReducerState> {
        public MeanReducerState init() {
            return new MeanReducerState();
        }
    
        public MeanReducerState reduce(MeanReducerState acc, Object input) {
            acc.count++;
            if(input instanceof Double) {
                acc.sum += (Double)input;
            } else if(input instanceof Long) {
                acc.sum += ((Long)input).doubleValue();
            } else if(input instanceof Integer) {
                acc.sum += ((Integer)input).doubleValue();
            } else {
                throw new RuntimeException(
                    "MeanReducer::reduce called with unsupported input type `" + input.getClass()
                    + "`. Supported types are Double, Long, Integer.");
            }
            return acc;
        }
    
        public Object extractResult(MeanReducerState acc) {
            if(acc.count > 0) {
                return new Double(acc.sum / (double)acc.count);
            } else {
                return null;
            }
        }
    }

     

    ReducedMetric

    Combines IReducer with IMetric

    public class ReducedMetric implements IMetric {
        private final IReducer _reducer;
        private Object _accumulator;
    
        public ReducedMetric(IReducer reducer) {
            _reducer = reducer;
            _accumulator = _reducer.init();
        }
    
        public void update(Object value) {
            _accumulator = _reducer.reduce(_accumulator, value);
        }
    
        public Object getValueAndReset() {
            Object ret = _reducer.extractResult(_accumulator);
            _accumulator = _reducer.init();
            return ret;
        }
    }
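The following sketch shows how a reducer and ReducedMetric work together over one bucket. To stay short it uses SimpleMeanReducer, a simplified stand-in that accepts any Number rather than the type-by-type checks of the MeanReducer above; MeanState, SimpleMeanReducer and ReducedDemo are hypothetical names, and the interfaces are local copies so the example runs without Storm.

```java
// Local stand-ins for the interfaces shown above.
interface IMetric {
    Object getValueAndReset();
}

interface IReducer<T> {
    T init();
    T reduce(T accumulator, Object input);
    Object extractResult(T accumulator);
}

class MeanState {
    int count = 0;
    double sum = 0.0;
}

// Simplified mean reducer: accepts any Number instead of checking each type.
class SimpleMeanReducer implements IReducer<MeanState> {
    public MeanState init() { return new MeanState(); }

    public MeanState reduce(MeanState acc, Object input) {
        acc.count++;
        acc.sum += ((Number) input).doubleValue();
        return acc;
    }

    public Object extractResult(MeanState acc) {
        // No samples in this bucket means there is no mean to report.
        return acc.count > 0 ? Double.valueOf(acc.sum / acc.count) : null;
    }
}

@SuppressWarnings({"rawtypes", "unchecked"})
class ReducedMetric implements IMetric {
    private final IReducer _reducer;
    private Object _accumulator;

    public ReducedMetric(IReducer reducer) {
        _reducer = reducer;
        _accumulator = _reducer.init();
    }

    public void update(Object value) {
        _accumulator = _reducer.reduce(_accumulator, value);
    }

    public Object getValueAndReset() {
        Object ret = _reducer.extractResult(_accumulator);
        _accumulator = _reducer.init();
        return ret;
    }
}

class ReducedDemo {
    public static void main(String[] args) {
        ReducedMetric latency = new ReducedMetric(new SimpleMeanReducer());
        latency.update(10L);
        latency.update(20);
        latency.update(30.0);
        System.out.println(latency.getValueAndReset()); // 20.0
        System.out.println(latency.getValueAndReset()); // null: the new bucket is empty
    }
}
```

The null for an empty bucket matters downstream: a consumer has to be prepared for data points whose value is missing.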

     

    IMetricsConsumer

    This interface nests the TaskInfo and DataPoint classes.
    handleDataPoints is where you add the logic that processes the set of DataPoints belonging to one task.

    public interface IMetricsConsumer {
        public static class TaskInfo {
            public TaskInfo() {}
            public TaskInfo(String srcWorkerHost, int srcWorkerPort, String srcComponentId, int srcTaskId, long timestamp, int updateIntervalSecs) {
                this.srcWorkerHost = srcWorkerHost;
                this.srcWorkerPort = srcWorkerPort;
                this.srcComponentId = srcComponentId; 
                this.srcTaskId = srcTaskId; 
                this.timestamp = timestamp;
                this.updateIntervalSecs = updateIntervalSecs; 
            }
            public String srcWorkerHost;
            public int srcWorkerPort;
            public String srcComponentId; 
            public int srcTaskId; 
            public long timestamp;
            public int updateIntervalSecs; 
        }
        public static class DataPoint {
            public DataPoint() {}
            public DataPoint(String name, Object value) {
                this.name = name;
                this.value = value;
            }
            @Override
            public String toString() {
                return "[" + name + " = " + value + "]";
            }
            public String name; 
            public Object value;
        }
    
        void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter);
        void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints);
        void cleanup();
    }
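A consumer's handleDataPoints could look roughly like the sketch below, which formats one line per data point and keeps the lines in memory. It is only an illustration: TaskInfo and DataPoint are trimmed local stand-ins for the nested classes above, CollectingConsumerSketch is a hypothetical name, and prepare and cleanup are left out because they need Storm's TopologyContext.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Trimmed stand-in for IMetricsConsumer.TaskInfo (only the fields used here).
class TaskInfo {
    String srcComponentId;
    int srcTaskId;
    long timestamp;

    TaskInfo(String srcComponentId, int srcTaskId, long timestamp) {
        this.srcComponentId = srcComponentId;
        this.srcTaskId = srcTaskId;
        this.timestamp = timestamp;
    }
}

// Stand-in for IMetricsConsumer.DataPoint.
class DataPoint {
    String name;
    Object value;

    DataPoint(String name, Object value) {
        this.name = name;
        this.value = value;
    }
}

// Hypothetical consumer: keeps one formatted line per data point in memory.
class CollectingConsumerSketch {
    final List<String> lines = new ArrayList<>();

    // Mirrors IMetricsConsumer.handleDataPoints: one call per task and bucket.
    void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
        for (DataPoint p : dataPoints) {
            lines.add(taskInfo.timestamp + " " + taskInfo.srcComponentId + ":"
                    + taskInfo.srcTaskId + " " + p.name + "=" + p.value);
        }
    }
}

class ConsumerDemo {
    public static void main(String[] args) {
        CollectingConsumerSketch consumer = new CollectingConsumerSketch();
        consumer.handleDataPoints(
                new TaskInfo("spout", 3, 100L),
                Arrays.asList(new DataPoint("__ack-count", 5L),
                              new DataPoint("__complete-latency", 12.5)));
        for (String line : consumer.lines) {
            System.out.println(line);
        }
    }
}
```

A real implementation would write the lines to a log or forward them to a monitoring system instead of holding them in a list.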

     

    2. backtype.storm.daemon.builtin-metrics

    This namespace defines the metrics that spouts and bolts need. There are two main records, BuiltinSpoutMetrics and BuiltinBoltMetrics, each acting as a map of [metric-name, metric-object]

    (defrecord BuiltinSpoutMetrics [^MultiCountMetric ack-count                                
                                    ^MultiReducedMetric complete-latency
                                    ^MultiCountMetric fail-count
                                    ^MultiCountMetric emit-count
                                    ^MultiCountMetric transfer-count])
    (defrecord BuiltinBoltMetrics [^MultiCountMetric ack-count
                                   ^MultiReducedMetric process-latency
                                   ^MultiCountMetric fail-count
                                   ^MultiCountMetric execute-count
                                   ^MultiReducedMetric execute-latency
                                   ^MultiCountMetric emit-count
                                   ^MultiCountMetric transfer-count])
    
    (defn make-data [executor-type]
      (condp = executor-type
        :spout (BuiltinSpoutMetrics. (MultiCountMetric.)
                                     (MultiReducedMetric. (MeanReducer.))
                                     (MultiCountMetric.)
                                     (MultiCountMetric.)
                                     (MultiCountMetric.))
        :bolt (BuiltinBoltMetrics. (MultiCountMetric.)
                                   (MultiReducedMetric. (MeanReducer.))
                                   (MultiCountMetric.)
                                   (MultiCountMetric.)
                                   (MultiReducedMetric. (MeanReducer.))
                                   (MultiCountMetric.)
                                   (MultiCountMetric.))))
    
    (defn register-all [builtin-metrics  storm-conf topology-context]
      (doseq [[kw imetric] builtin-metrics]
        (.registerMetric topology-context (str "__" (name kw)) imetric
                         (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))

    When mk-task-data runs, it calls make-data to create the corresponding metrics,

    :builtin-metrics (builtin-metrics/make-data (:type executor-data))

    and in the executor's mk-threads these builtin-metrics are registered in the TopologyContext,

    (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data))

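    That covers creating and registering the builtin-metrics. Next, a set of functions for updating the metrics is defined.

    Take spout-acked-tuple! as an example: it has to update the MultiCountMetric ack-count and the MultiReducedMetric complete-latency.
    .scope picks the CountMetric for one stream out of the MultiCountMetric, and incrBy then adds the stats rate to that count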

    (defn spout-acked-tuple! [^BuiltinSpoutMetrics m stats stream latency-ms]  
      (-> m .ack-count (.scope stream) (.incrBy (stats-rate stats)))
      (-> m .complete-latency (.scope stream) (.update latency-ms)))

     

    3. backtype.storm.metric

    MetricsConsumerBolt

    Creates an object that implements IMetricsConsumer and calls its handleDataPoints from execute

    package backtype.storm.metric;
    public class MetricsConsumerBolt implements IBolt {
        IMetricsConsumer _metricsConsumer;
        String _consumerClassName;
        OutputCollector _collector;
        Object _registrationArgument;
    
        public MetricsConsumerBolt(String consumerClassName, Object registrationArgument) {
            _consumerClassName = consumerClassName;
            _registrationArgument = registrationArgument;
        }
    
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            try {
                _metricsConsumer = (IMetricsConsumer)Class.forName(_consumerClassName).newInstance();
            } catch (Exception e) {
                throw new RuntimeException("Could not instantiate a class listed in config under section " +
                    Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e);
            }
            _metricsConsumer.prepare(stormConf, _registrationArgument, context, (IErrorReporter)collector);
            _collector = collector;
        }
        
        @Override
        public void execute(Tuple input) {
            _metricsConsumer.handleDataPoints((IMetricsConsumer.TaskInfo)input.getValue(0), (Collection)input.getValue(1));
            _collector.ack(input);
        }
    
        @Override
        public void cleanup() {
            _metricsConsumer.cleanup();
        }
        
    }

     

    SystemBolt

    According to its comments, there is one SystemBolt per worker, with taskid=-1.
    It defines a few system-level metrics and registers them in the TopologyContext.

     

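    It calls Clojure from Java, so it imports the following classes

    import clojure.lang.AFn;
    import clojure.lang.IFn; // function
    import clojure.lang.RT;  // run-time

    It also uses Java classes for monitoring memory and the JVM

    java.lang.management.MemoryUsage: a MemoryUsage object is a snapshot of memory usage
    java.lang.management.GarbageCollectorMXBean: the management interface for the JVM's garbage collection, e.g. the total number of collections and the accumulated collection time
    java.lang.management.RuntimeMXBean: the management interface for the JVM's runtime system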


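    What is special about this bolt is that only prepare contains any logic, and _prepareWasCalled guards against prepare running more than once per worker (outside local mode).
    The logic in prepare mainly defines the various metrics and registers them in the TopologyContext through registerMetric.
    The metrics cover the JVM's uptime, its start time, memory usage, and the state of each GarbageCollector.
    These registered system metrics are also sent to MetricsConsumerBolt for processing.
    This looks like a job for a spout, so why is it implemented as a bolt?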

    // There is one task inside one executor for each worker of the topology.
    // TaskID is always -1, therefore you can only send-unanchored tuples to co-located SystemBolt.
    // This bolt was conceived to export worker stats via metrics api.
    public class SystemBolt implements IBolt {
        private static Logger LOG = LoggerFactory.getLogger(SystemBolt.class);
        private static boolean _prepareWasCalled = false;
    
        private static class MemoryUsageMetric implements IMetric {
            IFn _getUsage;
            public MemoryUsageMetric(IFn getUsage) {
                _getUsage = getUsage;
            }
            @Override
            public Object getValueAndReset() {
                MemoryUsage memUsage = (MemoryUsage)_getUsage.invoke();
                HashMap m = new HashMap();
                m.put("maxBytes", memUsage.getMax());
                m.put("committedBytes", memUsage.getCommitted());
                m.put("initBytes", memUsage.getInit());
                m.put("usedBytes", memUsage.getUsed());
                m.put("virtualFreeBytes", memUsage.getMax() - memUsage.getUsed());
                m.put("unusedBytes", memUsage.getCommitted() - memUsage.getUsed());
                return m;
            }
        }
    
        // canonically the metrics data exported is time bucketed when doing counts.
        // convert the absolute values here into time buckets.
        private static class GarbageCollectorMetric implements IMetric {
            GarbageCollectorMXBean _gcBean;
            Long _collectionCount;
            Long _collectionTime;
            public GarbageCollectorMetric(GarbageCollectorMXBean gcBean) {
                _gcBean = gcBean;
            }
            @Override
            public Object getValueAndReset() {
                Long collectionCountP = _gcBean.getCollectionCount();
                Long collectionTimeP = _gcBean.getCollectionTime();
    
                Map ret = null;
                if(_collectionCount!=null && _collectionTime!=null) {
                    ret = new HashMap();
                    ret.put("count", collectionCountP - _collectionCount);
                    ret.put("timeMs", collectionTimeP - _collectionTime);
                }
    
                _collectionCount = collectionCountP;
                _collectionTime = collectionTimeP;
                return ret;
            }
        }
    
        @Override
        public void prepare(final Map stormConf, TopologyContext context, OutputCollector collector) {
            if(_prepareWasCalled && stormConf.get(Config.STORM_CLUSTER_MODE) != "local") {
                throw new RuntimeException("A single worker should have 1 SystemBolt instance.");
            }
            _prepareWasCalled = true;
    
            int bucketSize = RT.intCast(stormConf.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
    
            final RuntimeMXBean jvmRT = ManagementFactory.getRuntimeMXBean();
    
            context.registerMetric("uptimeSecs", new IMetric() {
                @Override
                public Object getValueAndReset() {
                    return jvmRT.getUptime()/1000.0;
                }
            }, bucketSize);
    
            context.registerMetric("startTimeSecs", new IMetric() {
                @Override
                public Object getValueAndReset() {
                    return jvmRT.getStartTime()/1000.0;
                }
            }, bucketSize);
    
            context.registerMetric("newWorkerEvent", new IMetric() {
                boolean doEvent = true;
    
                @Override
                public Object getValueAndReset() {
                    if (doEvent) {
                        doEvent = false;
                        return 1;
                    } else return 0;
                }
            }, bucketSize);
    
            final MemoryMXBean jvmMemRT = ManagementFactory.getMemoryMXBean();
    
            context.registerMetric("memory/heap", new MemoryUsageMetric(new AFn() {
                public Object invoke() {
                    return jvmMemRT.getHeapMemoryUsage();
                }
            }), bucketSize);
            context.registerMetric("memory/nonHeap", new MemoryUsageMetric(new AFn() {
                public Object invoke() {
                    return jvmMemRT.getNonHeapMemoryUsage();
                }
            }), bucketSize);
    
            for(GarbageCollectorMXBean b : ManagementFactory.getGarbageCollectorMXBeans()) {
                context.registerMetric("GC/" + b.getName().replaceAll("\\W", ""), new GarbageCollectorMetric(b), bucketSize);
            }
        }
    
        @Override
        public void execute(Tuple input) {
            throw new RuntimeException("Non-system tuples should never be sent to __system bolt.");
        }
    
        @Override
        public void cleanup() {
        }
    }
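The two JVM-facing ideas in SystemBolt, reading a memory snapshot and turning cumulative GC totals into per-bucket deltas, can be tried outside Storm with the standard java.lang.management API. The sketch below is a minimal standalone version; JvmMetricsSketch and its method names are hypothetical, and only a subset of the map keys from MemoryUsageMetric is reproduced.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.util.HashMap;
import java.util.Map;

class JvmMetricsSketch {
    private final GarbageCollectorMXBean gcBean;
    // Totals seen at the previous read; null until the first read.
    private Long lastCount;
    private Long lastTimeMs;

    JvmMetricsSketch(GarbageCollectorMXBean gcBean) {
        this.gcBean = gcBean;
    }

    // Converts the bean's cumulative totals into deltas since the last call.
    // The first call has nothing to compare against and returns null.
    Map<String, Long> gcDelta() {
        long count = gcBean.getCollectionCount();
        long timeMs = gcBean.getCollectionTime();
        Map<String, Long> ret = null;
        if (lastCount != null && lastTimeMs != null) {
            ret = new HashMap<>();
            ret.put("count", count - lastCount);
            ret.put("timeMs", timeMs - lastTimeMs);
        }
        lastCount = count;
        lastTimeMs = timeMs;
        return ret;
    }

    // Takes a snapshot of heap usage, using key names from MemoryUsageMetric.
    static Map<String, Long> heapSnapshot() {
        MemoryUsage heap = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
        Map<String, Long> m = new HashMap<>();
        m.put("usedBytes", heap.getUsed());
        m.put("committedBytes", heap.getCommitted());
        m.put("unusedBytes", heap.getCommitted() - heap.getUsed());
        return m;
    }

    public static void main(String[] args) {
        System.out.println("heap: " + heapSnapshot());
        for (GarbageCollectorMXBean b : ManagementFactory.getGarbageCollectorMXBeans()) {
            JvmMetricsSketch gc = new JvmMetricsSketch(b);
            gc.gcDelta();       // first read only records the baseline
            System.gc();        // request a collection so the delta may be non-zero
            System.out.println(b.getName() + ": " + gc.gcDelta());
        }
    }
}
```

Reporting deltas rather than totals keeps the GC numbers consistent with the other metrics, which are all reset at the end of every bucket.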

     

    4. system-topology!

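    Here the metric component (MetricsConsumerBolt) and the system component (SystemBolt) are added to the topology dynamically, together with the corresponding stream information

    system-topology! adds the following to the topology

    1. acker, covered later
    2. metric-bolt: its input is the METRICS-STREAM sent by the tasks of all components, and it has no output
    3. system-bolt: it has no input, and its output is two TICK-STREAMs
    4. extra output streams, metrics-stream and system-stream, for every component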

    (defn system-topology! [storm-conf ^StormTopology topology]
      (validate-basic! topology)
      (let [ret (.deepCopy topology)]
        (add-acker! storm-conf ret)
        (add-metric-components! storm-conf ret)    
        (add-system-components! storm-conf ret)
        (add-metric-streams! ret)
        (add-system-streams! ret)
        (validate-structure! ret)
        ret
        ))

    4.1 Adding components

    Looking at the thrift definition, adding a bolt component to the topology simply means adding a [string, Bolt] entry to a hashmap.
    The key point is how thrift/mk-bolt-spec* is used to create the bolt spec

    struct StormTopology {
      1: required map<string, SpoutSpec> spouts;
      2: required map<string, Bolt> bolts;
      3: required map<string, StateSpoutSpec> state_spouts;
    }
    struct Bolt {
      1: required ComponentObject bolt_object;
      2: required ComponentCommon common;
    }  
    struct ComponentCommon {
      1: required map<GlobalStreamId, Grouping> inputs;
      2: required map<string, StreamInfo> streams; //key is stream id, outputs
      3: optional i32 parallelism_hint; //how many threads across the cluster should be dedicated to this component
      4: optional string json_conf;
    }
    struct StreamInfo {
      1: required list<string> output_fields;
      2: required bool direct;
    }

    (defn add-metric-components! [storm-conf ^StormTopology topology]
      ;; metrics-consumer-bolt-specs shows that this bolt takes METRICS-STREAM-ID as input and has no output
      (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs storm-conf topology)]
        (.put_to_bolts topology comp-id bolt-spec)))
    
    (defn add-system-components! [conf ^StormTopology topology]
      (let [system-bolt-spec (thrift/mk-bolt-spec*
                              {} ;; inputs are empty: the bolt receives nothing
                              (SystemBolt.) ;; object
                              {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
                               METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])} ;; outputs: two streams are declared, but the code never emits on them
                              :p 0
                              :conf {TOPOLOGY-TASKS 0})]
        (.put_to_bolts topology SYSTEM-COMPONENT-ID system-bolt-spec)))

     

    metric-components

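    First, every component in the topology (including the system component) has to send its statistics to the metrics bolt, so component-ids-that-emit-metrics is all-components-ids plus SYSTEM-COMPONENT-ID.
    For any component, this produces the following input to the metrics bolt: {[comp-id METRICS-STREAM-ID] :shuffle} (using the :shuffle grouping)

    Next, thrift/mk-bolt-spec* is used to define mk-bolt-spec, the fn that creates the bolt spec

    Finally, mk-bolt-spec is called to create the spec of the metrics bolt (see the definition above).
    The key step is creating the MetricsConsumerBolt object, which requires reading the MetricsConsumer implementation class and its argument from storm-conf.
    This bolt takes the data received from each task and passes it to handleDataPoints to produce the metrics (see the earlier definition)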

    (defn metrics-consumer-bolt-specs [storm-conf topology]
      (let [component-ids-that-emit-metrics (cons SYSTEM-COMPONENT-ID (keys (all-components topology)))
            inputs (->> (for [comp-id component-ids-that-emit-metrics]
                          {[comp-id METRICS-STREAM-ID] :shuffle})
                        (into {}))
            
            mk-bolt-spec (fn [class arg p]
                           (thrift/mk-bolt-spec*
                            inputs  ;; the set of inputs
                            (backtype.storm.metric.MetricsConsumerBolt. class arg) ;; object
                            {} ;; outputs are empty
                            :p p :conf {TOPOLOGY-TASKS p}))]
        (map
         (fn [component-id register]
           [component-id (mk-bolt-spec (get register "class")
                                       (get register "argument")
                                       (or (get register "parallelism.hint") 1))])
         (metrics-consumer-register-ids storm-conf)
         (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))
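For readers less familiar with Clojure, the way the inputs map is assembled can be restated in Java. This is only a sketch with plain collections instead of the thrift GlobalStreamId and Grouping types; MetricsInputsSketch is a hypothetical name, and the string values "__system" and "__metrics" are assumptions about what SYSTEM-COMPONENT-ID and METRICS-STREAM-ID resolve to.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MetricsInputsSketch {
    static final String SYSTEM_COMPONENT_ID = "__system"; // assumed constant value
    static final String METRICS_STREAM_ID = "__metrics";  // assumed constant value

    // Builds {[comp-id, METRICS-STREAM-ID] -> shuffle} for the system component
    // and for every user component, like the `inputs` binding above.
    static Map<List<String>, String> metricsInputs(List<String> componentIds) {
        Map<List<String>, String> inputs = new LinkedHashMap<>();
        inputs.put(Arrays.asList(SYSTEM_COMPONENT_ID, METRICS_STREAM_ID), "shuffle");
        for (String id : componentIds) {
            inputs.put(Arrays.asList(id, METRICS_STREAM_ID), "shuffle");
        }
        return inputs;
    }

    public static void main(String[] args) {
        Map<List<String>, String> inputs = metricsInputs(Arrays.asList("spout", "bolt"));
        for (Map.Entry<List<String>, String> e : inputs.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

Every registered consumer gets the same inputs map, which is why each metric bolt sees the metrics of all components.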

     

    4.2 Adding streams

    Two output streams are added to every component:
    METRICS-STREAM-ID, sent to the metric bolt, with output-fields ["task-info" "data-points"]
    SYSTEM-STREAM-ID, with output-fields ["event"]

    (defn add-metric-streams! [^StormTopology topology]
      (doseq [[_ component] (all-components topology)
              :let [common (.get_common component)]]
        (.put_to_streams common METRICS-STREAM-ID
                         (thrift/output-fields ["task-info" "data-points"]))))
    
    (defn add-system-streams! [^StormTopology topology]
      (doseq [[_ component] (all-components topology)
              :let [common (.get_common component)]]
        (.put_to_streams common SYSTEM-STREAM-ID (thrift/output-fields ["event"]))))
  • Original article: https://www.cnblogs.com/fxjwind/p/3225057.html