zoukankan      html  css  js  c++  java
  • 深入Ambari Metrics 机制分析

    0.简介

      Ambari作为一款针对大数据平台的运维管理工具,提供了集群的创建,管理,监控,升级等多项功能,目前在业界已经得到广泛使用。

    Ambari指标系统( Ambari Metrics System,以下简称AMS)主要负责监控平台各类服务及主机的运行情况,提供各类服务及主机的相关指标,从而达到判断集群健康情况的目的,其重要性不言而喻。 

    本文是在深入阅读AMS源代码的基础之上,力求能够从监控指标的采集、存储、聚合及指标获取4个层面详细阐述AMS的整个工作机制。 


    这里写图片描述 
    图 1 AMS架构图 

    1.AMS指标采集

      对于 AMS 本身来说,涉及的主要模块有 Metrics Monitor、Hadoop Sinks(此处统称,其中还包含kafka,storm,flume等服务的sinks,严格地来说应叫service sinks) 以及 Metrics Collector。

    AMS 也是一个 Master-Slave 结构的框架。Master 模块便是 Metrics Collector,Slave 则是 Metrics Monitor 和 Hadoop Sinks。Slave 模块负责收集信息,并发送给 Collector。

    当然 Metrics Monitor 和 Hadoop Sinks 也有不同的职责,前者主要负责收集机器本身相关的指标,例如 CPU、Mem、Disk 相关信息;后者则负责收集 Hadoop 相关 Service 模块的性能数据,例如该模块Namenode占用了多少 Mem,以及该模块的 CPU 占用率等。

    1.1 指标收集

      关于指标的采集,此处以Flume服务为例,剖析AMS是如何采集Flume运行的相应指标的。Ambari内置了FlumeTimelineMetricsSink这样的jar包,通过Ambari启动flume服务,ambari会在flume的启动脚本参数中加入以下两项:

    -Dflume.monitoring.type=org.apache.hadoop.metrics2.sink.flume.FlumeTimelineMetricsSink 
    -Dflume.monitoring.node=<AMS_HOST>:6188

       其中即为AMS collector的节点名字,而6188则是collector中的Timeline Server对外提供的默认端口,以此来向Timeline Server推送数据。 
       接下来再看一下FlumeTimelineMetricsSink jar包的结构,其中就包含一个FlumeTimelineMetricsSink类,继承自AbstractTimelineMetricsSink抽象类并实现MetricsSink接口,如上所示的所有服务的sink包基本都采用这样的结构。 
    FlumeTimelineMetricsSink类中内置一个TimelineMetricsCollector线程,在flume启动FlumeTimelineMetricsSink jar包时,其就通过其start方法中的线程调度器来轮询调度TimelineMetricsCollector线程,而在此线程中主段代码如下所示。

    @Override
      public void start() {
        LOG.info("Starting Flume Metrics Sink");
        TimelineMetricsCollector timelineMetricsCollector = new TimelineMetricsCollector();
        if (scheduledExecutorService == null || scheduledExecutorService.isShutdown() || scheduledExecutorService.isTerminated()) {
          scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        }
        scheduledExecutorService.scheduleWithFixedDelay(timelineMetricsCollector, 0,
            pollFrequency, TimeUnit.MILLISECONDS);
      }  }

      从上面可看出Start方法中采取线程池的方法,以pollFrequency(可配置)的周期间隔,调度TimelineMetricsCollector线程,再细看一下TimelineMetricsCollector线程的详细说明,其主要代码如下所示。

    class TimelineMetricsCollector implements Runnable {
        @Override
        public void run() {
          try {
            Map<String, Map<String, String>> metricsMap = JMXPollUtil.getAllMBeans();
            long currentTimeMillis = System.currentTimeMillis();
            for (String component : metricsMap.keySet()) {
              Map<String, String> attributeMap = metricsMap.get(component);
              LOG.debug("Attributes for component " + component);
              processComponentAttributes(currentTimeMillis, component, attributeMap);
            }
          } 

       TimelineMetricsCollector线程轮循从服务的JMX端口中获取指标数据,形成Map对象,并通过processComponentAttributes方法进行逻辑转换后再发送。

    1.2 指标推送

       由上面源码可以看出,本质上,AMS的监控数据还是从各服务JMX端口中取得的,再通过processComponentAttributes方法逻辑上转换成AMS的内部的TimelineMetrics,通过emitMetrics方法post到Timeline Server(emitMetrics方法正是从AbstractTimelineMetricsSink类继承而来),其接口为: 
    http://:6188/ws/v1/timeline/metrics 
    如下是emitMetrics方法的部分片段,从中可以看出,emit方法最终还是将指标数据转化成json格式的数据,通过接口推送至TimelineServer。

    protected void emitMetrics(TimelineMetrics metrics) throws IOException {
        String connectUrl = getCollectorUri();
        try {
          String jsonData = mapper.writeValueAsString(metrics);
          StringRequestEntity requestEntity = new StringRequestEntity(jsonData, "application/json", "UTF-8");
          PostMethod postMethod = new PostMethod(connectUrl);
          postMethod.setRequestEntity(requestEntity);
          int statusCode = httpClient.executeMethod(postMethod);

      若是转换成curl命令的形式,则通过以下这样一条命令进行推送数据:

    curl -i -X POST -H "Content-Type: application/json" -d "${json}" ${url}

       其中json为转化成json的metrics数据,url为上面接口。 
    emitMetrics方法或curl命令发送的url最终会被Timeline server所截获,再通过TimelineMetricStore类以phonenix接口方式存储到hbase数据库中,如下文TimelineWebServices类代码所示。

    @Path("/metrics")
    @POST
    @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
      public TimelinePutResponse postMetrics(
        @Context HttpServletRequest req,
        @Context HttpServletResponse res,
        TimelineMetrics metrics) {
        init(res);
        if (metrics == null) {
          return new TimelinePutResponse();
        }
        try {
          // TODO: Check ACLs for MetricEntity using the TimelineACLManager.
          // TODO: Save owner of the MetricEntity.
          return timelineMetricStore.putMetrics(metrics);
    }

      上文描述了Flume服务指标推送的大概过程,服务运行时主动推送指标, AMS接收推送指标。其它各类服务如hadoop,kafka,storm均以此种方式进行指标的推送,在此不再作详细讨论。

    2.AMS指标存储

      AMS采集到的服务指标通过http/https的方式推送到timeline server,timeline server内部嵌入了一个hbase,通过phoenix(nosql之上的sql查询)将指标数据存入到hbase中。 
       Hbase库中总共有7张表,其相应表名如下表所示。 
      


    表2-1 Metrics指标存储表 
    这里写图片描述 


      虽然库中一共有7张表,但是实际存储指标数据的只有METRIC_RECORD表,其它各表是在其基础之上做相应的统计及聚合而成的,下表是METRIC_RECORD表详细说明。 
      


    表2-2 METRIC_RECORD表字段说明 
    这里写图片描述 


      该表是所有表中唯一存储实际metrics数据的表,其它表都是在此表的基础之上进行时间段的相应统计。 
      (1)针对采集的hosts指标,即由monitor发送的指标值 
       采集的metric记录,由{时间戳1:值,时间戳2:值,….}这样的记录组成,其中数目表现在Metric_count上,对于monitor发送的metric。为12条,每条间隔5秒种,然后一分钟向timelineServer发送一次,存入表中。 
      (2)针对采集的hadoop sink指标 
       采集的metric记录,由{时间戳1:值,时间戳2:值,….}这样的记录组成,每条间隔10秒钟,每隔70秒发送一次,采集7条,所以metric_count为7,一分钟向timelineServer发送一次,存入表中。 
      表METRIC_RECORD_MINUTE是按分钟进行统计的,默认一次统计时间是5min(可配置),该表实则是以METRIC_RECORD表的数据作为统计的基准。下表对METRIC_RECORD_MINUTE做了详细说明。 
      


    表2-3 METRIC_RECORD_MINUTE字段说明 
    这里写图片描述 


       假设5分钟统计一次,以mem_free为例,则本次统计是以主机为单位,假设在metric_record表中,某主机每隔一分钟发送一条mem_free的Record,一条record中有12条metric values,则本次统计共有5条Record,metric_count则为60条。同样的,这五分钟内的最大,最小和总和,只需要比对提取Metric_record中这60条的Record的最大,最小,以及5条总和即能统计出这5分钟内相应的属性。 
      类似于这样几条语句得以统计:

    1select hostname,max(metric_max) from metric_record where metric_name='mem_free' and server_time>=1471503845324 and server_time<1471504146520 group by hostname;------统计5分钟内,每台主机上该metric的最大值。
      (2select hostname,min(metric_min) from metric_record where metric_name='mem_free' and server_time>=1471503845324 and server_time<1471504146520 group by hostname; ------统计5分钟内,每台主机上该metric的最小值。
      (3select hostname,sum(metric_sum) from metric_record where metric_name='mem_free' and server_time>=1471503845324 and server_time<1471504146520 group by hostname; ------统计5分钟内,每台主机上该metric值总和。
      (4select hostname,sum(metric_count) from metric_record where metric_name='mem_free' and server_time>=1471503845324 and server_time<1471504146520 group by hostname; ------统计5分钟内,每台主机上统计的该metric数量和。

      至于METRIC_RECORD_HOURLY以及METRIC_RECORD_DAILY表其原理均是参照MINUTE表的原理,只是时间区间扩大了,已经参照的数据表变更了,METRIC_RECORD_HOURLY以METRIC_RECORD_MINUTE的数据为基准,而METRIC_RECORD_DAILY则以METRIC_RECORD_HOURLY的数据为基准进行统计,在此就不再描述了。

    3.AMS指标聚集

       上文中所统计的7张表,除了以METRIC_RECORD前缀的表之外,还有METRIC_AGGREGATE作为前缀的表,这就是集群的指标聚集表,在聚集表中不区分host,只是以service(APP_ID) 
    进行分组统计,其数据来源也是从METRIC_RECORD表中进行查询后然后再进行聚集的,下表是表字段的详细说明。 


    表3-1 METRIC_AGGREGATE表字段详细说明 
    这里写图片描述 


       以Metric_Name为主要指标,由于是集群级别的统计,所以不再有HOSTNAME相关字段的说明,在此表中增加了HOSTS_COUNT的字段,即聚集的Metric来自主机的数量。 
      实际中不存在表METRIC_AGGREGATE_MINUTE,但是在图3-1中可以看到一个单独的TimelineMetricClusterAggregatorMinute聚集线程类,每2分钟聚集一次,其聚集结果采取了分片的方式,30秒一个片区记录,即0-30秒,30-60秒,60-90秒,90-120秒分成4个片区,每个时间段采集的记录分别存到对应的片区记录中,而各个片区记录直接存入到METRIC_AGGREGATE表中,该表间隔记录为30秒一条,是在表METRIC_AGGREGATE_MINUTE的基础之上进行分片的,之所以这么设定,也是为了防止频繁的聚集对AMS造成过大负载。 
      这个分片时间是由参数timeline.metrics.cluster.aggregator.minute.timeslice.interval进行设定的,所以METRIC_AGGREGATE表其实是METRIC_AGGREGATE_MINUTE表聚合统计的结果之上,再对其结果按时间进行分片而形成的。 
      指标聚集由TimelineMetricAggregator起始,其作为一个接口,也是一个线程,其实现子类如下图所示。 
      


      这里写图片描述 
    图3-1 TimelineMetricAggregator层次图 


       在AbstractTimelineAggregator类中实现了run方法,其中通过轮循调度doWork方法来实现聚集,调度时间可配置,如下是doWork程序代码。

    @Override
      public boolean doWork(long startTime, long endTime) {
        boolean success = true;
        Condition condition = prepareMetricQueryCondition(startTime, endTime);
        Connection conn = null;
        PreparedStatement stmt = null;
        ResultSet rs = null;
        try {
          conn = hBaseAccessor.getConnection();
          stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
          if (condition.doUpdate()) {
            int rows = stmt.executeUpdate();
            conn.commit();
          } else {
            rs = stmt.executeQuery();
          }
          aggregate(rs, startTime, endTime);
          }

      通过以上代码可以看到,在doWork方法中,首先还是通过phoenix接口查询到数据集,再对数据集进行聚集(aggregate方法中),针对不同的AbstractTimelineAggregator的子类,具有不同的aggregate方法,最终存入到METRIC_AGGREGATE表中,如此实现整个AMS的指标聚集功能。

    4.AMS指标获取

      AMS提供了2种获取指标的接口,分别是Collector提供的API以及Ambari Server的API接口。其中前一种方式更接近原生的指标数据,而后一种方式更为常用,应该说整个Ambari上层获取指标的方式都是采取后者,而后者在底层本质上还是调用的第一种方式,拿到库中的原生数据,再进行加工及逻辑处理,最后返回到WEB端。

    4.1 Collector API

    http://<AMS_HOST>:6188/ws/v1/timeline/metrics?metricNames=<>&hostname=<>&appId=<>&startTime=<>&endTime=<>&precision=<>

       如上是AMS Collector的总体API,其参数说明如下表所示。 
      


    表4-1 Collector API参数说明 
    这里写图片描述 


      当以此接口获取指标数据时,首先此URL会被TimelineWebServices捕获到,其类中相应代码如下所示。

    @GET
    @Path("/metrics")
    @Produces({ MediaType.APPLICATION_JSON })
      public TimelineMetrics getTimelineMetrics(
        @Context HttpServletRequest req,
        @Context HttpServletResponse res,
        @QueryParam("metricNames") String metricNames,
        @QueryParam("appId") String appId,
        @QueryParam("instanceId") String instanceId,
        @QueryParam("hostname") String hostname,
        @QueryParam("startTime") String startTime,
        @QueryParam("endTime") String endTime,
        @QueryParam("precision") String precision,
        @QueryParam("limit") String limit,
        @QueryParam("grouped") String grouped,
        @QueryParam("topN") String topN,
        @QueryParam("topNFunction") String topNFunction,
        @QueryParam("isBottomN") String isBottomN,
        @QueryParam("seriesAggregateFunction") String seriesAggregateFunction
     ) {
        init(res);
        try {
          return timelineMetricStore.getTimelineMetrics(
            parseListStr(metricNames, ","), parseListStr(hostname, ","), appId, instanceId,
            parseLongStr(startTime), parseLongStr(endTime),
            Precision.getPrecision(precision), parseIntStr(limit),
            parseBoolean(grouped), parseTopNConfig(topN, topNFunction, isBottomN),
            seriesAggregateFunction);
    }

       如上代码所示,在Timeline server捕获到请求后,会调用TimelineMetricStore.getTimelineMetrics方法,并传入相应的请求参数,获取指标数据。再深入到TimelineMetricStore中可以看到此类为一个接口,唯一的实现子类为HBaseTimelineMetricStore,通过其getTimelineMetrics方法取得指标数据,其主要代码如下所示。

    @Override
      public TimelineMetrics getTimelineMetrics(List<String> metricNames,
          List<String> hostnames, String applicationId, String instanceId,
          Long startTime, Long endTime, Precision precision, Integer limit,
          boolean groupedByHosts, TopNConfig topNConfig, String seriesAggregateFunction) throws SQLException, IOException {
        TimelineMetricsSeriesAggregateFunction seriesAggrFunctionInstance = null;
        ……………………………………….
        Multimap<String, List<Function>> metricFunctions =
          parseMetricNamesToAggregationFunctions(metricNames);
        ConditionBuilder conditionBuilder = new ConditionBuilder(new ArrayList<String>(metricFunctions.keySet()))
          .hostnames(hostnames)
          .appId(applicationId)
          .instanceId(instanceId)
          .startTime(startTime)
          .endTime(endTime)
          .precision(precision)
          .limit(limit)
          .grouped(groupedByHosts);
    ………………………………………….
    ………………………………………
        Condition condition = conditionBuilder.build();
        TimelineMetrics metrics;
        if (hostnames == null || hostnames.isEmpty()) {
          metrics = hBaseAccessor.getAggregateMetricRecords(condition, metricFunctions);
        } else {
          metrics = hBaseAccessor.getMetricRecords(condition, metricFunctions);
        }
        metrics = postProcessMetrics(metrics);
        ………………………………..
        return seriesAggregateMetrics(seriesAggrFunctionInstance, metrics);
      }
    
    

       从上文代码中也可以看出,HBaseTimelineMetricStore类在其内部通过注解映射查询的条件构建用于查询的Condition对象,其次向HBaseAccessor传入此条件用于查询,根据查询参数中是否有无hostname从而决定是查询聚集表还是主机表,最终取得相应的查询结果,其中代码在此不再详述,感兴趣可以自行阅读,至此,通过Collector API取得指标数据的流程就打通了。

    4.2 Ambari Server API

      Ambari Server API总体上分为3个层次,具有3种类型的API,无论是哪种类型的API,在其底层取数据时,最终都是利用Collector的API,如下是整体的架构详细图。 


      这里写图片描述 
    图4-1 AMS工作机制图 

    4.2.1 主机类型指标API

    http://<ambari-server>:8080/api/v1/clusters/<cluster-name>/hosts/<host-name>?fields=metrics/cpu/cpu_user[1430844610,1430848210,15]

       此API是主机类型的API,在其中具有/hosts/的前缀和参数的补充,fileds后面跟的是指标名称,[]内描述的是时间起始。上文中说过,Server的API在其底层还是调用Collector的API,那么此API在其对应的底层便是Collector的API加上hostname及appId可选参数,其中appId设为HOST即可,其后的时间戳便是在startTime和endTime中描述。

    4.2.2 组件类型指标API

    http://<ambari-server>:8080/api/v1/clusters/<cluster-name>/services/HDFS/components/DATANODE?fields=metrics/dfs/datanode/DfsUsed[1430844610,1430848210,15]

       组件类型的API取消了hostname的参数,其主要针对服务整体的聚集查询,其所查询的表也是METRIC_AGGREGATE类型的表,对应于Collector的API则是在其中取消了hostname的参数字段,则API默认去聚集表(METRIC_AGGREGATE)中查询。

    4.2.3 主机组件类型指标API

    http://<ambari-server>:8080/api/v1/clusters/<cluster-name>/hosts/<host-name>/host_components/NAMENODE?fields=metrics/jvm/memHeapCommittedM[1430847303,1430850903,15]
    • 1

      主机组件类型的API实则和主机类型的API类似,只是主机类型直接是针对主机相关指标如cpu,mem类型的指标的获取,而主机组件则是真多host之上的部署的服务的指标的获取,所以在其API中新增了host_components参数。对应于底层Collector的API,只需要将主机类型对应的Collector的API中的appId由HOST替换成相应的服务名称即可。 
    以上3种类型的API,无论哪一种,在其底层最终都是调用Collector API取得metric数据的,主机类型指标则是在Collector API中补充上hostname属性,组件指标则是在Collector API去掉hostname属性,让其做聚集查询(去METRIC_AGGREGATE查询数据),而主机组件指标类型则是同时补充上appId与hostname两个参数进行查询,下图描述了通过Server API获取metric数据时主要实现类及其继承结构。 


    这里写图片描述 
    图 4-2 AMSPropertyProvider类层次图 


      当向Server发送Metric的请求URL时,最终都会在Server端通过AMSPropertyProvider转化成一个MetricRequest,MetricRequest是AMSPropertyProvider的内部类,再由MetricRequest.populateResources方法进行请求的处理,下文是其主要核心代码。

    public Collection<Resource> populateResources() throws SystemException {
    if (!hostComponentHostMetrics.isEmpty()) {
            String hostComponentHostMetricParams = getSetString(processRegexps(hostComponentHostMetrics), -1);
             setQueryParams(hostComponentHostMetricParams, hostnames, true, componentName);
              TimelineMetrics metricsResponse = null;
              try {
                metricsResponse = getTimelineMetricsFromCache(
                  getTimelineAppMetricCacheKey(hostComponentHostMetrics,
                    componentName, uriBuilder.toString(),hostnames), componentName);
              } 

       此方法首先针对metric的请求类型做了判断,针对不同的指标分别进行不同的处理。此处便是hostComponentHostMetrics散列表不为空的情况下(即此为主机指标的请求),针对此请求,首先利用setQueryParams查询参数设置,其次执行getTimelineMetricsFromCache取得返回的metrics,在其中传入了uriBuilder,此对象即为Collector的API,构建完成之后即通过此URL取得相应的metrics数据。 
       接下来直接进入getTimelineMetricsFromCache方法,其代码如下所示。

    private TimelineMetrics getTimelineMetricsFromCache(TimelineAppMetricCacheKey metricCacheKey,String componentName) throws IOException {
          // Cache only the component level metrics
          // No point in time metrics are cached
          if (metricCache != null
              && !StringUtils.isEmpty(componentName)
              && !componentName.equalsIgnoreCase("HOST")
              && metricCacheKey.getTemporalInfo() != null) {
            return metricCache.getAppTimelineMetricsFromCache(metricCacheKey);
          }
          return requestHelper.fetchTimelineMetrics(metricCacheKey.getSpec());
        }

       从代码中看到,其针对有无AMS缓存的情况分别进行了处理,在原始无缓存的情况下,则是通过requestHelper.fetchTimelineMetrics的方法取得相应的metric数据,而传入的参数实则就是上文的URL。 
      紧接着进入fetchTimelineMetrics方法,从下文代码可以看到,最终是通过streamProvider.readFrom(spec)此方法取得metrics数据的,而streamProvider则是一个封装好的URL读取数据对象。

    public TimelineMetrics fetchTimelineMetrics(String spec) throws IOException {
        LOG.debug("Metrics request url = " + spec);
        BufferedReader reader = null;
        TimelineMetrics timelineMetrics = null;
        try {
          reader = new BufferedReader(new InputStreamReader(streamProvider.readFrom(spec)));
          timelineMetrics = timelineObjectReader.readValue(reader);
    }

    5.总结

      本文主要是从AMS指标的采集、存储、聚集、获取四个层面,详细描述了AMS整个内在工作机制,并详细整理了AMS对外各API不同点及其来龙去脉。由于源码的复杂性,本文只罗列相关的主要功能代码及其相应的流程,并未细化到最底层的实现,不足之处,敬请见谅。同时,也期待您的指导与帮助。

    参考资料:

    1.http://www.ibm.com/developerworks/cn/opensource/os-cn-ambari-metrics/index.html 
    2.http://blog.csdn.net/bluishglc/article/details/48155265

  • 相关阅读:
    关于游戏分布式或者多服管理的想法
    surfaceView
    ackerman递归
    netbeans环境的建立
    copy-浅及深的复制操作
    使用VMware安装CentOS6.8详细教程
    Python在线资源优先级排序
    Python导入模块,Python import用法
    编码
    Python清屏命令
  • 原文地址:https://www.cnblogs.com/felixzh/p/9675705.html
Copyright © 2011-2022 走看看