  • Hadoop Source Code Analysis: The FileSystem Abstract File System

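    Hadoop provides an abstract file system model, FileSystem, and HDFS is one implementation of it.

    FileSystem is the abstract parent class of all file systems in Hadoop. It defines the basic characteristics and basic operations that a file system has.

    The FileSystem class is in the org.apache.hadoop.fs package. Searching for it in Eclipse with Ctrl+Shift+T prompts you to attach the source package hadoop-hdfs-client-3.0.0-sources.jar; attach it and the source becomes browsable.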

    I. Member variables

      1. The configuration key for the default file system used by Hadoop, defined in core-default.xml

    public static final String FS_DEFAULT_NAME_KEY = CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;  // the value is "fs.defaultFS"

      2. The file system cache

    /** FileSystem cache. */
      static final Cache CACHE = new Cache();

      3. The key under which this file system instance is stored in the Cache

      /** The key this instance is stored under in the cache. */
      private Cache.Key key;

      4. Statistics recorded per FileSystem class

      /** Recording statistics per a FileSystem class. */
      private static final Map<Class<? extends FileSystem>, Statistics>
          statisticsTable = new IdentityHashMap<>();

      5. The statistics for this file system

      /**
       * The statistics for this file system.
       */
      protected Statistics statistics;

      6. The set of files that must be deleted when the file system is closed or the JVM exits

      /**
       * A cache of files that should be deleted when the FileSystem is closed
       * or the JVM is exited.
       */
      private final Set<Path> deleteOnExit = new TreeSet<>();

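    II. Inner classes

    Inner class Cache: caches FileSystem objects

      1. Hadoop stores FileSystem objects as key-value pairs in a HashMap. The key type, Key, is a static nested class of Cache.

        private final Map<Key, FileSystem> map = new HashMap<>();

      2. Build a key from the file system URI and the configuration, then look up the file system instance for that key. If no instance exists yet, create and initialize one.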

        FileSystem get(URI uri, Configuration conf) throws IOException{
          Key key = new Key(uri, conf);
          return getInternal(uri, conf, key);
        }
    
        private FileSystem getInternal(URI uri, Configuration conf, Key key)
            throws IOException{
          FileSystem fs;
          synchronized (this) {
            fs = map.get(key);
          }
          if (fs != null) {
            return fs;
          }
    
          fs = createFileSystem(uri, conf);
          synchronized (this) { // refetch the lock again
            FileSystem oldfs = map.get(key);
            if (oldfs != null) { // a file system is created while lock is releasing
              fs.close(); // close the new file system
              return oldfs;  // return the old file system
            }
    
            // now insert the new file system into the map
            if (map.isEmpty()
                    && !ShutdownHookManager.get().isShutdownInProgress()) {
              ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
            }
            fs.key = key;
            map.put(key, fs);
            if (conf.getBoolean(
                FS_AUTOMATIC_CLOSE_KEY, FS_AUTOMATIC_CLOSE_DEFAULT)) {
              toAutoClose.add(key);
            }
            return fs;
          }
        }
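    The locking pattern in getInternal (look up under the lock, create outside the lock, then re-check under the lock and discard the duplicate) can be sketched without any Hadoop dependency. The class and method names below (InstanceCacheSketch, InstanceCacheDemo, Resource) are hypothetical and exist only for this illustration; the shutdown hook and auto-close bookkeeping are left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for FileSystem.Cache; this is not Hadoop code.
class InstanceCacheSketch<K, V extends AutoCloseable> {
    private final Map<K, V> map = new HashMap<>();

    V get(K key, Function<K, V> factory) throws Exception {
        V value;
        synchronized (this) {            // first lookup under the lock
            value = map.get(key);
        }
        if (value != null) {
            return value;
        }
        value = factory.apply(key);      // possibly slow, so done without the lock
        synchronized (this) {            // take the lock again and re-check
            V existing = map.get(key);
            if (existing != null) {      // another thread inserted one meanwhile
                value.close();           // discard the duplicate we just built
                return existing;
            }
            map.put(key, value);
            return value;
        }
    }

    synchronized int size() {
        return map.size();
    }
}

class InstanceCacheDemo {
    // Hypothetical resource type used only by this demo.
    static final class Resource implements AutoCloseable {
        final String name;
        Resource(String name) { this.name = name; }
        @Override public void close() { }
    }

    // Two lookups with the same key should yield the same cached instance.
    static boolean sameInstanceReturnedTwice() {
        try {
            InstanceCacheSketch<String, Resource> cache = new InstanceCacheSketch<>();
            Resource a = cache.get("hdfs://nn1", Resource::new);
            Resource b = cache.get("hdfs://nn1", Resource::new);
            return a == b && cache.size() == 1;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(sameInstanceReturnedTwice());
    }
}
```

    Creating the instance outside the lock keeps a slow initialization from blocking other callers, at the cost of occasionally building an instance that is closed right away.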

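      3. Remove the file system instance stored under the given key. The entry is dropped only if the cached instance is the one passed in; otherwise the cached instance is put back.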

        synchronized void remove(Key key, FileSystem fs) {
          FileSystem cachedFs = map.remove(key);
          if (fs == cachedFs) {
            toAutoClose.remove(key);
          } else if (cachedFs != null) {
            map.put(key, cachedFs);
          }
        }

      4. Remove all file system objects from the cache and close them. When onlyAutomatic is true, only the ones marked for automatic closing are closed.

        /**
         * Close all FileSystem instances in the Cache.
         * @param onlyAutomatic only close those that are marked for automatic closing
         * @throws IOException a problem arose closing one or more FileSystem.
         */
        synchronized void closeAll(boolean onlyAutomatic) throws IOException {
          List<IOException> exceptions = new ArrayList<>();
    
          // Make a copy of the keys in the map since we'll be modifying
          // the map while iterating over it, which isn't safe.
          List<Key> keys = new ArrayList<>();
          keys.addAll(map.keySet());
    
          for (Key key : keys) {
            final FileSystem fs = map.get(key);
    
            if (onlyAutomatic && !toAutoClose.contains(key)) {
              continue;
            }
    
            //remove from cache
            map.remove(key);
            toAutoClose.remove(key);
    
            if (fs != null) {
              try {
                fs.close();
              }
              catch(IOException ioe) {
                exceptions.add(ioe);
              }
            }
          }
    
          if (!exceptions.isEmpty()) {
            throw MultipleIOException.createIOException(exceptions);
          }
        }
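    Two details of closeAll are worth isolating: it iterates over a copy of the key set so that removing entries is safe, and it collects the IOExceptions instead of stopping at the first failure. A minimal stdlib-only sketch follows, with String keys and java.io.Closeable values standing in for Key and FileSystem; CloseAllSketch is a hypothetical name, and the sketch returns the list of failures rather than wrapping them as MultipleIOException does.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CloseAllSketch {
    /** Removes and closes every entry; returns all failures instead of stopping early. */
    static List<IOException> closeAll(Map<String, Closeable> map) {
        List<IOException> failures = new ArrayList<>();
        // Iterate over a snapshot of the keys so that map.remove() is safe.
        List<String> keys = new ArrayList<>(map.keySet());
        for (String key : keys) {
            Closeable c = map.remove(key);
            if (c == null) {
                continue;
            }
            try {
                c.close();
            } catch (IOException e) {
                failures.add(e);   // remember the failure and keep going
            }
        }
        return failures;
    }

    // One entry closes cleanly and one fails; the map must end up empty.
    static int demoFailureCount() {
        Map<String, Closeable> map = new HashMap<>();
        map.put("ok", () -> { });
        map.put("bad", () -> { throw new IOException("close failed"); });
        List<IOException> failures = closeAll(map);
        return map.isEmpty() ? failures.size() : -1;
    }

    public static void main(String[] args) {
        System.out.println(demoFailureCount());
    }
}
```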

      5. Close the file systems that belong to the given user (UserGroupInformation)

        synchronized void closeAll(UserGroupInformation ugi) throws IOException {
          List<FileSystem> targetFSList = new ArrayList<>(map.entrySet().size());
          //Make a pass over the list and collect the FileSystems to close
          //we cannot close inline since close() removes the entry from the Map
          for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
            final Key key = entry.getKey();
            final FileSystem fs = entry.getValue();
            if (ugi.equals(key.ugi) && fs != null) {
              targetFSList.add(fs);
            }
          }
          List<IOException> exceptions = new ArrayList<>();
          //now make a pass over the target list and close each
          for (FileSystem fs : targetFSList) {
            try {
              fs.close();
            }
            catch(IOException ioe) {
              exceptions.add(ioe);
            }
          }
          if (!exceptions.isEmpty()) {
            throw MultipleIOException.createIOException(exceptions);
          }
        }

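    Inner class Key:

      1. Member variables

          final String scheme;    // URI scheme
          final String authority;    // URI authority
          final UserGroupInformation ugi;    // user and group information
          final long unique;    // extra value that makes otherwise identical keys distinct

      2. There are two constructors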

          Key(URI uri, Configuration conf) throws IOException {
            this(uri, conf, 0);
          }
    
          Key(URI uri, Configuration conf, long unique) throws IOException {
            scheme = uri.getScheme()==null ?
                "" : StringUtils.toLowerCase(uri.getScheme());
            authority = uri.getAuthority()==null ?
                "" : StringUtils.toLowerCase(uri.getAuthority());
            this.unique = unique;
    
            this.ugi = UserGroupInformation.getCurrentUser();
          }
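    Because the constructor lower-cases the scheme and the authority and ignores the path, URIs that differ only in case or in path map to the same key. The sketch below shows that effect with a simplified, hypothetical key class (UriKeySketch) that omits ugi and unique; its equals and hashCode are written for this illustration and are not copied from Hadoop.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

// Hypothetical simplified key: scheme and authority only (no ugi, no unique).
final class UriKeySketch {
    final String scheme;
    final String authority;

    UriKeySketch(URI uri) {
        scheme = uri.getScheme() == null
            ? "" : uri.getScheme().toLowerCase(Locale.ENGLISH);
        authority = uri.getAuthority() == null
            ? "" : uri.getAuthority().toLowerCase(Locale.ENGLISH);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof UriKeySketch)) {
            return false;
        }
        UriKeySketch k = (UriKeySketch) o;
        return scheme.equals(k.scheme) && authority.equals(k.authority);
    }

    @Override
    public int hashCode() {
        return Objects.hash(scheme, authority);
    }

    // Looks up with a URI that differs in case and path but not in scheme/authority.
    static boolean demo() {
        Map<UriKeySketch, String> cache = new HashMap<>();
        cache.put(new UriKeySketch(URI.create("hdfs://NameNode:8020/user/a")), "fs-1");
        UriKeySketch lookup = new UriKeySketch(URI.create("HDFS://namenode:8020/tmp"));
        return "fs-1".equals(cache.get(lookup));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```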

    Inner class Statistics: statistics for a file system

      1. Member variables

          private final String scheme;    // scheme of the file system URI
          private volatile long bytesRead;     // number of bytes read
          private volatile long bytesWritten;    // number of bytes written
          private volatile int readOps;     // number of read operations
          private volatile int largeReadOps;   // number of large read operations
          private volatile int writeOps;    // number of write operations

    III. Member methods

    The abstract methods are as follows:

      1. Get the URI that uniquely identifies the file system

      /**
       * Returns a URI which identifies this FileSystem.
       *
       * @return the URI of this filesystem.
       */
      public abstract URI getUri();

      2. Open an FSDataInputStream on the file at the given Path

      /**
       * Opens an FSDataInputStream at the indicated Path.
       * @param f the file name to open
       * @param bufferSize the size of the buffer to be used.
       * @throws IOException IO failure
       */
      public abstract FSDataInputStream open(Path f, int bufferSize)
        throws IOException;

      3. Create an FSDataOutputStream at the given path with write-progress reporting

      /**
       * Create an FSDataOutputStream at the indicated Path with write-progress
       * reporting.
       * @param f the file name to open
       * @param permission file permission
       * @param overwrite if a file with this name already exists, then if true,
       *   the file will be overwritten, and if false an error will be thrown.
       * @param bufferSize the size of the buffer to be used.
       * @param replication required block replication for the file.
       * @param blockSize block size
       * @param progress the progress reporter
       * @throws IOException IO failure
       * @see #setPermission(Path, FsPermission)
       */
      public abstract FSDataOutputStream create(Path f,
          FsPermission permission,
          boolean overwrite,
          int bufferSize,
          short replication,
          long blockSize,
          Progressable progress) throws IOException;

      4. Append to an existing file

      /**
       * Append to an existing file (optional operation).
       * @param f the existing file to be appended.
       * @param bufferSize the size of the buffer to be used.
       * @param progress for reporting progress if it is not null.
       * @throws IOException IO failure
       * @throws UnsupportedOperationException if the operation is unsupported
       *         (default).
       */
      public abstract FSDataOutputStream append(Path f, int bufferSize,
          Progressable progress) throws IOException;

      5. Rename the path src to the path dst

      /**
       * Renames Path src to Path dst.
       * @param src path to be renamed
       * @param dst new path after rename
       * @throws IOException on failure
       * @return true if rename is successful
       */
      public abstract boolean rename(Path src, Path dst) throws IOException;

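      6. Delete a file or directory. If the path is a directory and recursive is true, the directory is deleted recursively; otherwise an exception is thrown. For a file, recursive may be either true or false.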

      /** Delete a file.
       *
       * @param f the path to delete.
       * @param recursive if path is a directory and set to
       * true, the directory is deleted else throws an exception. In
       * case of a file the recursive can be set to either true or false.
       * @return  true if delete is successful else false.
       * @throws IOException IO failure
       */
      public abstract boolean delete(Path f, boolean recursive) throws IOException;

      7. List the statuses of the files and subdirectories in a directory

      /**
       * List the statuses of the files/directories in the given path if the path is
       * a directory.
       * <p>
       * Does not guarantee to return the List of files/directories status in a
       * sorted order.
       * <p>
       * Will not return null. Expect IOException upon access error.
       * @param f given path
       * @return the statuses of the files/directories in the given patch
       * @throws FileNotFoundException when the path does not exist
       * @throws IOException see specific implementation
       */
      public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException,IOException;

      8. Set the current working directory of the file system

      /**
       * Set the current working directory for the given FileSystem. All relative
       * paths will be resolved relative to it.
       *
       * @param new_dir Path of new working directory
       */
      public abstract void setWorkingDirectory(Path new_dir);

      9. Get the current working directory of the file system

      /**
       * Get the current working directory for the given FileSystem
       * @return the directory pathname
       */
      public abstract Path getWorkingDirectory();

      10. Create a directory with the given permission

      /**
       * Make the given file and all non-existent parents into
       * directories. Has roughly the semantics of Unix @{code mkdir -p}.
       * Existence of the directory hierarchy is not an error.
       * @param f path to create
       * @param permission to apply to f
       * @throws IOException IO failure
       */
      public abstract boolean mkdirs(Path f, FsPermission permission
          ) throws IOException;

      11. Get the status of the given file or directory

      /**
       * Return a file status object that represents the path.
       * @param f The path we want information from
       * @return a FileStatus object
       * @throws FileNotFoundException when the path does not exist
       * @throws IOException see specific implementation
       */
      public abstract FileStatus getFileStatus(Path f) throws IOException;

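    Those are the abstract methods of FileSystem.

    The following are some of the overloads that FileSystem builds on top of these abstract methods.

      1. Overload of open. IO_FILE_BUFFER_SIZE_KEY is "io.file.buffer.size" and IO_FILE_BUFFER_SIZE_DEFAULT is 4096, that is, 4 KB.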

      /**
       * Opens an FSDataInputStream at the indicated Path.
       * @param f the file to open
       * @throws IOException IO failure
       */
      public FSDataInputStream open(Path f) throws IOException {
        return open(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
            IO_FILE_BUFFER_SIZE_DEFAULT));
      }
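    The shape of this overload (one abstract method with the full parameter list, plus a convenience method that resolves a configured value and delegates) can be tried out in isolation. In the sketch below java.util.Properties is an assumed stand-in for Hadoop's Configuration, the class name OpenOverloadSketch is hypothetical, and open returns a String instead of a stream so that the resolved buffer size is visible.

```java
import java.util.Properties;

abstract class OpenOverloadSketch {
    static final String IO_FILE_BUFFER_SIZE_KEY = "io.file.buffer.size";
    static final int IO_FILE_BUFFER_SIZE_DEFAULT = 4096;

    // Hypothetical stand-in for Hadoop's Configuration object.
    private final Properties conf;

    OpenOverloadSketch(Properties conf) {
        this.conf = conf;
    }

    /** The one method a concrete implementation has to supply. */
    abstract String open(String path, int bufferSize);

    /** Convenience overload: resolve the buffer size, then delegate. */
    String open(String path) {
        String raw = conf.getProperty(IO_FILE_BUFFER_SIZE_KEY);
        int size = raw == null
            ? IO_FILE_BUFFER_SIZE_DEFAULT
            : Integer.parseInt(raw.trim());
        return open(path, size);
    }

    // Builds a trivial implementation and reports which buffer size it received.
    static String demo(Properties conf) {
        OpenOverloadSketch fs = new OpenOverloadSketch(conf) {
            @Override
            String open(String path, int bufferSize) {
                return path + "@" + bufferSize;
            }
        };
        return fs.open("/data/a.txt");
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        System.out.println(demo(conf));                 // falls back to the default
        conf.setProperty(IO_FILE_BUFFER_SIZE_KEY, "8192");
        System.out.println(demo(conf));                 // uses the configured value
    }
}
```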

      2. Overloads of create

        2.1 By default, an existing file is overwritten.

      /**
       * Create an FSDataOutputStream at the indicated Path.
       * Files are overwritten by default.
       * @param f the file to create
       * @throws IOException IO failure
       */
      public FSDataOutputStream create(Path f) throws IOException {
        return create(f, true);
      }

          2.1.1 Create an FSDataOutputStream using the configured buffer size and the default replication factor and block size for the path

      /**
       * Create an FSDataOutputStream at the indicated Path.
       * @param f the file to create
       * @param overwrite if a file with this name already exists, then if true,
       *   the file will be overwritten, and if false an exception will be thrown.
       * @throws IOException IO failure
       */
      public FSDataOutputStream create(Path f, boolean overwrite)
          throws IOException {
        return create(f, overwrite,
                      getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
                          IO_FILE_BUFFER_SIZE_DEFAULT),
                      getDefaultReplication(f),
                      getDefaultBlockSize(f));
      }

      The default block replication factor is 1

      /**
       * Get the default replication.
       * @return the replication; the default value is "1".
       * @deprecated use {@link #getDefaultReplication(Path)} instead
       */
      @Deprecated
      public short getDefaultReplication() { return 1; }
    
      /**
       * Get the default replication for a path.
       * The given path will be used to locate the actual FileSystem to query.
       * The full path does not have to exist.
       * @param path of the file
       * @return default replication for the path's filesystem
       */
      public short getDefaultReplication(Path path) {
        return getDefaultReplication();
      }

      The default block size is 32 MB

      /**
       * Return the number of bytes that large input files should be optimally
       * be split into to minimize I/O time.
       * @deprecated use {@link #getDefaultBlockSize(Path)} instead
       */
      @Deprecated
      public long getDefaultBlockSize() {
        // default to 32MB: large enough to minimize the impact of seeks
        return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
      }
    
      /**
       * Return the number of bytes that large input files should be optimally
       * be split into to minimize I/O time.  The given path will be used to
       * locate the actual filesystem.  The full path does not have to exist.
       * @param f path of file
       * @return the default block size for the path's filesystem
       */
      public long getDefaultBlockSize(Path f) {
        return getDefaultBlockSize();
      }
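    The path-taking variants exist so that an implementation can answer differently depending on where the path lives, while the base class simply falls back to the no-argument version. A small sketch of that idea follows; the class names and the "/archive/" rule are hypothetical, and the 128 MB figure is an arbitrary example value rather than anything taken from Hadoop.

```java
class BlockSizeSketch {
    /** Mirrors the 32 MB fallback shown above: 32 * 1024 * 1024 = 33,554,432 bytes. */
    long getDefaultBlockSize() {
        return 32L * 1024 * 1024;
    }

    /** Path-aware variant; the base version ignores the path. */
    long getDefaultBlockSize(String path) {
        return getDefaultBlockSize();
    }
}

// Hypothetical subclass that chooses a block size per path prefix.
class MountedBlockSizeSketch extends BlockSizeSketch {
    @Override
    long getDefaultBlockSize(String path) {
        if (path.startsWith("/archive/")) {
            return 128L * 1024 * 1024;   // arbitrary example value
        }
        return super.getDefaultBlockSize(path);
    }
}

class BlockSizeDemo {
    public static void main(String[] args) {
        BlockSizeSketch fs = new MountedBlockSizeSketch();
        System.out.println(fs.getDefaultBlockSize("/tmp/a"));
        System.out.println(fs.getDefaultBlockSize("/archive/a"));
    }
}
```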

        2.2 Create an FSDataOutputStream that reports write progress to Hadoop

      /**
       * Create an FSDataOutputStream at the indicated Path with write-progress
       * reporting.
       * Files are overwritten by default.
       * @param f the file to create
       * @param progress to report progress
       * @throws IOException IO failure 
       */
      public FSDataOutputStream create(Path f, Progressable progress)
          throws IOException {
        return create(f, true,
                      getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
                          IO_FILE_BUFFER_SIZE_DEFAULT),
                      getDefaultReplication(f),
                      getDefaultBlockSize(f), progress);
      }

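      The remaining create overloads follow the same idea: they create an FSDataOutputStream from specified or default values for the buffer size, block replication, and block size, or they create the file on an explicitly supplied FileSystem. They are not listed one by one here.

      3. Overloads of append

        3.1 Open the file with the default buffer size and append to its end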

      /**
       * Append to an existing file (optional operation).
       * Same as
       * {@code append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
       *     IO_FILE_BUFFER_SIZE_DEFAULT), null)}
       * @param f the existing file to be appended.
       * @throws IOException IO failure
       * @throws UnsupportedOperationException if the operation is unsupported
       *         (default).
       */
      public FSDataOutputStream append(Path f) throws IOException {
        return append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
            IO_FILE_BUFFER_SIZE_DEFAULT), null);
      }

        3.2 Open the file with a caller-specified buffer size and append to its end

      /**
       * Append to an existing file (optional operation).
       * Same as append(f, bufferSize, null).
       * @param f the existing file to be appended.
       * @param bufferSize the size of the buffer to be used.
       * @throws IOException IO failure
       * @throws UnsupportedOperationException if the operation is unsupported
       *         (default).
       */
      public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
        return append(f, bufferSize, null);
      }

      4. Overloads of mkdirs

        4.1 Create a directory with the default directory permission, which is 00777

      /**
       * Call {@link #mkdirs(Path, FsPermission)} with default permission.
       * @param f path
       * @return true if the directory was created
       * @throws IOException IO failure
       */
      public boolean mkdirs(Path f) throws IOException {
        return mkdirs(f, FsPermission.getDirDefault());
      }

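        4.2 Static variant that takes the FileSystem as a parameter. It first creates the directory with the default permission and then sets its permission to the one supplied by the caller.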

      /**
       * Create a directory with the provided permission.
       * The permission of the directory is set to be the provided permission as in
       * setPermission, not permission&~umask
       *
       * @see #create(FileSystem, Path, FsPermission)
       *
       * @param fs FileSystem handle
       * @param dir the name of the directory to be created
       * @param permission the permission of the directory
       * @return true if the directory creation succeeds; false otherwise
       * @throws IOException A problem creating the directories.
       */
      public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
          throws IOException {
        // create the directory using the default permission
        boolean result = fs.mkdirs(dir);
        // set its permission to be the supplied one
        fs.setPermission(dir, permission);
        return result;
      }
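    The Javadoc above stresses that the directory ends up with exactly the supplied permission, "not permission&~umask". The arithmetic behind that remark is easy to check with plain integers; in this sketch the umask 022 is an assumed example value and UmaskSketch is a hypothetical helper, not part of Hadoop.

```java
class UmaskSketch {
    /** What a umask does: clear the bits that are set in the mask. */
    static int applyUmask(int permission, int umask) {
        return permission & ~umask & 0777;
    }

    /** Formats a mode as four octal digits, e.g. 0755. */
    static String toOctal(int mode) {
        return String.format("%04o", mode);
    }

    public static void main(String[] args) {
        int requested = 0777;
        int umask = 022;   // assumed example value
        // Creating with a umask applied would yield the masked permission...
        System.out.println(toOctal(applyUmask(requested, umask)));
        // ...whereas an explicit setPermission-style call keeps the requested bits.
        System.out.println(toOctal(requested));
    }
}
```

    This is why the static mkdirs calls setPermission after creating the directory: the second step overrides whatever the default creation path produced.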

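      5. Overloads of listStatus

        5.1 Filter the entries under the given path with the supplied filter and add the statuses of the accepted entries to the ArrayList passed in by the caller.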

      /**
       * Filter files/directories in the given path using the user-supplied path
       * filter. Results are added to the given array <code>results</code>.
       * @throws FileNotFoundException when the path does not exist
       * @throws IOException see specific implementation
       */
      private void listStatus(ArrayList<FileStatus> results, Path f,
          PathFilter filter) throws FileNotFoundException, IOException {
        FileStatus listing[] = listStatus(f);
        Preconditions.checkNotNull(listing, "listStatus should not return NULL");
        for (int i = 0; i < listing.length; i++) {
          if (filter.accept(listing[i].getPath())) {
            results.add(listing[i]);
          }
        }
      }

        5.2 The difference from the method above is that the collection holding the statuses is created internally instead of being passed in.

      public FileStatus[] listStatus(Path f, PathFilter filter)
                                       throws FileNotFoundException, IOException {
        ArrayList<FileStatus> results = new ArrayList<>();
        listStatus(results, f, filter);
        return results.toArray(new FileStatus[results.size()]);
      }

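        5.3 Filter the entries under each path in the given array with the default filter and collect the statuses of the accepted entries in an internally created list.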

      /**
       * Filter files/directories in the given list of paths using default
       * path filter.
       * <p>
       * Does not guarantee to return the List of files/directories status in a
       * sorted order.
       *
       * @param files
       *          a list of paths
       * @return a list of statuses for the files under the given paths after
       *         applying the filter default Path filter
       * @throws FileNotFoundException when the path does not exist
       * @throws IOException see specific implementation
       */
      public FileStatus[] listStatus(Path[] files)
          throws FileNotFoundException, IOException {
        return listStatus(files, DEFAULT_FILTER);
      }

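        5.4 Filter the entries under each path in the given array with the supplied filter and collect the statuses of the accepted entries in an internally created list.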

      public FileStatus[] listStatus(Path[] files, PathFilter filter)
          throws FileNotFoundException, IOException {
        ArrayList<FileStatus> results = new ArrayList<FileStatus>();
        for (int i = 0; i < files.length; i++) {
          listStatus(results, files[i], filter);
        }
        return results.toArray(new FileStatus[results.size()]);
      }
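    All four listStatus overloads funnel into the same private helper that appends accepted entries to a shared result list. The sketch below reproduces that layering over an in-memory map; ListStatusSketch is hypothetical, plain Strings stand in for FileStatus and Path, and java.util.function.Predicate stands in for PathFilter.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class ListStatusSketch {
    // Hypothetical in-memory "file system": directory name -> entry names.
    private final Map<String, String[]> dirs = new LinkedHashMap<>();

    ListStatusSketch add(String dir, String... entries) {
        dirs.put(dir, entries);
        return this;
    }

    /** Base listing; stands in for the abstract listStatus(Path). */
    String[] list(String dir) {
        String[] listing = dirs.get(dir);
        if (listing == null) {
            throw new IllegalArgumentException("no such directory: " + dir);
        }
        return listing;
    }

    /** Private helper: append the accepted entries of one directory to results. */
    private void list(List<String> results, String dir, Predicate<String> filter) {
        for (String entry : list(dir)) {
            if (filter.test(entry)) {
                results.add(entry);
            }
        }
    }

    /** One directory, caller-supplied filter, internally created result list. */
    String[] list(String dir, Predicate<String> filter) {
        List<String> results = new ArrayList<>();
        list(results, dir, filter);
        return results.toArray(new String[0]);
    }

    /** Several directories, one shared result list. */
    String[] list(String[] dirNames, Predicate<String> filter) {
        List<String> results = new ArrayList<>();
        for (String dir : dirNames) {
            list(results, dir, filter);
        }
        return results.toArray(new String[0]);
    }

    public static void main(String[] args) {
        ListStatusSketch fs = new ListStatusSketch()
            .add("/a", "x.txt", "_tmp")
            .add("/b", "y.txt");
        String[] visible = fs.list(new String[] {"/a", "/b"}, n -> !n.startsWith("_"));
        System.out.println(String.join(",", visible));
    }
}
```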

      6. Overloads of copyFromLocalFile

        6.1 Copy the file at src on the local disk to the path dst, keeping the source file

      /**
       * The src file is on the local disk.  Add it to filesystem at
       * the given dst name and the source is kept intact afterwards
       * @param src path
       * @param dst path
       * @throws IOException IO failure
       */
      public void copyFromLocalFile(Path src, Path dst)
        throws IOException {
        copyFromLocalFile(false, src, dst);
      }

        6.2 The caller-supplied delSrc value decides whether the source file is deleted

      /**
       * The src file is on the local disk.  Add it to the filesystem at
       * the given dst name.
       * delSrc indicates if the source should be removed
       * @param delSrc whether to delete the src
       * @param src path
       * @param dst path
       */
      public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
        throws IOException {
        copyFromLocalFile(delSrc, true, src, dst);
      }

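        6.3 The delSrc parameter decides whether the source files are deleted, and the overwrite parameter decides whether an existing target under dst is overwritten.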

      /**
       * The src files are on the local disk.  Add it to the filesystem at
       * the given dst name.
       * delSrc indicates if the source should be removed
       * @param delSrc whether to delete the src
       * @param overwrite whether to overwrite an existing file
       * @param srcs array of paths which are source
       * @param dst path
       * @throws IOException IO failure
       */
      public void copyFromLocalFile(boolean delSrc, boolean overwrite,
                                    Path[] srcs, Path dst)
        throws IOException {
        Configuration conf = getConf();
        FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
      }

      7. moveToLocalFile, implemented by calling copyToLocalFile with delSrc set to true

      /**
       * Copy a file to the local filesystem, then delete it from the
       * remote filesystem (if successfully copied).
       * @param src path src file in the remote filesystem
       * @param dst path local destination
       * @throws IOException IO failure
       */
      public void moveToLocalFile(Path src, Path dst) throws IOException {
        copyToLocalFile(true, src, dst);
      }

      8. Copy the file at src in the remote file system to the local path dst; the delSrc parameter decides whether the source file is deleted

      /**
       * Copy it a file from a remote filesystem to the local one.
       * delSrc indicates if the src will be removed or not.
       * @param delSrc whether to delete the src
       * @param src path src file in the remote filesystem
       * @param dst path local destination
       * @throws IOException IO failure
       */
      public void copyToLocalFile(boolean delSrc, Path src, Path dst)
        throws IOException {
        copyToLocalFile(delSrc, src, dst, false);
      }
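    Expressing "move" as "copy with delSrc set to true" is a small pattern that can be tried on the local disk with java.nio.file alone. The sketch below is an analogy, not Hadoop's implementation: CopyOrMoveSketch and its methods are hypothetical, and both paths are local temporary files.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class CopyOrMoveSketch {
    /** Copies src to dst and deletes src afterwards when delSrc is true. */
    static void copy(boolean delSrc, Path src, Path dst) throws IOException {
        Files.copy(src, dst, StandardCopyOption.REPLACE_EXISTING);
        if (delSrc) {
            Files.delete(src);   // reached only if the copy did not throw
        }
    }

    /** A move is a copy that removes the source. */
    static void move(Path src, Path dst) throws IOException {
        copy(true, src, dst);
    }

    // Moves a temporary file and checks that only the destination remains.
    static boolean demo() {
        try {
            Path dir = Files.createTempDirectory("copy-sketch");
            Path src = Files.write(dir.resolve("src.txt"),
                "hello".getBytes(StandardCharsets.UTF_8));
            Path dst = dir.resolve("dst.txt");
            move(src, dst);
            return !Files.exists(src) && Files.exists(dst);
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```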
  • Original article: https://www.cnblogs.com/dj-blog/p/9178465.html