zoukankan      html  css  js  c++  java
  • 【原创】大叔问题定位分享(20)hdfs文件create写入正常,append写入报错

    最近在hdfs写文件的时候发现一个问题,create写入正常,append写入报错,每次都能重现,代码示例如下:

            FileSystem fs = FileSystem.get(conf);
            OutputStream out = fs.create(file);
            IOUtils.copyBytes(in, out, 4096, true); //正常
            out = fs.append(file);
            IOUtils.copyBytes(in, out, 4096, true); //报错

    通过hdfs fsck命令检查出问题的文件,发现只有一个副本,难道是因为这个?

    看FileSystem.append执行过程:

    org.apache.hadoop.fs.FileSystem

        // Abstract hook in the FileSystem base class: concrete file systems (here,
        // DistributedFileSystem) supply the actual append implementation.
        // Decompiled signature — var1 = path, var2 = buffer size, var3 = progress callback.
        public abstract FSDataOutputStream append(Path var1, int var2, Progressable var3) throws IOException;

    实现类在这里:

    org.apache.hadoop.hdfs.DistributedFileSystem

        /**
         * Opens an existing file for append, resolving any symlinks along the path.
         *
         * @param f          path of the file to append to
         * @param bufferSize client-side buffer size for the returned stream
         * @param progress   optional progress callback (may be null)
         * @return an output stream positioned at the end of the file
         * @throws IOException if the NameNode rejects the append (see FSNamesystem)
         */
        public FSDataOutputStream append(Path f, final int bufferSize, final Progressable progress) throws IOException {
            this.statistics.incrementWriteOps(1);
            final Path absF = this.fixRelativePart(f);
            // The resolver retries via next() on another FileSystem when the path
            // crosses a symlink boundary; doCall() handles the ordinary case.
            FileSystemLinkResolver<FSDataOutputStream> resolver = new FileSystemLinkResolver<FSDataOutputStream>() {
                @Override
                public FSDataOutputStream doCall(Path p) throws IOException, UnresolvedLinkException {
                    // Delegate to the DFSClient, which performs the NameNode RPC.
                    return DistributedFileSystem.this.dfs.append(DistributedFileSystem.this.getPathName(p), bufferSize, progress, DistributedFileSystem.this.statistics);
                }

                @Override
                public FSDataOutputStream next(FileSystem fs, Path p) throws IOException {
                    return fs.append(p, bufferSize);
                }
            };
            return resolver.resolve(this, absF);
        }

    这里会调用DFSClient.append方法

    org.apache.hadoop.hdfs.DFSClient

        /**
         * Client-side entry point for append: reopens {@code src} on the NameNode,
         * builds the output stream, and registers a lease for it.
         *
         * @param src        path of the file to append to
         * @param buffersize client buffer size
         * @param progress   optional progress callback
         * @return stream positioned at the end of the existing file
         * @throws IOException if the client is closed or the NameNode refuses
         */
        private DFSOutputStream append(String src, int buffersize, Progressable progress) throws IOException {
            // Fail fast if this DFSClient has already been closed.
            this.checkOpen();
            final DFSOutputStream stream = this.callAppend(src, buffersize, progress);
            // Hold a lease so the NameNode knows this client has the file open for write.
            this.beginFileLease(stream.getFileId(), stream);
            return stream;
        }
    
        /**
         * Performs the NameNode append RPC and wraps the result in an output stream.
         *
         * @param src        path of the file to append to
         * @param buffersize client buffer size
         * @param progress   optional progress callback
         * @return a stream that continues the file's last block (or starts a new one)
         * @throws IOException unwrapped from the NameNode's RemoteException
         */
        private DFSOutputStream callAppend(String src, int buffersize, Progressable progress) throws IOException {
            final LocatedBlock lastBlock;
            try {
                // RPC: asks the NameNode to reopen src for append. Returns the last
                // block if it still has room, or null if a fresh block is needed.
                lastBlock = this.namenode.append(src, this.clientName);
            } catch (RemoteException re) {
                // Translate the wire-level RemoteException back into the specific
                // checked exception the NameNode originally threw.
                throw re.unwrapRemoteException(new Class[]{AccessControlException.class, FileNotFoundException.class, SafeModeException.class, DSQuotaExceededException.class, UnsupportedOperationException.class, UnresolvedPathException.class, SnapshotAccessControlException.class});
            }

            final HdfsFileStatus newStat = this.getFileInfo(src);
            return DFSOutputStream.newStreamForAppend(this, src, buffersize, progress, lastBlock, newStat, this.dfsClientConf.createChecksum());
        }

    DFSClient.append最终会调用NameNodeRpcServer的append方法

    org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer

        /**
         * NameNode RPC handler for append requests.
         *
         * @param src        path of the file to append to
         * @param clientName identity of the requesting client (lease holder)
         * @return the file's last block, or null if the client must allocate a new one
         * @throws IOException if the NameNode is still starting up or the
         *                     namesystem rejects the append
         */
        public LocatedBlock append(String src, String clientName) throws IOException {
            this.checkNNStartup();
            final String clientMachine = getClientMachine();
            if (stateChangeLog.isDebugEnabled()) {
                stateChangeLog.debug("*DIR* NameNode.append: file " + src + " for " + clientName + " at " + clientMachine);
            }

            // Reject immediately if writes are not allowed (e.g. standby NameNode).
            this.namesystem.checkOperation(OperationCategory.WRITE);
            final LocatedBlock located = this.namesystem.appendFile(src, clientName, clientMachine);
            this.metrics.incrFilesAppended();
            return located;
        }

    这里调用到FSNamesystem.append

    org.apache.hadoop.hdfs.server.namenode.FSNamesystem

        LocatedBlock appendFile(String src, String holder, String clientMachine) throws AccessControlException, SafeModeException, 
    ...
                    lb = this.appendFileInt(src, holder, clientMachine, cacheEntry != null);
    
        private LocatedBlock appendFileInt(String srcArg, String holder, String clientMachine, boolean logRetryCache) throws 
    ...
                    lb = this.appendFileInternal(pc, src, holder, clientMachine, logRetryCache);
    
        // Core namesystem validation for append. Runs under the FSNamesystem write
        // lock and rejects the request when: the path is a directory, the file does
        // not exist, the file uses the LAZY_PERSIST storage policy, or the file's
        // last block is complete but under-replicated (the case this post debugs).
        // On success, transitions the file to under-construction for writing.
        private LocatedBlock appendFileInternal(FSPermissionChecker pc, String src, String holder, String clientMachine, boolean logRetryCache) throws AccessControlException, UnresolvedLinkException, FileNotFoundException, IOException {
            assert this.hasWriteLock();
    
            INodesInPath iip = this.dir.getINodesInPath4Write(src);
            INode inode = iip.getLastINode();
            if (inode != null && inode.isDirectory()) {
                // Appending to a directory is never valid.
                throw new FileAlreadyExistsException("Cannot append to directory " + src + "; already exists as a directory.");
            } else {
                if (this.isPermissionEnabled) {
                    this.checkPathAccess(pc, src, FsAction.WRITE);
                }
    
                // From here on, any IOException is logged at WARN and rethrown (see catch).
                try {
                    if (inode == null) {
                        throw new FileNotFoundException("failed to append to non-existent file " + src + " for client " + clientMachine);
                    } else {
                        INodeFile myFile = INodeFile.valueOf(inode, src, true);
                        BlockStoragePolicy lpPolicy = this.blockManager.getStoragePolicy("LAZY_PERSIST");
                        if (lpPolicy != null && lpPolicy.getId() == myFile.getStoragePolicyID()) {
                            // Lazy-persist (in-memory) files cannot be appended to.
                            throw new UnsupportedOperationException("Cannot append to lazy persist file " + src);
                        } else {
                            // Take over / recover any stale lease held on this file.
                            this.recoverLeaseInternal(myFile, src, holder, clientMachine, false);
                            // Re-read the inode: lease recovery may have changed it.
                            myFile = INodeFile.valueOf(this.dir.getINode(src), src, true);
                            BlockInfo lastBlock = myFile.getLastBlock();
                            // THE key check for this post: a complete last block must have
                            // enough live replicas (see isSufficientlyReplicated below),
                            // otherwise append fails with this IOException.
                            if (lastBlock != null && lastBlock.isComplete() && !this.getBlockManager().isSufficientlyReplicated(lastBlock)) {
                                throw new IOException("append: lastBlock=" + lastBlock + " of src=" + src + " is not sufficiently replicated yet.");
                            } else {
                                // All checks passed: mark the file under-construction.
                                return this.prepareFileForWrite(src, iip, holder, clientMachine, true, logRetryCache);
                            }
                        }
                    }
                } catch (IOException var11) {
                    NameNode.stateChangeLog.warn("DIR* NameSystem.append: " + var11.getMessage());
                    throw var11;
                }
            }
        }
    
        /**
         * Checks whether block {@code b} has enough live replicas for append to
         * proceed. The required count is the configured minimum replication,
         * capped by the number of live DataNodes so a small cluster is not held
         * to an unattainable minimum.
         *
         * @param b the block to check (typically the file's last block)
         * @return true if the live replica count meets the (capped) minimum
         */
        public boolean isSufficientlyReplicated(BlockInfo b) {
            final int liveNodes = this.getDatanodeManager().getNumLiveDataNodes();
            final int required = Math.min(this.minReplication, liveNodes);
            return this.countNodes(b).liveReplicas() >= required;
        }

    在append文件的时候,会首先取出这个文件最后一个block,然后会检查这个block是否满足副本要求,如果不满足就抛出异常,如果满足就准备写入;
    看来原因确实是因为文件只有1个副本导致append报错,那为什么新建文件只有1个副本,后来找到原因是因为机架配置有问题导致的,详见 https://www.cnblogs.com/barneywill/p/10114504.html

  • 相关阅读:
    HDU 1501 Zipper(DFS)
    HDU 2181 哈密顿绕行世界问题(DFS)
    HDU 1254 推箱子(BFS)
    HDU 1045 Fire Net (DFS)
    HDU 2212 DFS
    HDU 1241Oil Deposits (DFS)
    HDU 1312 Red and Black (DFS)
    HDU 1010 Tempter of the Bone(DFS+奇偶剪枝)
    HDU 1022 Train Problem I(栈)
    HDU 1008 u Calculate e
  • 原文地址:https://www.cnblogs.com/barneywill/p/10154645.html
Copyright © 2011-2022 走看看