前言
最近一段时间笔者在工作过程中遇到了HDFS块Topology不准确导致的一系列潜在问题,需要重新更新DataNode的Topology位置,使其上的块数据保持有高可用性。以下是笔者对此想到的一些操作方案和需要注意的点,或许对同领域的小伙伴有所帮助。
DataNode/块 Topology不准确问题
这里我们说的Topology位置并不是指的是物理位置的概念,而是逻辑上的。意思是说在逻辑空间上,我们将各个DataNode进行位置划分。在HDFS的Topology中,它是一树型结构,Switch和Router是其中的父亲节点,叶子节点代表的是实际的DN节点。
在HDFS中,Topology的一个关键作用是用来保证数据的rack awareness属性的。通俗地理解,就是数据副本需要分布在不同的rack下,以此防止同一个rack崩溃导致数据不可用的情况。因此从这里,我们可以得出下面一个Topology位置和物理真实位置的一个关系:
不同逻辑rack位置对应的物理rack位置必然是物理隔离的,不同物理位置rack的节点它们的逻辑rack可能是相同的。
本小节标题所述的Topology不准确问题指的就是逻辑rack位置对应实际物理rack位置不完全隔离的问题。这种情况往往发生在集群管理员在初始设定集群Topology位置划分时没有考虑完全。当然,最简单的方案就是完全按照真实物理rack划分,逻辑rack名就按照物理rack名来。
数据Topology位置不准确不仅仅会带来数据丢失的可能性,还有其它方面的影响。比如当我们利用逻辑位置来减少机房跨楼层间的带宽使用,我们只保证3副本中的1个副本在不同的楼层,2个在同个楼层。如果此时按照楼层所划分的大location位置不准确的话,就可能引起大量的楼层间的数据传输。
因此笔者这里想重点强调的是,数据的placement在实际的运用场景中是十分重要的。
HDFS的块位置重分布
当我们发现自己的HDFS集群数据的Topology位置违背了rack awareness的原则时,我们需要尽快的纠正它,以免其在后续引发一系列的问题。
这里笔者简单聊聊在HDFS中,有哪些方式可以让block块按照新的Topology规则进行分布。
依赖现成Balancer工具做块迁移
首先是第一种方法,利用现有Balancer移动块的逻辑进行块的重分布。HDFS Balancer工具在除了用于正常的块数据平衡之外,它也有一定的数据打散重分布的作用。在数据重新move的过程中 ,Balancer会根据当前最新的Topology位置来进行placement policy的满足。于是乎,旧的Topology位置块就能够被挪至新Topology下了。
但是过度依赖于Balancer的块迁移动作往往覆盖不全所有的块,因为Balancer受限制于节点的存储使用空间进行源,目标块的选择。换句话说,如果一些节点存储使用空间都在平均值阈值附近,它上面的块就会很少得到被移动的机会了。
NameNode的failover行为触发块的重分布
HDFS在每次切换Active服务时,会做一次初始全量块的检查,里面检查的步骤包含以下几点:
- 块副本数是否达到预期值,就是我们所说的mis-replica的block。如果replica少了,则发送一次replication请求。
- 块副本数满足条件之后,再进行block placement的检查,是否满足rack awareness,意为是否副本分布于多个rack。如果全部位于一个rack下,则需要进行一次重分布。这里的重分布行为和Balancer就不一样了,它是通过发起一次replica replication请求,然后由系统再进行over replica清理老的replica来达到副本位置的更新的。
相关代码如下,首先是切换Active时的replica检查方法:
/**
* Start services required in active state
* @throws IOException
*/
void startActiveServices() throws IOException {
startingActiveService = true;
LOG.info("Starting services required for active state");
writeLock();
...
blockManager.setPostponeBlocksFromFuture(false);
blockManager.getDatanodeManager().markAllDatanodesStale();
blockManager.clearQueues();
blockManager.processAllPendingDNMessages();
// Only need to re-process the queue, If not in SafeMode.
if (!isInSafeMode()) {
LOG.info("Reprocessing replication and invalidation queues");
// BlockManager初始化副本队列
blockManager.initializeReplQueues();
}
...
}
这里进入BlockManager的initializeReplQueues方法:
/**
* Initialize replication queues.
*/
public void initializeReplQueues() {
LOG.info("initializing replication queues");
processMisReplicatedBlocks();
initializedReplQueues = true;
}
可以看到,上面processMisReplicatedBlocks方法就是做miss副本块的检查的,此方法最终会进入到processMisReplicatedBlock方法:
/**
* A block needs reconstruction if the number of redundancies is less than
* expected or if it does not have enough racks.
*/
boolean isNeededReconstruction(BlockInfo storedBlock,
NumberReplicas numberReplicas, int pending) {
return storedBlock.isComplete() &&
!hasEnoughEffectiveReplicas(storedBlock, numberReplicas, pending);
}
...
// Check if the number of live + pending replicas satisfies
// the expected redundancy.
boolean hasEnoughEffectiveReplicas(BlockInfo block,
NumberReplicas numReplicas, int pendingReplicaNum) {
int required = getExpectedLiveRedundancyNum(block, numReplicas);
int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;
return (numEffectiveReplicas >= required) &&
(pendingReplicaNum > 0 || isPlacementPolicySatisfied(block));
}
当BlockManager检测出当前块placement不正确的时候,会返回一个UNDER_REPLICATE的处理结果同时将此block加入待复制块列表中。
...
// add to low redundancy queue if need to be
if (isNeededReconstruction(block, num)) {
if (neededReconstruction.add(block, numCurrentReplica,
num.readOnlyReplicas(), num.outOfServiceReplicas(),
expectedRedundancy)) {
return MisReplicationResult.UNDER_REPLICATED;
}
}
...
因为BlockManager在此过程中是对全量块做检查的,为了避免同时触发大量块复制所引发的性能问题,这里是将此过程拆分为小的iteration,每次iteration还会有睡眠间隔时间。
同时为了避免block副本队列初始化过程实际太长,此过程已改为异步进行,不会阻后续服务的初始化。综上所述,通过failover行为方式触发的block块重分布是一个可行且高效的办法,但是需要我们控制好相关速率参数,相关参数如下:
<property>
<name>dfs.block.misreplication.processing.limit</name>
<value>10000</value>
<description>
Maximum number of blocks to process for initializing replication queues.
</description>
</property>
局部数据文件块位置迁移
上面小节介绍的是集群全量块级别的块Topology位置迁移,但在有的使用场景中,我们可能只需要对极个别文件目录或是具体一些DN节点上的数据做块检查迁移,以此缩短Topology变更带来的影响。
这个功能是在前段时间不久被合入的,HDFS-14053: Provide ability for NN to re-replicate based on topology changes.
这个JIRA的的本质核心还是利用了上文讲述的BlockManager的process miss replica的方法。不过此时它传入的不是全量块,而是收集了指定目录下的块列表,作为参数传入。此功能命令被集成进了fsck命令来方便用户的使用。
核心patch改动如下:
}
@@ -683,6 +694,7 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file,
StringBuilder report = new StringBuilder();
int blockNumber = 0;
final LocatedBlock lastBlock = blocks.getLastLocatedBlock();
+ List<BlockInfo> misReplicatedBlocks = new LinkedList<>();
for (LocatedBlock lBlk : blocks.getLocatedBlocks()) {
ExtendedBlock block = lBlk.getBlock();
if (!blocks.isLastBlockComplete() && lastBlock != null &&
@@ -791,6 +803,9 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file,
}
out.println(" Replica placement policy is violated for " +
block + ". " + blockPlacementStatus.getErrorDescription());
+ if (doReplicate) {
+ //如果需要replica重新replication,则收集fsck扫描的块结果
+ misReplicatedBlocks.add(storedBlock);
+ }
}
// count storage summary
@@ -888,6 +903,19 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file,
out.print(report + "
");
}
}
+
+ if (doReplicate && !misReplicatedBlocks.isEmpty()) {
+ // 将收集好的列表传入BlockManager的processMisReplicatedBlocks的方法
+ int processedBlocks = this.blockManager.processMisReplicatedBlocks(
+ misReplicatedBlocks);
+ if (processedBlocks < misReplicatedBlocks.size()) {
+ LOG.warn("Fsck: Block manager is able to process only " +
+ processedBlocks +
+ " mis-replicated blocks (Total count : " +
+ misReplicatedBlocks.size() +
+ " ) for path " + path);
+ }
+ res.numBlocksQueuedForReplication += processedBlocks;
+ }
}
如果是按照单台DN来的话,相似的道理,我们需要去收集目标DN上的所有块数据,g感兴趣尝试的同学可以参考BlockManager现有的移除节点的方法,下面的toRemove就是DN上的当前块汇总。
/** Remove the blocks associated to the given datanode. */
void removeBlocksAssociatedTo(final DatanodeDescriptor node) {
providedStorageMap.removeDatanode(node);
for (DatanodeStorageInfo storage : node.getStorageInfos()) {
final Iterator<BlockInfo> it = storage.getBlockIterator();
//add the BlockInfos to a new collection as the
//returned iterator is not modifiable.
Collection<BlockInfo> toRemove = new ArrayList<>();
while (it.hasNext()) {
toRemove.add(it.next());
}
for (BlockInfo b : toRemove) {
removeStoredBlock(b, node);
}
}
// Remove all pending DN messages referencing this DN.
pendingDNMessages.removeAllMessagesForDatanode(node);
node.resetBlocks();
invalidateBlocks.remove(node);
}
以上就是笔者今天要阐述的所有内容了,希望对大家有所帮助。
引言
[1].https://issues.apache.org/jira/browse/HDFS-14053: Provide ability for NN to re-replicate based on topology changes