zoukankan      html  css  js  c++  java
  • hadoop之hdfs------------------FileSystem及其源码分析

    FileSystem及其源码分析

      FileSystem这个抽象类提供了丰富的方法用于对文件系统的操作,包括上传、下载、删除、创建等。这里多说的文件系统通常指的是HDFS(DistributedFileSystem),其实,hadoop处理支持分布式文件系统,还提供了对诸如本地文件系统(LocalFileSystem)、FTP文件系统(FTPFIle)的支持。

      在这里我们主要介绍一下DistributedFileSystem的创建过程。如下代码:

      主要包括两个阶段:

        1. 加载配置文件

        2. 初始化文件系统

    Configuration conf = new Configuration();//加载配置文件
    FileSystem fs = FileSystem.get(conf);//初始化文件系统

      首先来看一下配置文件加载阶段。

      这是Configuration类的静态代码块,默认加载core-default.xml和core-site.xml这两个配置文件。

    static{
        //print deprecation warning if hadoop-site.xml is found in classpath
        ClassLoader cL = Thread.currentThread().getContextClassLoader();
        if (cL == null) {
          cL = Configuration.class.getClassLoader();
        }
        if(cL.getResource("hadoop-site.xml")!=null) {//确保在类路径下不存在hadoop-site.xml(已过时)
          LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +
              "Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, "
              + "mapred-site.xml and hdfs-site.xml to override properties of " +
              "core-default.xml, mapred-default.xml and hdfs-default.xml " +
              "respectively");
        }
        addDefaultResource("core-default.xml");
        addDefaultResource("core-site.xml");
      }

      接下来进入到初始化文件系统阶段:

      FileSystem的get(Configuration conf)方法调用了它的另一个方法get(getDefaultUri(conf),conf),这个方法通过判断是否采用了缓存机制,如果采用了缓存机制,则从缓存中获取,如果没有采用缓存机制,则创建新的文件系统,默认开启缓存机制。

     public static FileSystem get(Configuration conf) throws IOException {
        return get(getDefaultUri(conf), conf);
      }
      public static URI getDefaultUri(Configuration conf) {
        return URI.create(fixName(conf.get(FS_DEFAULT_NAME_KEY, DEFAULT_FS)));//通过conf中的fs.defaultFS属性获得URI(hdfs://s101)
      }
    public static FileSystem get(URI uri, Configuration conf) throws IOException {
        String scheme = uri.getScheme();//hdfs
        String authority = uri.getAuthority();//s101
    
        if (scheme == null && authority == null) {     // use default FS :默认为本地文件系统 file:///
          return get(conf);
        }
    
      //省略部分代码
      //判断是否缓存FileSystem   String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);   //如果不采用缓存机制,每次都创建新的FileSystem if (conf.getBoolean(disableCacheName, false)) { return createFileSystem(uri, conf); }   //如果采用缓存机制,则从CACHE中获取 return CACHE.get(uri, conf);

      先让我们来看一下这个CACHE到底是个什么东西?

      CACHE是FileSystem的一个静态内部类,内部维护一个HashMap<Key,FileSystem>(FileSystem容器),键为Key类型,Key是CACHE的一个静态内部类,内部封装了Schema(协议,这里指hdfs)、Authority(权限主机,这里指s101),Vaule就是缓存的文件系统。

     static class Cache {
      //省略......
        private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();//FileSystem容器
        
        /** FileSystem.Cache.Key */
        static class Key {
          final String scheme;//hdfs
          final String authority;//s101
          final UserGroupInformation ugi;
          //省略...  
          }    
    }

      CACHE.get(uri,conf)方法用于获得具体的FileSystem

    FileSystem get(URI uri, Configuration conf) throws IOException{
          Key key = new Key(uri, conf);
          return getInternal(uri, conf, key);
        }

      调用getInterval(uri,conf.key)方法:该方法通过createFileSystem创建新的文件系统,并将其存入缓存容器map中。

    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
          FileSystem fs;
          synchronized (this) {
            fs = map.get(key);//由于此时为一次创建FileSystem,所以此时map为null
          }
          if (fs != null) {
            return fs;
          }
          //fs不为null,创建文件系统
          fs = createFileSystem(uri, conf);
        
          synchronized (this) { // refetch the lock again
            FileSystem oldfs = map.get(key);
            if (oldfs != null) { // a file system is created while lock is releasing
              fs.close(); // close the new file system
              return oldfs;  // return the old file system
            }
            
            // now insert the new file system into the map
            if (map.isEmpty()
                    && !ShutdownHookManager.get().isShutdownInProgress()) {
              ShutdownHookManager.get().addShutdownHook(clientFinalizer, SHUTDOWN_HOOK_PRIORITY);
            }
            fs.key = key;
            //将文件系统存入map容器
            map.put(key, fs);
            if (conf.getBoolean("fs.automatic.close", true)) {
              toAutoClose.add(key);
            }
            return fs;
          }
        }

     下面我们来看一下 createFileSystem(uri, conf)是如何创建FileSystem的:

    private static FileSystem createFileSystem(URI uri, Configuration conf
          ) throws IOException {
        //根据conf和Schema获取对应的FileSystemClass,这里指的是DistributedFileSystem.class
        Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
        //通过反射创建文件系统
        FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
        //初始化文件系统
        fs.initialize(uri, conf);
        return fs;
      }

     简单了解一下getFileSystemClass的获取过程:加载

    public static Class<? extends FileSystem> getFileSystemClass(String scheme,
          Configuration conf) throws IOException {
        if (!FILE_SYSTEMS_LOADED) {
          loadFileSystems();//加载文件系统,加载了hadoop支持的9种文件系统,存放到了SERVICE_FILE_SYSTEMS=new HashMap<String,Class<? extends FileSystem>>
    }
        Class<? extends FileSystem> clazz = null;
        //省略
        if (clazz == null) {//根据schema获得对应的文件系统的clazz
          clazz = SERVICE_FILE_SYSTEMS.get(scheme);
        }
        if (clazz == null) {
          throw new IOException("No FileSystem for scheme: " + scheme);
        }
        return clazz;
      }

     再来看一下文件系统的initialize()方法做了些什么,最主要的就是创建了DFS客户端对象,是一个DFSClient,它负责与namenode进行远程通信,是一个绝对重要的家伙。

      public void initialize(URI uri, Configuration conf) throws IOException {
            super.initialize(uri, conf);
            this.setConf(conf);
            String host = uri.getHost();
            if (host == null) {
                throw new IOException("Incomplete HDFS URI, no host: " + uri);
            } else {
                this.homeDirPrefix = conf.get("dfs.user.home.dir.prefix", "/user");
                //创建DFS客户端(每个文件系统都持有一个dfs客户端对象)
                this.dfs = new DFSClient(uri, conf, this.statistics);
                this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
                //工作空间
                this.workingDir = this.getHomeDirectory();
            }

    到此为止,FileSystem的创建过程就完成了,下面做一下总结。

    FileSystem 的创建过程:

      1. 首先加载配置文件,主要是获得fs.defaultFS的属性值。

      2. 创建文件系统:

        首先从CACHE.map缓存中获得相应的文件系统。

        如果是第一次创建该文件系统,加载相应的文件系统的Class对象,通过反射创建文件系统对象,然后调用initialize()方法对初始化

        并存入CACHE.map中。

  • 相关阅读:
    4、2 核心组件
    promise
    Content-Type
    $routeProvider
    广告
    $apply() $digest()
    异常
    switch
    autoprefixer
    $resource
  • 原文地址:https://www.cnblogs.com/gdy1993/p/9379872.html
Copyright © 2011-2022 走看看