前言
当集群规模在日益变大的时候,往往有的时候出现机器的老化,而这些“老化”的机器又会表现出一些奇怪的特征:“磁盘读写慢”、“网络数据传输慢”等。对于前者,曾经笔者写过一篇 Hadoop节点”慢磁盘”监控 的解决方案,当然社区目前已有更好的方案: HDFS-10959 ( Adding per disk IO statistics and metrics in DataNode )。而对于后者,我们同样需要有相应的监控方案,方便让我们这类异常的节点。此功能的实现于最近刚刚完成的 HDFS-11194 ( Maintain aggregated peer performance metrics on NameNode )。本文笔者来讲解讲解这个功能的设计思路以及它是如何做到对于“数据传输慢”节点的监控的。
HDFS“慢节点”监控的设计
HDFS“慢节点”监控分析功能的内部其实可以划分为2个部分。 第一个部分是监控数据的采集,这里监控数据对应的是网络数据传输的耗时。第二个则是监控数据的汇总处理。在这个过程中会进行一定的筛选比较,然后给出分析报告 。
与笔者之前提过的纯Metric统计方案有所不同,HDFS-11194在实现这个功能的时候,还定义了一个 SlowPeerReports 这样的对象,这个对象内部包含的数据就是“慢”节点的数据以及对应的耗时时间。先抛开这个slow report,如果是纯Metric的统计方案,有什么不好的地方呢?笔者认为有以下原因:
Metrics方便用户查阅,不方便用户获取其所包含的统计数据。
于是,在这里设计者定义来了SlowPeerReports对象老包装的这样的数据,然后作为心跳数据的一部分,发给NameNode。最终达到的目的是:用户可以通过简单的jmx接口就能获取这些慢节点的数据了。
下图是此功能的简单结构图。
!
图 1-1 HDFS”慢节点”监控功能结构图
HDFS”慢节点”监控功能的实现
数据的采集
正如上一部分所提到,这里的“慢”指的是“网络数据传输慢”。所以我们需要在HDFS数据传输的操作上做一个耗时统计,此处添加监控的位置在BlockReceiver的receivePacket方法。receivePacket方法的作用正如其名称所表示的意思:接收和处理数据包。代码如下:
private int receivePacket() throws IOException {
// read the next packet
packetReceiver.receiveNextPacket(in);
...
//First write the packet to the mirror:
if (mirrorOut != null && !mirrorError) {
try {
// 记住开始时间
long begin = Time.monotonicNow();
// For testing. Normally no-op.
DataNodeFaultInjector.get().stopSendingPacketDownstream(mirrorAddr);
packetReceiver.mirrorPacketTo(mirrorOut);
mirrorOut.flush();
// 获取目前时间
long now = Time.monotonicNow();
setLastSentTime(now);
// 计算数据传输耗时
long duration = now - begin;
DataNodeFaultInjector.get().logDelaySendingPacketDownstream(
mirrorAddr,
duration);
// 加入到metric统计中
trackSendPacketToLastNodeInPipeline(duration);
if (duration > datanodeSlowLogThresholdMs) {
LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
}
} catch (IOException e) {
handleMirrorOutError(e);
}
}
...
}
注意此方法监控的操作行为是发往Pipeline中最后一个节点数据包的耗时情况。我们进入trackSendPacketToLastNodeInPipeline方法内部,
private void trackSendPacketToLastNodeInPipeline(final long elapsedMs) {
final DataNodePeerMetrics peerMetrics = datanode.getPeerMetrics();
if (peerMetrics != null && isPenultimateNode) {
peerMetrics.addSendPacketDownstream(mirrorNameForMetrics, elapsedMs);
}
}
这里的mirrorNameForMetrics指的是与当前DataNode通信的节点。
“慢”节点报告的生成与汇报
有了这些数据之后,我们需要把这些数据信息以心跳的信息报告给NameNode。所以这里需要更改NameNode与DataNode之间的心跳报告的协议,增加一类报告信息的定义。关于心跳协议的改造,笔者在之前的一篇文章DataNode生命线消息一文中也涉及到一些。
我们直接定位到相应的方法,
HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
throws IOException {
...
final long now = monotonicNow();
scheduler.updateLastHeartbeatTime(now);
VolumeFailureSummary volumeFailureSummary = dn.getFSDataset()
.getVolumeFailureSummary();
int numFailedVolumes = volumeFailureSummary != null ?
volumeFailureSummary.getFailedStorageLocations().length : 0;
// 判断此时是否已经到了发送慢节点报告的周期时间内
final boolean slowPeersReportDue = scheduler.isSlowPeersReportDue(now);
// 利用metric统计值数据构造慢节点报告数据
final SlowPeerReports slowPeers =
slowPeersReportDue && dn.getPeerMetrics() != null ?
SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
SlowPeerReports.EMPTY_REPORT;
HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
reports,
dn.getFSDataset().getCacheCapacity(),
dn.getFSDataset().getCacheUsed(),
dn.getXmitsInProgress(),
dn.getXceiverCount(),
numFailedVolumes,
volumeFailureSummary,
requestBlockReportLease,
// 将慢节点报告数据也加入到心跳信息中
slowPeers);
...
}
SlowPeerReports报告数据由 DataNodePeerMetrics().getOutliers()) 所产生,在getOutliers方法内部,会对收集到的数据做一层简单的过滤。
public Map<String, Double> getOutliers() {
// 从滑动窗口中获取部分采集数据
final Map<String, Double> stats =
sendPacketDownstreamRollingAvgerages.getStats(
MIN_OUTLIER_DETECTION_SAMPLES);
LOG.trace("DataNodePeerMetrics: Got stats: {}", stats);
// 对采集到的部分数据进行过滤出来
return slowNodeDetector.getOutliers(stats);
}
getOutliers方法如下:
public Map<String, Double> getOutliers(Map<String, Double> stats) {
...
// Compute the median absolute deviation of the aggregates.
final List<Double> sorted = new ArrayList<>(stats.values());
Collections.sort(sorted);
final Double median = computeMedian(sorted);
final Double mad = computeMad(sorted);
// 计算延时的上限值,如果收集的耗时时间比此值还要大的话,则为慢节点
Double upperLimitLatency = Math.max(
lowThresholdMs, median * MEDIAN_MULTIPLIER);
upperLimitLatency = Math.max(
upperLimitLatency, median + (DEVIATION_MULTIPLIER * mad));
final Map<String, Double> slowNodes = new HashMap<>();
LOG.trace("getOutliers: List={}, MedianLatency={}, " +
"MedianAbsoluteDeviation={}, upperLimitLatency={}",
sorted, median, mad, upperLimitLatency);
// 根据延时的上限值,选出慢节点
for (Map.Entry<String, Double> entry : stats.entrySet()) {
if (entry.getValue() > upperLimitLatency) {
slowNodes.put(entry.getKey(), entry.getValue());
}
}
// 返回慢节点列表
return slowNodes;
}
“慢”节点数据的展示
当这些“慢”节点数据成功被发送给NameNode之后,我们就可以将其暴露给用户,使得用户能方便地拿到这个数据,比如说通过jmx接口就能直接看到这个数据了。
那么我们是否需要暴露出所有收集到的数据呢?因为每个DataNode内部都会有一份自己的“慢”节点数据,如果NameNode都将其进行暴露,那么这个信息量绝对是不小的。还有一个问题,延时的报告信息需不需要?因为心跳有的时候会出现延时到达的情况。所以作者在这里做了以下2点限制。
- 在对外生成慢节点报告时,对节点做数量的限制。
- 对报告信息做规定延时时间的验证。
然后我们继续上面的分析,报告信息到了NameNode这边,会在DatanodeManager的handleHeartbeat被处理。
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] reports, final String blockPoolId,
long cacheCapacity, long cacheUsed, int xceiverCount,
int maxTransfers, int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
@Nonnull SlowPeerReports slowPeers) throws IOException {
...
// 慢报告数据信息的处理
if (slowPeerTracker != null) {
// 获取慢节点报告信息
final Map<String, Double> slowPeersMap = slowPeers.getSlowPeers();
if (!slowPeersMap.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("DataNode " + nodeReg + " reported slow peers: " +
slowPeersMap);
}
// 将慢节点信息加入到SlowPeerTracker
for (String slowNodeId : slowPeersMap.keySet()) {
// 同上带上报告节点的ipc地址,表明当前节点与目标慢节点出现慢通信的情况
slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false));
}
}
}
...
return new DatanodeCommand[0];
}
经过上述方法的处理之后,这些报告信息会被汇总到SlowPeerTracker中,被维护在了下面的映射图中:
// 组织关系为慢节点--><报告节点, 报告时间>
private final ConcurrentMap<String, ConcurrentMap<String, Long>> allReports;
addReport方法的处理逻辑如下:
public void addReport(String slowNode,
String reportingNode) {
// 获取总报告信息中对应慢节点名称的map对象
ConcurrentMap<String, Long> nodeEntries = allReports.get(slowNode);
if (nodeEntries == null) {
// putIfAbsent guards against multiple writers.
allReports.putIfAbsent(slowNode, new ConcurrentHashMap<>());
nodeEntries = allReports.get(slowNode);
}
// 加入报告的节点,并更新报告时间
nodeEntries.put(reportingNode, timer.monotonicNow());
}
这里将时间更新的操作是为了过滤掉过期的报告数据。报告数据最终json字符串的方式来呈现。
private Collection<ReportForJson> getJsonReports(int numNodes) {
...
final PriorityQueue<ReportForJson> topNReports =
...
});
// 获取当前时间
final long now = timer.monotonicNow();
for (Map.Entry<String, ConcurrentMap<String, Long>> entry :
allReports.entrySet()) {
// 以当前时间算起,过滤掉落后时间比较多的报告数据,得到慢节点列表
SortedSet<String> validReports = filterNodeReports(
entry.getValue(), now);
// 加入慢节点报告数据到排好序的列表中
// 如果当前队列中的报告数量还没到需要返回的节点报告数时,则直接添加
if (!validReports.isEmpty()) {
if (topNReports.size() < numNodes) {
topNReports.add(new ReportForJson(entry.getKey(), validReports));
} else if (topNReports.peek().getReportingNodes().size() <
validReports.size()){
// 否则比较当前队列头部报告数据中的报告节点数,如果包含的节点数量小于当前报告中的节点数,则进行取代并移除
// Remove the lowest element
topNReports.poll();
topNReports.add(new ReportForJson(entry.getKey(), validReports));
}
}
}
return topNReports;
}
getJsonReports方法最终会被NameNode的getSlowPeersReport方法所调用。
参考资料
[1]. https://issues.apache.org/jira/browse/HDFS-11194
[2]. https://issues.apache.org/jira/browse/HDFS-10917
[3].https://issues.apache.org/jira/secure/attachment/12849166/HDFS-11194.06.patch