  • A Deep Dive into the Internals of Flink Metrics

    Starting with how Metrics are used

    Flink provides four kinds of metrics: Counters, Gauges, Histograms, and Meters.
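
    To make the difference between the four kinds concrete, here is a minimal stand-alone sketch. The classes below (SimpleCounter, SimpleGauge, SimpleHistogram, SimpleMeter) are hypothetical simplifications written for this illustration, not Flink's real interfaces; in particular, the meter takes the elapsed time as an argument instead of measuring it.

```java
import java.util.function.Supplier;

/** Simplified stand-ins for the four metric kinds; not Flink's real interfaces. */
final class MetricKindsSketch {

    /** Counter: a value that is incremented or decremented. */
    static final class SimpleCounter {
        private long count;
        void inc() { count++; }
        void dec() { count--; }
        long getCount() { return count; }
    }

    /** Gauge: reads a value on demand from a supplier. */
    static final class SimpleGauge<T> {
        private final Supplier<T> supplier;
        SimpleGauge(Supplier<T> supplier) { this.supplier = supplier; }
        T getValue() { return supplier.get(); }
    }

    /** Histogram: summarizes the distribution of the recorded values. */
    static final class SimpleHistogram {
        private long count;
        private long sum;
        private long min = Long.MAX_VALUE;
        private long max = Long.MIN_VALUE;

        void update(long value) {
            count++;
            sum += value;
            min = Math.min(min, value);
            max = Math.max(max, value);
        }

        long getCount() { return count; }
        long getMin() { return min; }
        long getMax() { return max; }
        double getMean() { return count == 0 ? 0.0 : (double) sum / count; }
    }

    /** Meter: events per second over an explicitly supplied time span. */
    static final class SimpleMeter {
        private long events;
        void markEvent() { events++; }
        long getCount() { return events; }
        double getRate(long elapsedSeconds) {
            return elapsedSeconds <= 0 ? 0.0 : (double) events / elapsedSeconds;
        }
    }
}
```

    A counter and a meter are both driven by events, but the counter reports a total while the meter reports a rate; a gauge holds no state of its own and only samples whatever its supplier returns.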

    So how are metrics used? Take Counter as an example:

     1 public class MyMapper extends RichMapFunction<String, String> {
     2   private transient Counter counter;
     3 
     4   @Override
     5   public void open(Configuration config) {
     6     this.counter = getRuntimeContext()
     7       .getMetricGroup()
     8       .counter("myCounter");
     9   }
    10 
    11   @Override
    12   public String map(String value) throws Exception {
    13     this.counter.inc();
    14     return value;
    15   }
    16 }

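    Line 7: getMetricGroup() obtains the MetricGroup.

    Line 8: the Metric instance (here a Counter named myCounter) is obtained from the MetricGroup.

    So let's take a closer look at MetricGroup.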

    The Metric container: MetricGroup

    A MetricGroup is a container for Metric objects and metric subgroups.

    Each of the following four methods registers the given Metric through addMetric() and then returns it.

    (AbstractMetricGroup.java)

        public <C extends Counter> C counter(String name, C counter) {
            addMetric(name, counter);
            return counter;
        }

        public <T, G extends Gauge<T>> G gauge(String name, G gauge) {
            addMetric(name, gauge);
            return gauge;
        }

        public <H extends Histogram> H histogram(String name, H histogram) {
            addMetric(name, histogram);
            return histogram;
        }

        public <M extends Meter> M meter(String name, M meter) {
            addMetric(name, meter);
            return meter;
        }

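    Note 1: UnregisteredMetricsGroup, another implementation of the MetricGroup interface, simply returns the Metric instance without registering it.

    Note 2: ProxyMetricGroup, a third implementation of the MetricGroup interface, holds a parent MetricGroup and forwards every call to that parent.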
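
    The three behaviours described so far (register, hand back without registering, forward to a parent) can be modelled in a few lines. This is a stand-alone sketch: the Group interface and the RegisteringGroup, NoOpGroup, and ForwardingGroup classes are hypothetical names invented for the illustration and do not mirror Flink's actual class hierarchy.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-alone model of three group behaviours; all names are hypothetical. */
final class GroupVariantsSketch {

    interface Group {
        /** Returns the same metric object that was passed in. */
        <M> M add(String name, M metric);
    }

    /** Records every metric it is handed, then returns it. */
    static final class RegisteringGroup implements Group {
        final Map<String, Object> registered = new HashMap<>();

        @Override
        public <M> M add(String name, M metric) {
            registered.putIfAbsent(name, metric);
            return metric;
        }
    }

    /** Hands the metric back without recording it anywhere. */
    static final class NoOpGroup implements Group {
        @Override
        public <M> M add(String name, M metric) {
            return metric;
        }
    }

    /** Forwards every call to its parent group. */
    static final class ForwardingGroup implements Group {
        private final Group parent;

        ForwardingGroup(Group parent) {
            this.parent = parent;
        }

        @Override
        public <M> M add(String name, M metric) {
            return parent.add(name, metric);
        }
    }
}
```

    Because all three variants return the metric they were given, calling code such as the MyMapper example keeps working whichever group it receives; only the side effect of registration differs.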

    Important fields (AbstractMetricGroup.java)

        /** The registry that this metrics group belongs to. */
        protected final MetricRegistry registry; 
    
        /** All metrics that are directly contained in this group. */
        private final Map<String, Metric> metrics = new HashMap<>();
    
        /** All metric subgroups of this group. */
        private final Map<String, AbstractMetricGroup> groups = new HashMap<>();
    
        /** Flag indicating whether this group has been closed. */
        private volatile boolean closed;

    (AbstractMetricGroup.java)

     1     protected void addMetric(String name, Metric metric) {
     2         if (metric == null) {
     3             LOG.warn("Ignoring attempted registration of a metric due to being null for name {}.", name);
     4             return;
     5         }
     6         // add the metric only if the group is still open
     7         synchronized (this) {
     8             if (!closed) {
     9                 // immediately put without a 'contains' check to optimize the common case (no collision)
    10                 // collisions are resolved later
    11                 Metric prior = metrics.put(name, metric);
    12 
    13                 // check for collisions with other metric names
    14                 if (prior == null) {
    15                     // no other metric with this name yet
    16 
    17                     if (groups.containsKey(name)) {
    18                         // we warn here, rather than failing, because metrics are tools that should not fail the
    19                         // program when used incorrectly
    20                         LOG.warn("Name collision: Adding a metric with the same name as a metric subgroup: '" +
    21                                 name + "'. Metric might not get properly reported. " + Arrays.toString(scopeComponents));
    22                     }
    23 
    24                     registry.register(metric, name, this);
    25                 }
    26                 else {
    27                     // we had a collision. put back the original value
    28                     metrics.put(name, prior);
    29 
    30                     // we warn here, rather than failing, because metrics are tools that should not fail the
    31                     // program when used incorrectly
    32                     LOG.warn("Name collision: Group already contains a Metric with the name '" +
    33                             name + "'. Metric will not be reported." + Arrays.toString(scopeComponents));
    34                 }
    35             }
    36         }
    37     }

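    Now for a closer look at the addMetric() code above.

    Line 7: acquires the mutex (the group's own monitor).

    Line 8: checks whether the group has already been closed.

    Lines 11~34: add the Metric object being registered to the metrics map.

    There is a small trick here: the code assumes there is no key collision and puts the metric straight into the map, and only afterwards checks whether a previous value was displaced. If one was, the original value is put back. When there is no collision (the common case), this saves one map lookup.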
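
    The put-first, check-later pattern can be isolated in a small stand-alone sketch. PutFirstSketch is a hypothetical class written for this illustration; it keeps only the map handling and leaves out the logging and the registry call from the listing above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Stand-alone illustration of "put first, resolve collisions afterwards". */
final class PutFirstSketch {
    private final Map<String, Object> metrics = new HashMap<>();

    /** Returns true if the metric was stored, false on a name collision. */
    synchronized boolean add(String name, Object metric) {
        Objects.requireNonNull(metric, "metric");
        // Common case: the name is free, so this single put is the only map access.
        Object prior = metrics.put(name, metric);
        if (prior == null) {
            return true;
        }
        // Collision: the put displaced an existing entry, so restore it.
        metrics.put(name, prior);
        return false;
    }

    synchronized Object get(String name) {
        return metrics.get(name);
    }
}
```

    On Java 8 and later, Map.putIfAbsent gives the same outcome in one call, since it leaves an existing mapping untouched and returns it; the explicit put-and-restore form is shown here only because it matches the listing.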

    Line 24: registers the Metric with the MetricRegistry; the next section covers this in detail.

    Subgroups are added by calling addGroup().

    (AbstractMetricGroup.java)

     1     private AbstractMetricGroup<?> addGroup(String name, ChildType childType) {
     2         synchronized (this) {
     3             if (!closed) {
     4                 // adding a group with the same name as a metric creates problems in many reporters/dashboards
     5                 // we warn here, rather than failing, because metrics are tools that should not fail the
     6                 // program when used incorrectly
     7                 if (metrics.containsKey(name)) {
     8                     LOG.warn("Name collision: Adding a metric subgroup with the same name as an existing metric: '" +
     9                             name + "'. Metric might not get properly reported. " + Arrays.toString(scopeComponents));
    10                 }
    11 
    12                 AbstractMetricGroup newGroup = createChildGroup(name, childType);
    13                 AbstractMetricGroup prior = groups.put(name, newGroup);
    14                 if (prior == null) {
    15                     // no prior group with that name
    16                     return newGroup;
    17                 } else {
    18                     // had a prior group with that name, add the prior group back
    19                     groups.put(name, prior);
    20                     return prior;
    21                 }
    22             }
    23             else {
    24                 // return a non-registered group that is immediately closed already
    25                 GenericMetricGroup closedGroup = new GenericMetricGroup(registry, this, name);
    26                 closedGroup.close();
    27                 return closedGroup;
    28             }
    29         }
    30     }
    31 
    32     protected GenericMetricGroup createChildGroup(String name, ChildType childType) {
    33         switch (childType) {
    34             case KEY:
    35                 return new GenericKeyMetricGroup(registry, this, name);
    36             default:
    37                 return new GenericMetricGroup(registry, this, name);
    38         }
    39     }
    40 
    41     /**
    42      * Enum for indicating which child group should be created.
    43      * `KEY` is used to create {@link GenericKeyMetricGroup}.
    44      * `VALUE` is used to create {@link GenericValueMetricGroup}.
    45      * `GENERIC` is used to create {@link GenericMetricGroup}.
    46      */
    47     protected enum ChildType {
    48         KEY,
    49         VALUE,
    50         GENERIC
    51     }

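    Line 2: acquires the mutex.

    Lines 12~21: create the new MetricGroup object, or return the existing subgroup if one with that name is already present.

    Note that adding a subgroup whose name equals that of an existing Metric causes problems; the code only logs a warning (lines 7~10).

    Lines 25, 35, 37: all MetricGroup objects in the same tree share the same MetricRegistry.

    Line 26: closes the MetricGroup. The close() method looks like this: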

     1     public void close() {
     2         synchronized (this) {
     3             if (!closed) {
     4                 closed = true;
     5 
     6                 // close all subgroups
     7                 for (AbstractMetricGroup group : groups.values()) {
     8                     group.close();
     9                 }
    10                 groups.clear();
    11 
    12                 // un-register all directly contained metrics
    13                 for (Map.Entry<String, Metric> metric : metrics.entrySet()) {
    14                     registry.unregister(metric.getValue(), metric.getKey(), this);
    15                 }
    16                 metrics.clear();
    17             }
    18         }
    19     }

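    Line 2: acquires the mutex.

    The method recursively closes all subgroups and unregisters all metrics.

    In MetricGroup, addMetric(), addGroup(), close(), and getAllVariables() (not shown above) all need to acquire the mutex.

    The reason: it prevents resource leaks caused by metrics or subgroups being added while the group is being closed.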
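
    The interplay between the closed flag and the mutex can be seen in a reduced model. ClosableGroupSketch below is a hypothetical stand-alone class, not Flink's AbstractMetricGroup: it drops the registry and the logging and keeps only the rule that nothing can be added once close() has run.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-alone model of a closable group tree; all access is guarded by "this". */
final class ClosableGroupSketch {
    private final Map<String, Object> metrics = new HashMap<>();
    private final Map<String, ClosableGroupSketch> groups = new HashMap<>();
    private boolean closed;

    /** Returns false when the group is closed or the name is already taken. */
    synchronized boolean addMetric(String name, Object metric) {
        if (closed) {
            return false;
        }
        return metrics.putIfAbsent(name, metric) == null;
    }

    /** Returns the existing child, a new child, or an already-closed child. */
    synchronized ClosableGroupSketch addGroup(String name) {
        if (closed) {
            // Mirror the listing: hand out a group that is closed from the start.
            ClosableGroupSketch dead = new ClosableGroupSketch();
            dead.close();
            return dead;
        }
        return groups.computeIfAbsent(name, n -> new ClosableGroupSketch());
    }

    /** Closes this group and, recursively, every subgroup. */
    synchronized void close() {
        if (closed) {
            return;
        }
        closed = true;
        for (ClosableGroupSketch child : groups.values()) {
            child.close();
        }
        groups.clear();
        metrics.clear();
    }

    synchronized boolean isClosed() {
        return closed;
    }

    synchronized int metricCount() {
        return metrics.size();
    }
}
```

    Because addMetric(), addGroup(), and close() all synchronize on the same object, an add either completes before close() starts (and is then cleaned up by it) or observes closed == true and is rejected; without the lock, an add could slip in after the maps were cleared and never be released.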

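    Another important MetricGroup method is public String getMetricIdentifier(String metricName, CharacterFilter filter, int reporterIndex).

    It returns a unique name for a Metric, which serves as its identifier.

    The identifier has three parts: system scope, user scope, and metric name.

    In A.B.C, A is the system scope, B is the user scope, C is the metric name, and '.' is the delimiter.

    The system scope can be defined in conf/flink-conf.yaml.

    The user scope is the group tree; it is defined by calling addGroup(String), and groups can be nested several levels deep.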
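
    As a rough illustration of how the three parts combine, the sketch below joins a system scope, a user scope, and a metric name with '.'. IdentifierSketch and its identifier() method are hypothetical; the CharacterFilter and reporterIndex parameters of the real method are ignored here, and the scope values in the usage note are made up.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-alone illustration of the A.B.C identifier layout. */
final class IdentifierSketch {

    /** Joins system scope, user scope, and metric name with '.' as the delimiter. */
    static String identifier(List<String> systemScope, List<String> userScope, String metricName) {
        List<String> parts = new ArrayList<>(systemScope);
        parts.addAll(userScope);   // one entry per addGroup(String) level
        parts.add(metricName);
        return String.join(".", parts);
    }
}
```

    For example, a system scope of [host1, taskmanager], a user scope of [myGroup, mySubGroup], and the metric name myCounter yield host1.taskmanager.myGroup.mySubGroup.myCounter.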

    The bridge between MetricGroup and MetricReporter: MetricRegistry

    MetricRegistry keeps track of all registered Metrics and acts as the bridge between MetricGroup and MetricReporter.

    MetricGroup's addMetric() method calls MetricRegistry's register() method:

    registry.register(metric, name, this);

    MetricGroup's close() method calls MetricRegistry's unregister() method:

    registry.unregister(metric.getValue(), metric.getKey(), this);

    The implementations of both methods:

     1     // ------------------------------------------------------------------------
     2     //  Metrics (de)registration
     3     // ------------------------------------------------------------------------
     4 
     5     @Override
     6     public void register(Metric metric, String metricName, AbstractMetricGroup group) {
     7         synchronized (lock) {
     8             if (isShutdown()) {
     9                 LOG.warn("Cannot register metric, because the MetricRegistry has already been shut down.");
    10             } else {
    11                 if (reporters != null) {
    12                     for (int i = 0; i < reporters.size(); i++) {
    13                         MetricReporter reporter = reporters.get(i);
    14                         try {
    15                             if (reporter != null) {
    16                                 FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
    17                                 reporter.notifyOfAddedMetric(metric, metricName, front);
    18                             }
    19                         } catch (Exception e) {
    20                             LOG.warn("Error while registering metric.", e);
    21                         }
    22                     }
    23                 }
    24                 try {
    25                     if (queryService != null) {
    26                         MetricQueryService.notifyOfAddedMetric(queryService, metric, metricName, group);
    27                     }
    28                 } catch (Exception e) {
    29                     LOG.warn("Error while registering metric.", e);
    30                 }
    31                 try {
    32                     if (metric instanceof View) {
    33                         if (viewUpdater == null) {
    34                             viewUpdater = new ViewUpdater(executor);
    35                         }
    36                         viewUpdater.notifyOfAddedView((View) metric);
    37                     }
    38                 } catch (Exception e) {
    39                     LOG.warn("Error while registering metric.", e);
    40                 }
    41             }
    42         }
    43     }
    44 
    45     @Override
    46     public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
    47         synchronized (lock) {
    48             if (isShutdown()) {
    49                 LOG.warn("Cannot unregister metric, because the MetricRegistry has already been shut down.");
    50             } else {
    51                 if (reporters != null) {
    52                     for (int i = 0; i < reporters.size(); i++) {
    53                         try {
    54                             MetricReporter reporter = reporters.get(i);
    55                             if (reporter != null) {
    56                                 FrontMetricGroup front = new FrontMetricGroup<AbstractMetricGroup<?>>(i, group);
    57                                 reporter.notifyOfRemovedMetric(metric, metricName, front);
    58                             }
    59                         } catch (Exception e) {
    60                             LOG.warn("Error while registering metric.", e);
    61                         }
    62                     }
    63                 }
    64                 try {
    65                     if (queryService != null) {
    66                         MetricQueryService.notifyOfRemovedMetric(queryService, metric);
    67                     }
    68                 } catch (Exception e) {
    69                     LOG.warn("Error while registering metric.", e);
    70                 }
    71                 try {
    72                     if (metric instanceof View) {
    73                         if (viewUpdater != null) {
    74                             viewUpdater.notifyOfRemovedView((View) metric);
    75                         }
    76                     }
    77                 } catch (Exception e) {
    78                     LOG.warn("Error while registering metric.", e);
    79                 }
    80             }
    81         }
    82     }

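    The register() and unregister() methods are largely alike.

    Line 7: acquires the lock. The lock object is no longer this but a dedicated new Object(), which makes it easy to introduce a second lock later.

    Lines 11~23: notify every configured MetricReporter of the added Metric.

    Lines 24~30: notify the MetricQueryService of the added Metric.

    MetricQueryService is an actor; it serializes the Metrics and writes them to an output stream.

    Lines 31~40: if the Metric implements the View interface, it is registered with the viewUpdater.

    A Metric class that implements View is updated at a fixed interval, with the viewUpdater performing the update.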
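
    Two design points of register() stand out: it synchronizes on a dedicated lock object, and it wraps each notification in its own try/catch so that one failing receiver cannot block the others. The stand-alone sketch below isolates just those two points; FanOutRegistrySketch and its Listener interface are hypothetical and stand in for the registry and its reporters.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-alone model of a registry that fans registrations out to listeners. */
final class FanOutRegistrySketch {

    /** Hypothetical stand-in for a reporter's "metric added" callback. */
    interface Listener {
        void added(String name, Object metric) throws Exception;
    }

    // Dedicated lock object instead of "this"; a second lock could be added later.
    private final Object lock = new Object();
    private final List<Listener> listeners = new ArrayList<>();
    private boolean shutdown;

    void addListener(Listener listener) {
        synchronized (lock) {
            listeners.add(listener);
        }
    }

    void shutdown() {
        synchronized (lock) {
            shutdown = true;
        }
    }

    /** Notifies every listener and returns how many were notified successfully. */
    int register(String name, Object metric) {
        synchronized (lock) {
            if (shutdown) {
                return 0;
            }
            int notified = 0;
            for (Listener listener : listeners) {
                try {
                    listener.added(name, metric);
                    notified++;
                } catch (Exception e) {
                    // Swallow the failure so the remaining listeners are still notified.
                }
            }
            return notified;
        }
    }
}
```

    A private lock also keeps outside code from interfering: nothing else can synchronize on an object that is never exposed, whereas any holder of a reference to the registry could synchronize on this.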

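    MetricReporter

    A MetricReporter exports Metrics to an external backend.

    The parameters of the external backend can be set in conf/flink-conf.yaml.

    Several external backends can be configured at the same time.

    The MetricReporter interface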

     1 public interface MetricReporter {
     2 
     3     // ------------------------------------------------------------------------
     4     //  life cycle
     5     // ------------------------------------------------------------------------
     6 
     8     void open(MetricConfig config);
     9 
    10     void close();
    11 
    12     // ------------------------------------------------------------------------
    13     //  adding / removing metrics
    14     // ------------------------------------------------------------------------
    15 
    16     void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group);
    17 
    18 
    19     void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group);
    20 }

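    Line 8: configures the reporter.

    Because the reporter's constructor takes no arguments, this method is used to initialize the reporter's fields.

    It is always called right after the object is constructed.

    Line 10: closes the reporter.

    Channels and streams should be closed, and other resources released, in this method.

    Lines 16, 19: add and remove metrics.

    A typical reporter class also implements the Scheduled interface in order to report the current measurements.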

    1 public interface Scheduled {
    2 
    3     void report();
    4 }

    Line 3: the metric registry calls report() periodically to report the current measurements.
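
    Putting the lifecycle together, a reporter keeps track of the metrics it has been told about and writes them out whenever report() is called. The following stand-alone sketch models that flow with plain Java types; ReporterSketch is hypothetical, uses LongSupplier in place of Flink's Metric types, takes a simple string prefix instead of a MetricConfig, and collects its output in a list instead of sending it to a backend.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.LongSupplier;

/** Stand-alone model of the open / notify / report / close lifecycle. */
final class ReporterSketch {
    private final Map<String, LongSupplier> metrics = new TreeMap<>();
    private final List<String> sink = new ArrayList<>();
    private String prefix = "";

    /** Called once after the no-argument constructor to set up fields. */
    synchronized void open(String prefix) {
        this.prefix = prefix;
    }

    synchronized void notifyOfAddedMetric(String name, LongSupplier metric) {
        metrics.put(name, metric);
    }

    synchronized void notifyOfRemovedMetric(String name) {
        metrics.remove(name);
    }

    /** Samples every known metric and appends one line per metric to the sink. */
    synchronized void report() {
        for (Map.Entry<String, LongSupplier> entry : metrics.entrySet()) {
            sink.add(prefix + entry.getKey() + "=" + entry.getValue().getAsLong());
        }
    }

    /** Returns a copy of everything reported so far. */
    synchronized List<String> reported() {
        return new ArrayList<>(sink);
    }

    /** Releases what the reporter holds; here only the metric map. */
    synchronized void close() {
        metrics.clear();
    }
}
```

    In this model the caller decides when report() runs; something like a ScheduledExecutorService calling it at a fixed delay would play the role the metric registry plays for Scheduled reporters.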

  • Original post: https://www.cnblogs.com/tuowang/p/9108095.html