zoukankan      html  css  js  c++  java
  • Flink实战(七十一):监控(三)自定义metrics相关指标(一)

    0 简介

    User-defined Metrics

    除了系统的 Metrics 之外,Flink 支持自定义 Metrics ,即 User-defined Metrics。上文说的都是系统框架方面,对于自己的业务逻辑也可以用 Metrics 来暴露一些指标,以便进行监控。

    User-defined Metrics 现在提及的都是 datastream 的 API,table、sql 可能需要 context 协助,但如果写 UDF,它们其实是大同小异的。

    Datastream 的 API 是继承 RichFunction ,继承 RichFunction 才可以有 Metrics 的接口。然后通过 RichFunction 会带来一个 getRuntimeContext().getMetricGroup().addGroup(…) 的方法,这里就是 User-defined Metrics 的入口。通过这种方式,可以自定义 user-defined Metric Group。如果想定义具体的 Metrics,同样需要用getRuntimeContext().getMetricGroup().counter/gauge/meter/histogram(…) 方法,它会有相应的构造函数,可以定义到自己的 Metrics 类型中。

    继承 RichFunction
        •Register user-defined Metric Group: getRuntimeContext().getMetricGroup().addGroup(…)
        •Register user-defined Metric: getRuntimeContext().getMetricGroup().counter/gauge/meter/histogram(…)

    User-defined Metrics Example

    下面通过一段简单的例子说明如何使用 Metrics。比如,定义了一个 Counter 传一个 name,Counter 默认的类型是 single counter(Flink 内置的一个实现),可以对 Counter 进行 inc()操作,并在代码里面直接获取。

    Meter 也是这样,Flink 有一个内置的实现是 Meterview,因为 Meter 是多长时间内发生事件的记录,所以它是要有一个多长时间的窗口。平常用 Meter 时直接 markEvent(),相当于加一个事件不停地打点,最后用 getrate() 的方法直接把这一段时间发生的事件除一下给算出来。

    Gauge 就比较简单了,把当前的时间打出来,用 Lambda 表达式直接把 System::currentTimeMillis 打进去就可以,相当于每次调用的时候都会去真正调一下系统当天时间进行计算。

    Histogram 稍微复杂一点,Flink 中代码提供了两种实现,在此取一其中个实现,仍然需要一个窗口大小,更新的时候可以给它一个值。

    这些 Metrics 一般都不是线程安全的。如果想要用多线程,就需要加同步,更多详情请参考下面链接。

    •Counter processedCount = getRuntimeContext().getMetricGroup().counter("processed_count");
      processedCount.inc();
    •Meter processRate = getRuntimeContext().getMetricGroup().meter("rate", new MeterView(60));
      processRate.markEvent();
    •getRuntimeContext().getMetricGroup().gauge("current_timestamp", System::currentTimeMillis);
    •Histogram histogram = getRuntimeContext().getMetricGroup().histogram("histogram", new DescriptiveStatisticsHistogram(1000));
      histogram.update(1024);
    •[https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#metric-types]

    1 实例

    实例一:

    Counter:
    用与存储数值类型,比如统计数据输入、输出总数量。

    public class MyMapper extends RichMapFunction<String, String> {
      private transient Counter counter;
    
      @Override
      public void open(Configuration config) {
        this.counter = getRuntimeContext()
          .getMetricGroup()
          .counter("myCounter");
      }
    
      @Override
      public String map(String value) throws Exception {
        this.counter.inc();
        return value;
      }
    }

    实例二:

    Gauge:
    可以用来存储任何类型,前提要实现org.apache.flink.metrics.Gauge接口,重写getValue方法,如果返回类型为Object则该类需要重写toString方法。

    有些场景下,需要根据业务计算出指标,则Gauge使用起来更灵活。

    public class MyMapper extends RichMapFunction<String, String> {
      private transient int valueToExpose = 0;
    
      @Override
      public void open(Configuration config) {
        getRuntimeContext()
          .getMetricGroup()
          .gauge("MyGauge", new Gauge<Integer>() {
            @Override
            public Integer getValue() {
              return valueToExpose;
            }
          });
      }
    
      @Override
      public String map(String value) throws Exception {
        valueToExpose++;
        return value;
      }
    }

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13910798.html

  • 相关阅读:
    SourceInsight3.5中文乱码问题解决
    执行脚本程序出现gzip:stdin:not in gzip format的解决方法
    libjpeg实现arm板上yuv420p转jpg
    yuv420p转jpg linux(纯C语言实现)
    linux下交叉编译libjpeg库并移植到开发板上
    关于arm板上ifup eth0出现问题的解决方法
    关于windows与ubuntu两台主机传输文件的一个小软件
    数据结构之查找算法篇
    利用paramiko将服务器的文件批量匹配并下载到本地
    最近写了中药系统药理学数据库与分析平台的爬虫,可以交流 https://tcmspw.com/index.php
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13910798.html
Copyright © 2011-2022 走看看