zoukankan      html  css  js  c++  java
  • Hadoop源码分析18:common包中的fs.FileSystem

    1. Hadoop 抽象文件系统  org.apache.hadoop.fs.FileSystem,具体HDFS是这个抽象类的子类

    public abstract class FileSystem extends Configured implements Closeable {

     

     public static final String FS_DEFAULT_NAME_KEY = "fs.default.name";

     

     

     private static final Cache CACHE = new Cache();

     

     

     private Cache.Key key;

     

     

     private static final Map<Class<? extends FileSystem>, Statistics>

       statisticsTable =

         new IdentityHashMap<Class<? extends FileSystem>, Statistics>();

     

     

     protected Statistics statistics;

     

     

     private Set<Path> deleteOnExit = new TreeSet<Path>();

     

     static class Cache {

       private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();

       closeAll()

       closeAll(UserGroupInformation)

       get(URI, Configuration)

       remove(Key, FileSystem)

       size()

     }

     

     private static class ClientFinalizer extends Thread {

          run()

     }

     public static final class Statistics {

       private final String scheme;

       private AtomicLong bytesRead = new AtomicLong();

       private AtomicLong bytesWritten = new AtomicLong();

       private AtomicInteger readOps = new AtomicInteger();

       private AtomicInteger largeReadOps = new AtomicInteger();

       private AtomicInteger writeOps = new AtomicInteger();

    }

     

    addFileSystemForTesting(URI,Configuration, FileSystem)

    clearStatistics()

    closeAll()

    closeAllForUGI(UserGroupInformation)

    create(FileSystem, Path,FsPermission)

    createFileSystem(URI,Configuration)

    fixName(String)

    get(Configuration)

    get(URI,Configuration)

    get(URI,Configuration, String)

    getAllStatistics()

    getDefaultUri(Configuration)

    getLocal(Configuration)

    getNamed(String,Configuration)

    getStatistics()

    getStatistics(String, Class<? extends FileSystem>)

    mkdirs(FileSystem, Path,FsPermission)

    printStatistics()

    setDefaultUri(Configuration,String)

    setDefaultUri(Configuration,URI)

    deleteOnExit : Set<Path>

    key :Key

    statistics :Statistics

    FileSystem()

    append(Path)

    append(Path,int)

    append(Path,int, Progressable)

    checkPath(Path)

    close()

    completeLocalOutput(Path,Path)

    copyFromLocalFile(boolean,boolean, Path, Path)

    copyFromLocalFile(boolean,boolean, Path[], Path)

    copyFromLocalFile(boolean,Path, Path)

    copyFromLocalFile(Path,Path)

    copyToLocalFile(boolean,Path, Path)

    copyToLocalFile(Path,Path)

    create(Path)

    create(Path,boolean)

    create(Path,boolean, int)

    create(Path,boolean, int, Progressable)

    create(Path,boolean, int, short, long)

    create(Path,boolean, int, short, long, Progressable)

    create(Path,FsPermission, boolean, int, short, long, Progressable)

    create(Path,Progressable)

    create(Path,short)

    create(Path,short, Progressable)

    createNewFile(Path)

    createNonRecursive(Path,boolean, int, short, long, Progressable)

    createNonRecursive(Path,FsPermission, boolean, int, short, long, Progressable)

    delete(Path)

    delete(Path,boolean)

    deleteOnExit(Path)

    exists(Path)

    getBlockSize(Path)

    getCanonicalServiceName()

    getCanonicalUri()

    getContentSummary(Path)

    getDefaultBlockSize()

    getDefaultPort()

    getDefaultReplication()

    getDelegationToken(String)

    getFileBlockLocations(FileStatus,long, long)

    getFileChecksum(Path)

    getFileStatus(Path)

    getFileStatus(Path[])

    getHomeDirectory()

    getLength(Path)

    getName()

    getReplication(Path)

    getUri()

    getUsed()

    getWorkingDirectory()

    globPathsLevel(Path[],String[], int, boolean[])

    globStatus(Path)

    globStatus(Path,PathFilter)

    globStatusInternal(Path,PathFilter)

    initialize(URI,Configuration)

    isDirectory(Path)

    isFile(Path)

    listStatus(ArrayList<FileStatus>, Path, PathFilter)

    listStatus(Path)

    listStatus(Path,PathFilter)

    listStatus(Path[])

    listStatus(Path[],PathFilter)

    makeQualified(Path)

    mkdirs(Path)

    mkdirs(Path,FsPermission)

    moveFromLocalFile(Path,Path)

    moveFromLocalFile(Path[],Path)

    moveToLocalFile(Path,Path)

    open(Path)

    open(Path,int)

    processDeleteOnExit()

    rename(Path,Path)

    setOwner(Path,String, String)

    setPermission(Path,FsPermission)

    setReplication(Path,short)

    setTimes(Path,long, long)

    setVerifyChecksum(boolean)

    setWorkingDirectory(Path)

    startLocalOutput(Path,Path)

     

    }

    2. 文件状态类  org.apache.hadoop.fs.FileStatus

    public class FileStatus implements Writable, Comparable {

     

     private Path path;

     private long length;

     private boolean isdir;

     private short block_replication;

     private long blocksize;

     private long modification_time;

     private long access_time;

     private FsPermission permission;

     private String owner;

     private String group;

    }

    文件权限 org.apache.hadoop.fs.FsPermission

    public class FsPermission implements Writable {

     private FsAction useraction = null;

     private FsAction groupaction = null;

     private FsAction otheraction = null;

    }

    public enum FsAction {

     // POSIX style

     NONE("---"),

     EXECUTE("--x"),

     WRITE("-w-"),

     WRITE_EXECUTE("-wx"),

     READ("r--"),

     READ_EXECUTE("r-x"),

     READ_WRITE("rw-"),

     ALL("rwx");

    }

    资源使用概要(相当于 du/df 命令),org.apache.hadoop.fs.ContentSummary

    public class ContentSummary implements Writable {

     private long length;

     private long fileCount;

     private long directoryCount;

     private long quota;

     private long spaceConsumed;

     private long spaceQuota;

    }

     

    3.文件输入输出流

    public abstract class FSInputStream extends InputStream

       implements Seekable, PositionedReadable {

    getPos()

    read(long,byte[], int, int)

    readFully(long,byte[])

    readFully(long,byte[], int, int)

    seek(long)

    seekToNewSource(long)

    }

     

    public class FSDataInputStream extends DataInputStream

       implements Seekable, PositionedReadable, Closeable {

    getPos()

    read(long,byte[], int, int)

    readFully(long,byte[])

    readFully(long,byte[], int, int)

    seek(long)

    seekToNewSource(long)

    }

     

    public class FSDataOutputStream extends DataOutputStream implements Syncable {

     private static class PositionCache extends FilterOutputStream {

       private FileSystem.Statistics statistics;

       long position;

      PositionCache(OutputStream, Statistics, long)

       close()

       getPos()

       write(byte[], int, int)

       write(int)

     }

    close()

    getPos()

    getWrappedStream()

    sync()

    }

     

    4.FileSystem打开文件系统

     publicstaticFileSystemget(URIuri, Configuration conf) throwsIOException{

       String scheme = uri.getScheme();

       String authority = uri.getAuthority();

     

       if(scheme== null){                      // no scheme:use default FS

         returnget(conf);

       }

     

       if(authority== null){                      // noauthority

         URI defaultUri = getDefaultUri(conf);

         if(scheme.equals(defaultUri.getScheme())   // if schemematches default

             && defaultUri.getAuthority() != null){  // &default has authority

           returnget(defaultUri,conf);             // returndefault

         }

       }

       

       String disableCacheName = String.format("fs.%s.impl.disable.cache",scheme);

       if(conf.getBoolean(disableCacheName,false)){

         returncreateFileSystem(uri,conf);

       }

     

       returnCACHE.get(uri,conf);

     }

     privatestaticFileSystemcreateFileSystem(URIuri, Configuration conf

         ) throwsIOException{

       Class? clazz =conf.getClass("fs."+uri.getScheme() + ".impl",null);

       LOG.debug("Creatingfilesystem for " + uri);

       if(clazz== null){

         thrownewIOException("NoFileSystem for scheme: " +uri.getScheme());

       }

       FileSystem fs =(FileSystem)ReflectionUtils.newInstance(clazz,conf);

       fs.initialize(uri, conf);

       returnfs;

     }

    static class Cache {

       private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();

     

       FileSystem get(URI uri,Configuration conf) throwsIOException{

         Key key = newKey(uri,conf);

         FileSystem fs = null;

         synchronized(this){

           fs = map.get(key);

         }

         if(fs !=null){

           returnfs;

         }

         

         fs = createFileSystem(uri, conf);

         synchronized(this){  // refetchthe lock again

           FileSystem oldfs = map.get(key);

           if(oldfs!= null){ // a filesystem is created while lock is releasing

             fs.close(); // close thenew file system

             returnoldfs; // returnthe old file system

           }

     

           // now insertthe new file system into the map

           if(map.isEmpty()&& !clientFinalizer.isAlive()){

             Runtime.getRuntime().addShutdownHook(clientFinalizer);

           }

           fs.key=key;

           map.put(key,fs);

           returnfs;

         }

       }

     

       static class Key {

         final String scheme;

         final String authority;

         final UserGroupInformation ugi;

      }

     

    5.本地文件系统 org.apache.hadoop.fs.RawLocalFileSystem

    public class RawLocalFileSystem extends FileSystem {

     static final URI NAME = URI.create("file:///");

     private Path workingDir;

      class LocalFSFileInputStream extends FSInputStream {

       FileInputStream fis;

       private long position;

       LocalFSFileInputStream(Path)

       available()

       close()

       getPos()

       markSupport()

       read()

       read(byte[], int, int)

       read(long, byte[], int, int)

       seek(long)

       seekToNewSource(long)

       skip(long)

       }

      class LocalFSFileOutputStream extends OutputStream implements Syncable {

        FileOutputStream fos;

        close()

        flush()

        sync()

        write(byte[], int, int)

        write(int)

      }

       static class RawLocalFileStatus extends FileStatus {

        getGroup()

        getOwner()

        getPermission()

        isPermissionLoaded()

        loadPermissionInfo()

        write(DataOutput)

       }

     class TrackingFileInputStream extends FileInputStream {

        TrackingFileInputStream(File)

        read()

        read(byte[])

        read(byte[],int, int)

     }

    append(Path,int, Progressable)

    close()

    completeLocalOutput(Path,Path)

    create(Path,boolean, boolean, int, short, long, Progressable)

    create(Path,boolean, int, short, long, Progressable)

    create(Path,FsPermission, boolean, int, short, long, Progressable)

    createNonRecursive(Path,FsPermission, boolean, int, short, long, Progressable)

    delete(Path)

    delete(Path,boolean)

    getFileStatus(Path)

    getHomeDirectory()

    getUri()

    getWorkingDirectory()

    initialize(URI,Configuration)

    listStatus(Path)

    makeAbsolute(Path)

    mkdirs(Path)

    mkdirs(Path,FsPermission)

    moveFromLocalFile(Path,Path)

    open(Path,int)

    pathToFile(Path)

    rename(Path,Path)

    setOwner(Path,String, String)

    setPermission(Path,FsPermission)

    setWorkingDirectory(Path)

    startLocalOutput(Path,Path)

    toString()

    }

    6.带校验和的文件系统 org.apache.hadoop.fs.ChecksumFileSystem

    public abstract class ChecksumFileSystem extends FilterFileSystem {

     private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};

     private int bytesPerChecksum = 512;

     private boolean verifyChecksum = true;

     private static class ChecksumFSInputChecker extends FSInputChecker {

       public static final Log LOG

         = LogFactory.getLog(FSInputChecker.class);

       

       private ChecksumFileSystem fs;

       private FSDataInputStream datas;

       private FSDataInputStream sums;

       

       private static final int HEADER_LENGTH = 8;

       

       private int bytesPerSum = 1;

       private long fileLen = -1L;

       available()

       close()

       getChecksumFilePos(long)

       getChunkPosition(long)

       getFileLength()

       read(long, byte[], int, int)

       readChunk(long, byte[], int, int, byte[])

       seek(long)

       seekToNewSource(long)

       skip(long)

     }

     private static class ChecksumFSOutputSummer extends FSOutputSummer {

       private FSDataOutputStream datas;

       private FSDataOutputStream sums;

       private static final float CHKSUM_AS_FRACTION = 0.01f;

      close()

      writeChunk(byte[], int, int, byte[])

    }

    getApproxChkSumLength(long)

    getChecksumLength(long,int)

    isChecksumFile(Path)

    append(Path,int, Progressable)

    completeLocalOutput(Path,Path)

    copyFromLocalFile(boolean,Path, Path)

    copyToLocalFile(boolean,Path, Path)

    copyToLocalFile(Path,Path, boolean)

    create(Path,FsPermission, boolean, boolean, int, short, long,Progressable)

    create(Path,FsPermission, boolean, int, short, long, Progressable)

    createNonRecursive(Path,FsPermission, boolean, int, short, long, Progressable)

    delete(Path,boolean)

    getBytesPerSum()

    getChecksumFile(Path)

    getChecksumFileLength(Path,long)

    getRawFileSystem()

    getSumBufferSize(int,int)

    listStatus(Path)

    mkdirs(Path)

    open(Path,int)

    rename(Path,Path)

    reportChecksumFailure(Path,FSDataInputStream, long, FSDataInputStream, long)

    setConf(Configuration)

    setReplication(Path,short)

    setVerifyChecksum(boolean)

    startLocalOutput(Path,Path)

    }

    其中

    abstract public class FSInputChecker extends FSInputStream {

     public static final Log LOG

      = LogFactory.getLog(FSInputChecker.class);

     

     

     protected Path file;

     private Checksum sum;

     private boolean verifyChecksum = true;

     private byte[] buf;

     private byte[] checksum;

     private int pos;

     private int count;

     

     private int numOfRetries;

     

     // cached file position

     private long chunkPos = 0;

    available()

    fill()

    getChecksum()

    getChunkPosition(long)

    getPos()

    mark(int)

    markSupported()

    needChecksum()

    read()

    read(byte[],int, int)

    read1(byte[],int, int)

    readChecksumChunk(byte[],int, int)

    readChunk(long,byte[], int, int, byte[])

    reset()

    resetState()

    seek(long)

    set(boolean,Checksum, int, int)

    skip(long)

    verifySum(long)

    }

     

    abstract public class FSOutputSummer extends OutputStream {

     // data checksum

     private Checksum sum;

     // internal buffer for storing data before it is checksumed

     private byte buf[];

     // internal buffer for storing checksum

     private byte checksum[];

     // The number of valid bytes in the buffer.

     private int count;

    convertToByteStream(Checksum,int)

    int2byte(int,byte[])

    flushBuffer()

    flushBuffer(boolean)

    resetChecksumChunk(int)

    write(byte[],int, int)

    write(int)

    write1(byte[],int, int)

    writeChecksumChunk(byte[],int, int, boolean)

    writeChunk(byte[],int, int, byte[])

    }

     

    7.用于测试的内存文件系统 InMemoryFileSystem

    public class InMemoryFileSystem extends ChecksumFileSystem {

     

     private static class RawInMemoryFileSystem extends FileSystem {

       private URI uri;

       private long fsSize;

       private volatile long totalUsed;

       private Path staticWorkingDir;

       private Map<String, FileAttributes> pathToFileAttribs =

         new HashMap<String, FileAttributes>();

      

       private Map<String, FileAttributes> tempFileAttribs =

         new HashMap<String, FileAttributes>();

       private static class FileAttributes {

          private byte[] data;

          private int size;

        }

         private class InMemoryFileStatus extends FileStatus {

           }

     

        private class InMemoryInputStream extends FSInputStream {

           private DataInputBuffer din = new DataInputBuffer();

           private FileAttributes fAttr;

         }

          private class InMemoryOutputStream extends OutputStream {

            private int count;

            private FileAttributes fAttr;

            private Path f;

          }    

         append(Path, int, Progressable)

         canFitInMemory(long)

         close()

         create(Path, FileAttributes)

         create(Path, FsPermission, boolean, int, short, long,Progressable)

         delete(Path)

         delete(Path, boolean)

         getFiles(PathFilter)

         getFileStatus(Path)

         getFSSize()

         getNumFiles(PathFilter)

         getPath(Path)

         getPercentUsed()

         getUri()

         getWorkingDirectory()

         initialize(URI, Configuration)

         listStatus(Path)

         mkdirs(Path, FsPermission)

         open(Path, int)

         rename(Path, Path)

         reserveSpace(Path, long)

         setReplication(Path, short)

         setWorkingDirectory(Path)

         unreserveSpace(Path)

     }

      

     getFiles(PathFilter)

     getFSSize()

     getNumFiles(PathFilter)

     getPercentUsed()

     reserveSpaceWithCheckSum(Path, long)

    }

  • 相关阅读:
    #Responsive# 响应式图片//www.w3cplus.com/blog/tags/509.html 整个系列完结。
    用js实现帧动画
    js判断对象是否存在
    canvas
    函数的四种调用模式
    递归真的好难啊!!! 看完之后好多了
    词法作用域
    变量名提升
    ajax调用后台Java
    左图又文字自适应
  • 原文地址:https://www.cnblogs.com/leeeee/p/7276514.html
Copyright © 2011-2022 走看看