zoukankan      html  css  js  c++  java
  • HDFS“慢节点”监控分析功能

    前言


    当集群规模在日益变大的时候,往往有的时候出现机器的老化,而这些“老化”的机器又会表现出一些奇怪的特征:“磁盘读写慢”、“网络数据传输慢”等。对于前者,曾经笔者写过一篇 Hadoop节点”慢磁盘”监控 的解决方案,当然社区目前已有更好的方案: HDFS-10959 ( Adding per disk IO statistics and metrics in DataNode )。而对于后者,我们同样需要有相应的监控方案,方便让我们这类异常的节点。此功能的实现于最近刚刚完成的 HDFS-11194 ( Maintain aggregated peer performance metrics on NameNode )。本文笔者来讲解讲解这个功能的设计思路以及它是如何做到对于“数据传输慢”节点的监控的。

    HDFS“慢节点”监控的设计


    HDFS“慢节点”监控分析功能的内部其实可以划分为2个部分。 第一个部分是监控数据的采集,这里监控数据对应的是网络数据传输的耗时。第二个则是监控数据的汇总处理。在这个过程中会进行一定的筛选比较,然后给出分析报告 。

    与笔者之前提过的纯Metric统计方案有所不同,HDFS-11194在实现这个功能的时候,还定义了一个 SlowPeerReports 这样的对象,这个对象内部包含的数据就是“慢”节点的数据以及对应的耗时时间。先抛开这个slow report,如果是纯Metric的统计方案,有什么不好的地方呢?笔者认为有以下原因:

    Metrics方便用户查阅,不方便用户获取其所包含的统计数据。

    于是,在这里设计者定义来了SlowPeerReports对象老包装的这样的数据,然后作为心跳数据的一部分,发给NameNode。最终达到的目的是:用户可以通过简单的jmx接口就能获取这些慢节点的数据了。

    下图是此功能的简单结构图。



    !
    图 1-1 HDFS”慢节点”监控功能结构图

    HDFS”慢节点”监控功能的实现


    数据的采集


    正如上一部分所提到,这里的“慢”指的是“网络数据传输慢”。所以我们需要在HDFS数据传输的操作上做一个耗时统计,此处添加监控的位置在BlockReceiver的receivePacket方法。receivePacket方法的作用正如其名称所表示的意思:接收和处理数据包。代码如下:

    private int receivePacket() throws IOException {
        // read the next packet
        packetReceiver.receiveNextPacket(in);
        ...
        //First write the packet to the mirror:
        if (mirrorOut != null && !mirrorError) {
          try {
            // 记住开始时间
            long begin = Time.monotonicNow();
            // For testing. Normally no-op.
            DataNodeFaultInjector.get().stopSendingPacketDownstream(mirrorAddr);
            packetReceiver.mirrorPacketTo(mirrorOut);
            mirrorOut.flush();
            // 获取目前时间
            long now = Time.monotonicNow();
            setLastSentTime(now);
            // 计算数据传输耗时
            long duration = now - begin;
            DataNodeFaultInjector.get().logDelaySendingPacketDownstream(
                mirrorAddr,
                duration);
            // 加入到metric统计中
            trackSendPacketToLastNodeInPipeline(duration);
            if (duration > datanodeSlowLogThresholdMs) {
              LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
                  + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
            }
          } catch (IOException e) {
            handleMirrorOutError(e);
          }
        }
        ...
    }

    注意此方法监控的操作行为是发往Pipeline中最后一个节点数据包的耗时情况。我们进入trackSendPacketToLastNodeInPipeline方法内部,

    private void trackSendPacketToLastNodeInPipeline(final long elapsedMs) {
        final DataNodePeerMetrics peerMetrics = datanode.getPeerMetrics();
        if (peerMetrics != null && isPenultimateNode) {
          peerMetrics.addSendPacketDownstream(mirrorNameForMetrics, elapsedMs);
        }
      }

    这里的mirrorNameForMetrics指的是与当前DataNode通信的节点。

    “慢”节点报告的生成与汇报


    有了这些数据之后,我们需要把这些数据信息以心跳的信息报告给NameNode。所以这里需要更改NameNode与DataNode之间的心跳报告的协议,增加一类报告信息的定义。关于心跳协议的改造,笔者在之前的一篇文章DataNode生命线消息一文中也涉及到一些。

    我们直接定位到相应的方法,

    HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
          throws IOException {
        ...
        final long now = monotonicNow();
        scheduler.updateLastHeartbeatTime(now);
        VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
            .getVolumeFailureSummary();
        int numFailedVolumes = volumeFailureSummary != null ?
            volumeFailureSummary.getFailedStorageLocations().length : 0;
        // 判断此时是否已经到了发送慢节点报告的周期时间内
        final boolean slowPeersReportDue = scheduler.isSlowPeersReportDue(now);
        // 利用metric统计值数据构造慢节点报告数据
        final SlowPeerReports slowPeers =
            slowPeersReportDue && dn.getPeerMetrics() != null ?
                SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
                SlowPeerReports.EMPTY_REPORT;
        HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
            reports,
            dn.getFSDataset().getCacheCapacity(),
            dn.getFSDataset().getCacheUsed(),
            dn.getXmitsInProgress(),
            dn.getXceiverCount(),
            numFailedVolumes,
            volumeFailureSummary,
            requestBlockReportLease,
            // 将慢节点报告数据也加入到心跳信息中
            slowPeers);
        ...
      }

    SlowPeerReports报告数据由 DataNodePeerMetrics().getOutliers()) 所产生,在getOutliers方法内部,会对收集到的数据做一层简单的过滤。

    public Map<String, Double> getOutliers() {
        // 从滑动窗口中获取部分采集数据
        final Map<String, Double> stats =
            sendPacketDownstreamRollingAvgerages.getStats(
                MIN_OUTLIER_DETECTION_SAMPLES);
        LOG.trace("DataNodePeerMetrics: Got stats: {}", stats);
        // 对采集到的部分数据进行过滤出来
        return slowNodeDetector.getOutliers(stats);
      }

    getOutliers方法如下:

    public Map<String, Double> getOutliers(Map<String, Double> stats) {
        ...
        // Compute the median absolute deviation of the aggregates.
        final List<Double> sorted = new ArrayList<>(stats.values());
        Collections.sort(sorted);
        final Double median = computeMedian(sorted);
        final Double mad = computeMad(sorted);
        // 计算延时的上限值,如果收集的耗时时间比此值还要大的话,则为慢节点
        Double upperLimitLatency = Math.max(
            lowThresholdMs, median * MEDIAN_MULTIPLIER);
        upperLimitLatency = Math.max(
            upperLimitLatency, median + (DEVIATION_MULTIPLIER * mad));
    
        final Map<String, Double> slowNodes = new HashMap<>();
    
        LOG.trace("getOutliers: List={}, MedianLatency={}, " +
            "MedianAbsoluteDeviation={}, upperLimitLatency={}",
            sorted, median, mad, upperLimitLatency);
    
        // 根据延时的上限值,选出慢节点
        for (Map.Entry<String, Double> entry : stats.entrySet()) {
          if (entry.getValue() > upperLimitLatency) {
            slowNodes.put(entry.getKey(), entry.getValue());
          }
        }
        // 返回慢节点列表
        return slowNodes;
      }

    “慢”节点数据的展示


    当这些“慢”节点数据成功被发送给NameNode之后,我们就可以将其暴露给用户,使得用户能方便地拿到这个数据,比如说通过jmx接口就能直接看到这个数据了。

    那么我们是否需要暴露出所有收集到的数据呢?因为每个DataNode内部都会有一份自己的“慢”节点数据,如果NameNode都将其进行暴露,那么这个信息量绝对是不小的。还有一个问题,延时的报告信息需不需要?因为心跳有的时候会出现延时到达的情况。所以作者在这里做了以下2点限制。

    • 在对外生成慢节点报告时,对节点做数量的限制。
    • 对报告信息做规定延时时间的验证。

    然后我们继续上面的分析,报告信息到了NameNode这边,会在DatanodeManager的handleHeartbeat被处理。

    public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
          StorageReport[] reports, final String blockPoolId,
          long cacheCapacity, long cacheUsed, int xceiverCount, 
          int maxTransfers, int failedVolumes,
          VolumeFailureSummary volumeFailureSummary,
          @Nonnull SlowPeerReports slowPeers) throws IOException {
        ...
        // 慢报告数据信息的处理
        if (slowPeerTracker != null) {
          // 获取慢节点报告信息
          final Map<String, Double> slowPeersMap = slowPeers.getSlowPeers();
          if (!slowPeersMap.isEmpty()) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("DataNode " + nodeReg + " reported slow peers: " +
                  slowPeersMap);
            }
            // 将慢节点信息加入到SlowPeerTracker
            for (String slowNodeId : slowPeersMap.keySet()) {
              // 同上带上报告节点的ipc地址,表明当前节点与目标慢节点出现慢通信的情况
              slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false));
            }
          }
        }
        ...
        return new DatanodeCommand[0];
      }

    经过上述方法的处理之后,这些报告信息会被汇总到SlowPeerTracker中,被维护在了下面的映射图中:

    // 组织关系为慢节点--><报告节点, 报告时间>
      private final ConcurrentMap<String, ConcurrentMap<String, Long>> allReports;

    addReport方法的处理逻辑如下:

    public void addReport(String slowNode,
                            String reportingNode) {
        // 获取总报告信息中对应慢节点名称的map对象
        ConcurrentMap<String, Long> nodeEntries = allReports.get(slowNode);
    
        if (nodeEntries == null) {
          // putIfAbsent guards against multiple writers.
          allReports.putIfAbsent(slowNode, new ConcurrentHashMap<>());
          nodeEntries = allReports.get(slowNode);
        }
        // 加入报告的节点,并更新报告时间
        nodeEntries.put(reportingNode, timer.monotonicNow());
      }

    这里将时间更新的操作是为了过滤掉过期的报告数据。报告数据最终json字符串的方式来呈现。

    private Collection<ReportForJson> getJsonReports(int numNodes) {
        ...
    
        final PriorityQueue<ReportForJson> topNReports =
            ...
            });
        // 获取当前时间
        final long now = timer.monotonicNow();
    
        for (Map.Entry<String, ConcurrentMap<String, Long>> entry :
            allReports.entrySet()) {
          // 以当前时间算起,过滤掉落后时间比较多的报告数据,得到慢节点列表
          SortedSet<String> validReports = filterNodeReports(
              entry.getValue(), now);
          // 加入慢节点报告数据到排好序的列表中
          // 如果当前队列中的报告数量还没到需要返回的节点报告数时,则直接添加
          if (!validReports.isEmpty()) {
            if (topNReports.size() < numNodes) {
              topNReports.add(new ReportForJson(entry.getKey(), validReports));
            } else if (topNReports.peek().getReportingNodes().size() <
                validReports.size()){
              // 否则比较当前队列头部报告数据中的报告节点数,如果包含的节点数量小于当前报告中的节点数,则进行取代并移除
              // Remove the lowest element
              topNReports.poll();
              topNReports.add(new ReportForJson(entry.getKey(), validReports));
            }
          }
        }
        return topNReports;
      }

    getJsonReports方法最终会被NameNode的getSlowPeersReport方法所调用。

    参考资料


    [1]. https://issues.apache.org/jira/browse/HDFS-11194
    [2]. https://issues.apache.org/jira/browse/HDFS-10917
    [3].https://issues.apache.org/jira/secure/attachment/12849166/HDFS-11194.06.patch

  • 相关阅读:
    02_5if switch分支与循环语句
    02_4运算符
    02_3程序格式
    Shell脚本调用ftp上传文件
    02_2数据类型转换
    02_1标识符_关键字_数据类型
    01_3Java Application初步
    01_2Java开发环境的下载 安装 配置
    Mac 安装MySQL
    用 Homebrew 带飞你的 Mac
  • 原文地址:https://www.cnblogs.com/bianqi/p/12183692.html
Copyright © 2011-2022 走看看