zoukankan      html  css  js  c++  java
  • Hadoop源码分析之FileSystem抽象文件系统

    Hadopo提供了一个抽象的文件系统模型FileSystem,HDFS是其中的一个实现。

    FileSystem是Hadoop中所有文件系统的抽象父类,它定义了文件系统所具有的基本特征和基本操作。

    FileSystem类在org.apache.hadoop.fs包中。在eclipse中按ctrl+shift+T进行搜索,提示导入源码包hadoop-hdfs-client-3.0.0-sources.jar。导入即可。

    一、成员变量

      1.Hadoop使用的默认的文件系统的配置项,在core-default.xml中

    public static final String FS_DEFAULT_NAME_KEY =CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;  //该值为fs.defultFS

      2.文件系统的缓存

    /** FileSystem cache. */
      static final Cache CACHE = new Cache();

      3.该文件系统在Cache中的key实例

      /** The key this instance is stored under in the cache. */
      private Cache.Key key;

      4.记录每一个文件系统类的统计信息

      /** Recording statistics per a FileSystem class. */
      private static final Map<Class<? extends FileSystem>, Statistics>
          statisticsTable = new IdentityHashMap<>();

      5.该文件系统的统计信息

      /**
       * The statistics for this file system.
       */
      protected Statistics statistics;

       6.在文件系统关闭或者JVN退出时,需要删除缓存中的文件

      /**
       * A cache of files that should be deleted when the FileSystem is closed
       * or the JVM is exited.
       */
      private final Set<Path> deleteOnExit = new TreeSet<>();

     二、内部类

    内部类 Cache:缓存文件系统对象

      1.Hadoop将文件系统对象以键值对的形式保存到HashMap中。key是一个Cache的静态内部类

        private final Map<Key, FileSystem> map = new HashMap<>();

       2.根据文件系统的URI和配置信息得到一个key,再得到一个文件系统实例。如果文件系统不存在,则创建并初始化一个文件系统

        FileSystem get(URI uri, Configuration conf) throws IOException{
          Key key = new Key(uri, conf);
          return getInternal(uri, conf, key);
        }
    
        private FileSystem getInternal(URI uri, Configuration conf, Key key)
            throws IOException{
          FileSystem fs;
          synchronized (this) {
            fs = map.get(key);
          }
          if (fs != null) {
            return fs;
          }
    
          fs = createFileSystem(uri, conf);
          synchronized (this) { // refetch the lock again
            FileSystem oldfs = map.get(key);
            if (oldfs != null) { // a file system is created while lock is releasing
              fs.close(); // close the new file system
              return oldfs;  // return the old file system
            }
    
            // now insert the new file system into the map
            if (map.isEmpty()
                    && !ShutdownHookManager.get().isShutdownInProgress()) {
              ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
            }
            fs.key = key;
            map.put(key, fs);
            if (conf.getBoolean(
                FS_AUTOMATIC_CLOSE_KEY, FS_AUTOMATIC_CLOSE_DEFAULT)) {
              toAutoClose.add(key);
            }
            return fs;
          }
        }

       3.删除指定的key对应的文件系统实例

        synchronized void remove(Key key, FileSystem fs) {
          FileSystem cachedFs = map.remove(key);
          if (fs == cachedFs) {
            toAutoClose.remove(key);
          } else if (cachedFs != null) {
            map.put(key, cachedFs);
          }
        }

      4.删除所有的文件系统对象,并关闭这些文件系统。onlyAutomatic - 仅仅关闭这些标记为自动关闭的。

        /**
         * Close all FileSystem instances in the Cache.
         * @param onlyAutomatic only close those that are marked for automatic closing
         * @throws IOException a problem arose closing one or more FileSystem.
         */
        synchronized void closeAll(boolean onlyAutomatic) throws IOException {
          List<IOException> exceptions = new ArrayList<>();
    
          // Make a copy of the keys in the map since we'll be modifying
          // the map while iterating over it, which isn't safe.
          List<Key> keys = new ArrayList<>();
          keys.addAll(map.keySet());
    
          for (Key key : keys) {
            final FileSystem fs = map.get(key);
    
            if (onlyAutomatic && !toAutoClose.contains(key)) {
              continue;
            }
    
            //remove from cache
            map.remove(key);
            toAutoClose.remove(key);
    
            if (fs != null) {
              try {
                fs.close();
              }
              catch(IOException ioe) {
                exceptions.add(ioe);
              }
            }
          }
    
          if (!exceptions.isEmpty()) {
            throw MultipleIOException.createIOException(exceptions);
          }
        }

       5.根据用户组信息关闭对应的文件系统

        synchronized void closeAll(UserGroupInformation ugi) throws IOException {
          List<FileSystem> targetFSList = new ArrayList<>(map.entrySet().size());
          //Make a pass over the list and collect the FileSystems to close
          //we cannot close inline since close() removes the entry from the Map
          for (Map.Entry<Key, FileSystem> entry : map.entrySet()) {
            final Key key = entry.getKey();
            final FileSystem fs = entry.getValue();
            if (ugi.equals(key.ugi) && fs != null) {
              targetFSList.add(fs);
            }
          }
          List<IOException> exceptions = new ArrayList<>();
          //now make a pass over the target list and close each
          for (FileSystem fs : targetFSList) {
            try {
              fs.close();
            }
            catch(IOException ioe) {
              exceptions.add(ioe);
            }
          }
          if (!exceptions.isEmpty()) {
            throw MultipleIOException.createIOException(exceptions);
          }
        }

     内部类 Key :

      1.成员变量

          final String scheme;    //模式信息    
          final String authority;    //授权信息
          final UserGroupInformation ugi;    //用户组信息

      2.有两种构造方法

          Key(URI uri, Configuration conf) throws IOException {
            this(uri, conf, 0);
          }
    
          Key(URI uri, Configuration conf, long unique) throws IOException {
            scheme = uri.getScheme()==null ?
                "" : StringUtils.toLowerCase(uri.getScheme());
            authority = uri.getAuthority()==null ?
                "" : StringUtils.toLowerCase(uri.getAuthority());
            this.unique = unique;
    
            this.ugi = UserGroupInformation.getCurrentUser();
          }

     内部类 Statistics :文件系统的统计信息

       1.成员变量

          private final String scheme;    //文件系统URI的模式信息
          private volatile long bytesRead;     //从统计信息中读取的字节数
          private volatile long bytesWritten;    //向统计信息中写入的字节数
          private volatile int readOps;     //执行读操作的次数
          private volatile int largeReadOps;   //执行读取大数据操作的次数
          private volatile int writeOps;    //执行写操作的次数

     三、成员方法

    抽象方法如下:

       1.得到唯一标识文件系统的URI

      /**
       * Returns a URI which identifies this FileSystem.
       *
       * @return the URI of this filesystem.
       */
      public abstract URI getUri();

      2.打开Path路径指定的文件的FSDataInputStream输入流

      /**
       * Opens an FSDataInputStream at the indicated Path.
       * @param f the file name to open
       * @param bufferSize the size of the buffer to be used.
       * @throws IOException IO failure
       */
      public abstract FSDataInputStream open(Path f, int bufferSize)
        throws IOException;

      3.在指定的路径上创建一个具有写入进度的FSDataOutputStream

      /**
       * Create an FSDataOutputStream at the indicated Path with write-progress
       * reporting.
       * @param f the file name to open
       * @param permission file permission
       * @param overwrite if a file with this name already exists, then if true,
       *   the file will be overwritten, and if false an error will be thrown.
       * @param bufferSize the size of the buffer to be used.
       * @param replication required block replication for the file.
       * @param blockSize block size
       * @param progress the progress reporter
       * @throws IOException IO failure
       * @see #setPermission(Path, FsPermission)
       */
      public abstract FSDataOutputStream create(Path f,
          FsPermission permission,
          boolean overwrite,
          int bufferSize,
          short replication,
          long blockSize,
          Progressable progress) throws IOException;

      4.在已经存在的文件后执行追加操作

      /**
       * Append to an existing file (optional operation).
       * @param f the existing file to be appended.
       * @param bufferSize the size of the buffer to be used.
       * @param progress for reporting progress if it is not null.
       * @throws IOException IO failure
       * @throws UnsupportedOperationException if the operation is unsupported
       *         (default).
       */
      public abstract FSDataOutputStream append(Path f, int bufferSize,
          Progressable progress) throws IOException;

      5.将src指定的文件重命名为dst指定的文件

      /**
       * Renames Path src to Path dst.
       * @param src path to be renamed
       * @param dst new path after rename
       * @throws IOException on failure
       * @return true if rename is successful
       */
      public abstract boolean rename(Path src, Path dst) throws IOException;

      6.删除一个目录。如果这个path是一个目录并设置为true,则该目录被删除,否则将抛出一个异常。设置为true时会递归地删除目录。

      /** Delete a file.
       *
       * @param f the path to delete.
       * @param recursive if path is a directory and set to
       * true, the directory is deleted else throws an exception. In
       * case of a file the recursive can be set to either true or false.
       * @return  true if delete is successful else false.
       * @throws IOException IO failure
       */
      public abstract boolean delete(Path f, boolean recursive) throws IOException;

      7.列出一个目录下面的文件和子目录的状态信息

      /**
       * List the statuses of the files/directories in the given path if the path is
       * a directory.
       * <p>
       * Does not guarantee to return the List of files/directories status in a
       * sorted order.
       * <p>
       * Will not return null. Expect IOException upon access error.
       * @param f given path
       * @return the statuses of the files/directories in the given patch
       * @throws FileNotFoundException when the path does not exist
       * @throws IOException see specific implementation
       */
      public abstract FileStatus[] listStatus(Path f) throws FileNotFoundException,IOException;

      8.设置给定文件系统的当前工作目录

      /**
       * Set the current working directory for the given FileSystem. All relative
       * paths will be resolved relative to it.
       *
       * @param new_dir Path of new working directory
       */
      public abstract void setWorkingDirectory(Path new_dir);

      9.得到给定文件系统的当前工作目录

      /**
       * Get the current working directory for the given FileSystem
       * @return the directory pathname
       */
      public abstract Path getWorkingDirectory();

      10.创建一个具有操作权限的目录

      /**
       * Make the given file and all non-existent parents into
       * directories. Has roughly the semantics of Unix @{code mkdir -p}.
       * Existence of the directory hierarchy is not an error.
       * @param f path to create
       * @param permission to apply to f
       * @throws IOException IO failure
       */
      public abstract boolean mkdirs(Path f, FsPermission permission
          ) throws IOException;

      11.得到指定文件目录的状态信息

      /**
       * Return a file status object that represents the path.
       * @param f The path we want information from
       * @return a FileStatus object
       * @throws FileNotFoundException when the path does not exist
       * @throws IOException see specific implementation
       */
      public abstract FileStatus getFileStatus(Path f) throws IOException;

    以上是FileSystem的一些抽象方法。

    以下是FileSystem对抽象方法的一些重载方法。

      1.open的重载方法,IO_FILE_BUFFER_SIZE_KEY的值为"io.file.buffer.size",IO_FILE_BUFFER_SIZE_DEFULE的值为4096,即4KB。

      /**
       * Opens an FSDataInputStream at the indicated Path.
       * @param f the file to open
       * @throws IOException IO failure
       */
      public FSDataInputStream open(Path f) throws IOException {
        return open(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
            IO_FILE_BUFFER_SIZE_DEFAULT));
      }

       2.create的重载方法

        2.1默认对已经存在的文件进行重写。

      /**
       * Create an FSDataOutputStream at the indicated Path.
       * Files are overwritten by default.
       * @param f the file to create
       * @throws IOException IO failure
       */
      public FSDataOutputStream create(Path f) throws IOException {
        return create(f, true);
      }

           2.1.1根据缓冲区大小、副本数量、块大小来创建FSDataOutputStream

      /**
       * Create an FSDataOutputStream at the indicated Path.
       * @param f the file to create
       * @param overwrite if a file with this name already exists, then if true,
       *   the file will be overwritten, and if false an exception will be thrown.
       * @throws IOException IO failure
       */
      public FSDataOutputStream create(Path f, boolean overwrite)
          throws IOException {
        return create(f, overwrite,
                      getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
                          IO_FILE_BUFFER_SIZE_DEFAULT),
                      getDefaultReplication(f),
                      getDefaultBlockSize(f));
      }

      默认的块的副本数是1

      /**
       * Get the default replication.
       * @return the replication; the default value is "1".
       * @deprecated use {@link #getDefaultReplication(Path)} instead
       */
      @Deprecated
      public short getDefaultReplication() { return 1; }
    
      /**
       * Get the default replication for a path.
       * The given path will be used to locate the actual FileSystem to query.
       * The full path does not have to exist.
       * @param path of the file
       * @return default replication for the path's filesystem
       */
      public short getDefaultReplication(Path path) {
        return getDefaultReplication();
      }

       默认的块大小为32MB

      /**
       * Return the number of bytes that large input files should be optimally
       * be split into to minimize I/O time.
       * @deprecated use {@link #getDefaultBlockSize(Path)} instead
       */
      @Deprecated
      public long getDefaultBlockSize() {
        // default to 32MB: large enough to minimize the impact of seeks
        return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
      }
    
      /**
       * Return the number of bytes that large input files should be optimally
       * be split into to minimize I/O time.  The given path will be used to
       * locate the actual filesystem.  The full path does not have to exist.
       * @param f path of file
       * @return the default block size for the path's filesystem
       */
      public long getDefaultBlockSize(Path f) {
        return getDefaultBlockSize();
      }

       2.2在创建FSDataOutputStream的同时会向Hadoop汇报执行进度

      /**
       * Create an FSDataOutputStream at the indicated Path with write-progress
       * reporting.
       * Files are overwritten by default.
       * @param f the file to create
       * @param progress to report progress
       * @throws IOException IO failure 
       */
      public FSDataOutputStream create(Path f, Progressable progress)
          throws IOException {
        return create(f, true,
                      getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
                          IO_FILE_BUFFER_SIZE_DEFAULT),
                      getDefaultReplication(f),
                      getDefaultBlockSize(f), progress);
      }

       其他的create的重载方法大概就是使用指定或默认的缓冲区大小、块的副本数、块大小作为参数来创建FSDataOutputSream,以及指定跨文件系统进行创建。这里就不一一列举。

       3.append的重载方法

        3.1根据默认的缓冲区大小打开文件,然后向文件末尾追加内容

      /**
       * Append to an existing file (optional operation).
       * Same as
       * {@code append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
       *     IO_FILE_BUFFER_SIZE_DEFAULT), null)}
       * @param f the existing file to be appended.
       * @throws IOException IO failure
       * @throws UnsupportedOperationException if the operation is unsupported
       *         (default).
       */
      public FSDataOutputStream append(Path f) throws IOException {
        return append(f, getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
            IO_FILE_BUFFER_SIZE_DEFAULT), null);
      }

         3.2根据用户指定的缓冲区大小来打开文件,然后向文件末尾追加内容

      /**
       * Append to an existing file (optional operation).
       * Same as append(f, bufferSize, null).
       * @param f the existing file to be appended.
       * @param bufferSize the size of the buffer to be used.
       * @throws IOException IO failure
       * @throws UnsupportedOperationException if the operation is unsupported
       *         (default).
       */
      public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
        return append(f, bufferSize, null);
      }

      4.mkdisr的重载方法

        4.1根据默认的文件系统权限来创建目录,默认为00777

      /**
       * Call {@link #mkdirs(Path, FsPermission)} with default permission.
       * @param f path
       * @return true if the directory was created
       * @throws IOException IO failure
       */
      public boolean mkdirs(Path f) throws IOException {
        return mkdirs(f, FsPermission.getDirDefault());
      }

        4.2跨文件系统来执行创建目录的操作。首先根据指定的路径创建一个目录,然后再将其权限设置为用户指定的权限

      /**
       * Create a directory with the provided permission.
       * The permission of the directory is set to be the provided permission as in
       * setPermission, not permission&~umask
       *
       * @see #create(FileSystem, Path, FsPermission)
       *
       * @param fs FileSystem handle
       * @param dir the name of the directory to be created
       * @param permission the permission of the directory
       * @return true if the directory creation succeeds; false otherwise
       * @throws IOException A problem creating the directories.
       */
      public static boolean mkdirs(FileSystem fs, Path dir, FsPermission permission)
          throws IOException {
        // create the directory using the default permission
        boolean result = fs.mkdirs(dir);
        // set its permission to be the supplied one
        fs.setPermission(dir, permission);
        return result;
      }

        5.listStatus的重载方法

          5.1使用指定的过滤器过滤指定path的文件,然后将剩余的文件的状态信息保存到用户给定的ArrayList集合中。

      /**
       * Filter files/directories in the given path using the user-supplied path
       * filter. Results are added to the given array <code>results</code>.
       * @throws FileNotFoundException when the path does not exist
       * @throws IOException see specific implementation
       */
      private void listStatus(ArrayList<FileStatus> results, Path f,
          PathFilter filter) throws FileNotFoundException, IOException {
        FileStatus listing[] = listStatus(f);
        Preconditions.checkNotNull(listing, "listStatus should not return NULL");
        for (int i = 0; i < listing.length; i++) {
          if (filter.accept(listing[i].getPath())) {
            results.add(listing[i]);
          }
        }
      }

          5.2和上面的区别是,保存状态信息的集合是内部新建的,不是用户指定的。

      public FileStatus[] listStatus(Path f, PathFilter filter)
                                       throws FileNotFoundException, IOException {
        ArrayList<FileStatus> results = new ArrayList<>();
        listStatus(results, f, filter);
        return results.toArray(new FileStatus[results.size()]);
      }

            5.3使用默认的过滤器来过滤指定path集合中的文件,然后将剩余的文件的状态信息保存到内部新建的列表中。

      /**
       * Filter files/directories in the given list of paths using default
       * path filter.
       * <p>
       * Does not guarantee to return the List of files/directories status in a
       * sorted order.
       *
       * @param files
       *          a list of paths
       * @return a list of statuses for the files under the given paths after
       *         applying the filter default Path filter
       * @throws FileNotFoundException when the path does not exist
       * @throws IOException see specific implementation
       */
      public FileStatus[] listStatus(Path[] files)
          throws FileNotFoundException, IOException {
        return listStatus(files, DEFAULT_FILTER);
      }

         5.4使用指定的过滤器来过滤指定path集合中的文件,然后将剩余的文件的状态信息保存到内部新建的列表中。

      public FileStatus[] listStatus(Path[] files, PathFilter filter)
          throws FileNotFoundException, IOException {
        ArrayList<FileStatus> results = new ArrayList<FileStatus>();
        for (int i = 0; i < files.length; i++) {
          listStatus(results, files[i], filter);
        }
        return results.toArray(new FileStatus[results.size()]);
      }

      6.copyFromLocalFile方法的重载

        6.1将本地磁盘上src指定路径的文件复制到dst指定的路径,不删除源文件

      /**
       * The src file is on the local disk.  Add it to filesystem at
       * the given dst name and the source is kept intact afterwards
       * @param src path
       * @param dst path
       * @throws IOException IO failure
       */
      public void copyFromLocalFile(Path src, Path dst)
        throws IOException {
        copyFromLocalFile(false, src, dst);
      }

        6.2根据用户指定的delSrc的值来决定删不删除源文件

      /**
       * The src file is on the local disk.  Add it to the filesystem at
       * the given dst name.
       * delSrc indicates if the source should be removed
       * @param delSrc whether to delete the src
       * @param src path
       * @param dst path
       */
      public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
        throws IOException {
        copyFromLocalFile(delSrc, true, src, dst);
      }

        6.3根据delSrc参数决定是否删除源文件,根据overwrite参数决定是否覆盖dst路径下已有的目标文件。

      /**
       * The src files are on the local disk.  Add it to the filesystem at
       * the given dst name.
       * delSrc indicates if the source should be removed
       * @param delSrc whether to delete the src
       * @param overwrite whether to overwrite an existing file
       * @param srcs array of paths which are source
       * @param dst path
       * @throws IOException IO failure
       */
      public void copyFromLocalFile(boolean delSrc, boolean overwrite,
                                    Path[] srcs, Path dst)
        throws IOException {
        Configuration conf = getConf();
        FileUtil.copy(getLocal(conf), srcs, this, dst, delSrc, overwrite, conf);
      }

      7.moveFromLocalFile的重载方法,通过调用copyToLocalFile方法来实现

      /**
       * Copy a file to the local filesystem, then delete it from the
       * remote filesystem (if successfully copied).
       * @param src path src file in the remote filesystem
       * @param dst path local destination
       * @throws IOException IO failure
       */
      public void moveToLocalFile(Path src, Path dst) throws IOException {
        copyToLocalFile(true, src, dst);
      }

      8.将远程文件系统中src指定的文件复制到本地dst指定的路径下,delSrc参数决定是否删除源文件

      /**
       * Copy it a file from a remote filesystem to the local one.
       * delSrc indicates if the src will be removed or not.
       * @param delSrc whether to delete the src
       * @param src path src file in the remote filesystem
       * @param dst path local destination
       * @throws IOException IO failure
       */
      public void copyToLocalFile(boolean delSrc, Path src, Path dst)
        throws IOException {
        copyToLocalFile(delSrc, src, dst, false);
      }
  • 相关阅读:
    转:Git: git stash 用法小结
    Checkbox: ListView 与CheckBox 触发事件冲突的问题
    android: getDimension, getDimensionPixelOffset 和getDimensionPixelSize 区别
    java: 保留两位小数4种方法
    java: 保留两位小数4种方法
    转:在eclipse中 使用7.0及以上手机进行测试时logcat不打印日志的解决办法
    转:Android文件存储路径getFilesDir()与getExternalFilesDir的区别
    Gradle-修改.gradle默认目录
    Windows: 打开关闭网络连接的方法
    dom4j: 生成XML时文本中回车换行无效
  • 原文地址:https://www.cnblogs.com/dj-blog/p/9178465.html
Copyright © 2011-2022 走看看