CorruptReplicasMap用于存储文件系统中所有损坏数据块的信息。仅当它的所有副本损坏时一个数据块才被认定为损坏。当汇报数据块的副本时,我们隐藏所有损坏副本。一旦一个数据块被发现完好副本达到预期,它将从CorruptReplicasMap中被移除。
我们先看下CorruptReplicasMap都有哪些成员变量,如下所示:
- // 存储损坏数据块Block与它对应每个数据节点与损坏原因集合映射关系的集合
- private final SortedMap<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap =
- new TreeMap<Block, Map<DatanodeDescriptor, Reason>>();
是的,你没看错,就这一个corruptReplicasMap集合,它是一个用来存储损坏数据块Block实例与它对应每个数据节点和损坏原因集合的映射关系的集合。
我们看下CorruptReplicasMap都提供了哪些有用的方法。
一、addToCorruptReplicasMap()
标记属于指定数据节点的数据块为损坏
- /**
- * Mark the block belonging to datanode as corrupt.
- * 标记属于指定数据节点的数据块为损坏
- *
- * @param blk Block to be added to CorruptReplicasMap
- * @param dn DatanodeDescriptor which holds the corrupt replica
- * @param reason a textual reason (for logging purposes)
- * @param reasonCode the enum representation of the reason
- */
- void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
- String reason, Reason reasonCode) {
- / 先从corruptReplicasMap集合中查找是否存在对应数据块blk
- Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
- // 如果不存在,构造一个HashMap<DatanodeDescriptor, Reason>集合nodes,将blk与nodes存入corruptReplicasMap
- if (nodes == null) {
- nodes = new HashMap<DatanodeDescriptor, Reason>();
- corruptReplicasMap.put(blk, nodes);
- }
- String reasonText;
- if (reason != null) {
- reasonText = " because " + reason;
- } else {
- reasonText = "";
- }
- // 判断nodes中是否存在对应数据节点dn,分别记录日志信息
- if (!nodes.keySet().contains(dn)) {
- NameNode.blockStateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+
- blk.getBlockName() +
- " added as corrupt on " + dn +
- " by " + Server.getRemoteIp() +
- reasonText);
- } else {
- NameNode.blockStateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+
- "duplicate requested for " +
- blk.getBlockName() + " to add as corrupt " +
- "on " + dn +
- " by " + Server.getRemoteIp() +
- reasonText);
- }
- // Add the node or update the reason.
- // 将数据节点dn、损坏原因编码reasonCode加入或更新入nodes
- nodes.put(dn, reasonCode);
- }
处理逻辑很简单,大体如下:
1、先从corruptReplicasMap集合中查找是否存在对应数据块blk;
2、如果不存在,构造一个HashMap<DatanodeDescriptor, Reason>集合nodes,将blk与nodes存入corruptReplicasMap;
3、判断nodes中是否存在对应数据节点dn,分别记录日志信息;
4、将数据节点dn、损坏原因编码reasonCode加入或更新入nodes。
二、removeFromCorruptReplicasMap()
将指定数据块、数据节点,根据指定原因从集合corruptReplicasMap移除
- // 将指定数据块、数据节点,根据指定原因从集合corruptReplicasMap移除
- boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode,
- Reason reason) {
- / 先从corruptReplicasMap集合中查找是否存在对应数据块blk,获得datanodes
- Map <DatanodeDescriptor, Reason> datanodes = corruptReplicasMap.get(blk);
- // 如果不存在,直接返回false,表明移除失败
- if (datanodes==null)
- return false;
- // if reasons can be compared but don't match, return false.
- // 取出数据节点datanode对应的存储损坏原因storedReason
- Reason storedReason = datanodes.get(datanode);
- // 判断存储损坏原因storedReason与参数损坏原因reason是否一致,不一致直接返回false,表明移除失败,
- // 判断的依据为参数损坏原因reason不是ANY且存储损坏原因storedReason不为空的情况下,两者不一致
- if (reason != Reason.ANY && storedReason != null &&
- reason != storedReason) {
- return false;
- }
- // 将datanode对应数据从datanodes中移除
- if (datanodes.remove(datanode) != null) { // remove the replicas
- // 移除datanode后,如果datanodes为空
- if (datanodes.isEmpty()) {
- // remove the block if there is no more corrupted replicas
- // 将数据块blk从集合corruptReplicasMap中移除
- corruptReplicasMap.remove(blk);
- }
- // 返回true,表明移除成功
- return true;
- }
- // 其他情况下直接返回false,表明移除失败
- return false;
- }
三、getCorruptReplicaBlockIds()
获取指定大小和起始数据块ID的损坏数据块ID数组
- /**
- * Return a range of corrupt replica block ids. Up to numExpectedBlocks
- * blocks starting at the next block after startingBlockId are returned
- * (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId
- * is null, up to numExpectedBlocks blocks are returned from the beginning.
- * If startingBlockId cannot be found, null is returned.
- * 获取指定大小和起始数据块ID的损坏数据块ID数组
- *
- * @param numExpectedBlocks Number of block ids to return.
- * 0 <= numExpectedBlocks <= 100
- * @param startingBlockId Block id from which to start. If null, start at
- * beginning.
- * @return Up to numExpectedBlocks blocks from startingBlockId if it exists
- *
- */
- long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
- Long startingBlockId) {
- / 校验numExpectedBlocks,需要获取的数据块ID数组最多有100个元素
- if (numExpectedBlocks < 0 || numExpectedBlocks > 100) {
- return null;
- }
- // 获得corruptReplicasMap集合的数据块迭代器blockIt
- Iterator<Block> blockIt = corruptReplicasMap.keySet().iterator();
- // if the starting block id was specified, iterate over keys until
- // we find the matching block. If we find a matching block, break
- // to leave the iterator on the next block after the specified block.
- // 如果设定了起始数据块艾迪startingBlockId
- if (startingBlockId != null) {
- boolean isBlockFound = false;
- // 遍历corruptReplicasMap,查看是否存在startingBlockId,如果存在,跳出循环,此时已记录住迭代器的位置了
- while (blockIt.hasNext()) {
- Block b = blockIt.next();
- if (b.getBlockId() == startingBlockId) {
- isBlockFound = true;
- break;
- }
- }
- // 如果不存在,直接返回null
- if (!isBlockFound) {
- return null;
- }
- }
- // 构造一个存储数据块ID的列表corruptReplicaBlockIds
- ArrayList<Long> corruptReplicaBlockIds = new ArrayList<Long>();
- // append up to numExpectedBlocks blockIds to our list
- // 遍历corruptReplicasMap,将最多numExpectedBlocks个数据块ID添加到列表corruptReplicaBlockIds,
- // 此时的迭代器可能不是从头开始取数据的,在startingBlockId需要并存在的情况下,它是从下一个元素开始获取的
- for(int i=0; i<numExpectedBlocks && blockIt.hasNext(); i++) {
- corruptReplicaBlockIds.add(blockIt.next().getBlockId());
- }
- // 将数据块ID列表corruptReplicaBlockIds转换成数组ret
- long[] ret = new long[corruptReplicaBlockIds.size()];
- for(int i=0; i<ret.length; i++) {
- ret[i] = corruptReplicaBlockIds.get(i);
- }
- // 返回数据块ID数组ret
- return ret;
- }
四、getNodes()
根据损坏数据块获取对应数据节点集合
- /**
- * Get Nodes which have corrupt replicas of Block
- * 根据损坏数据块获取对应数据节点集合
- *
- * @param blk Block for which nodes are requested
- * @return collection of nodes. Null if does not exists
- */
- Collection<DatanodeDescriptor> getNodes(Block blk) {
- Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
- if (nodes == null)
- return null;
- return nodes.keySet();
- }
五、isReplicaCorrupt()
检测指定数据块和数据节点是否为损坏的
- /**
- * Check if replica belonging to Datanode is corrupt
- * 检测指定数据块和数据节点是否为损坏的
- *
- * @param blk Block to check
- * @param node DatanodeDescriptor which holds the replica
- * @return true if replica is corrupt, false if does not exists in this map
- */
- boolean isReplicaCorrupt(Block blk, DatanodeDescriptor node) {
- Collection<DatanodeDescriptor> nodes = getNodes(blk);
- return ((nodes != null) && (nodes.contains(node)));
- }
六、numCorruptReplicas()
获取给定数据块对应数据节点数量
- // 获取给定数据块对应数据节点数量
- int numCorruptReplicas(Block blk) {
- Collection<DatanodeDescriptor> nodes = getNodes(blk);
- return (nodes == null) ? 0 : nodes.size();
- }
七、size()
获取损坏数据块数量
- // 获取损坏数据块数量
- int size() {
- return corruptReplicasMap.size();
- }