  • Studying the hadoop-2.7.6 source -- obtaining the FileSystem on the HDFS client

       

      1. The FileSystem object is the entry point for all HDFS operations, so an fs is first obtained through one of its static get methods (a caller-side example follows the snippet below). Every get overload ultimately calls

      get(URI uri, Configuration conf), which performs basic configuration validation and then fetches the FileSystem from the CACHE

       

      public static FileSystem get(URI uri, Configuration conf) throws IOException{

      String scheme = uri.getScheme();

      String authority = uri.getAuthority();

      ...

      return CACHE.get(uri,conf);

      }
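
      For orientation, here is a minimal caller-side sketch of obtaining and using an fs (the NameNode address and path are made-up illustration values):

      import java.net.URI;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;

      public class GetFsDemo {
        public static void main(String[] args) throws Exception {
          Configuration conf = new Configuration();
          // 192.8.0.14:9000 reuses the sample authority that appears later in this post
          FileSystem fs = FileSystem.get(URI.create("hdfs://192.8.0.14:9000"), conf);
          // all subsequent HDFS operations go through this single entry point
          System.out.println(fs.exists(new Path("/tmp")));
        }
      }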

       

      2. The get method of the inner static class Cache in turn delegates to getInternal(URI uri, Configuration conf, Key key)

      static class Cache {

          private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();

      ...

          FileSystem get(URI uri, Configuration conf) throws IOException{

            Key key = new Key(uri, conf);

            return getInternal(uri, conf, key);

          }

       

          private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{

            FileSystem fs;

       

       //if no fs is cached for this key, create a new one from the configuration and put it into the map

            synchronized (this) {

              fs = map.get(key);

            }

            if (fs != null) {

              return fs;

            }

       

            fs = createFileSystem(uri, conf);

            synchronized (this) { // refetch the lock again

             ...

              fs.key = key;

              map.put(key, fs);

      ...

              return fs;

            }

          }

      ...

      }
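
      The elided part of getInternal is worth spelling out: because the map check and createFileSystem are not in one critical section, two threads can race to create the same fs, so the code re-checks the map under the lock and discards the redundant instance. A minimal sketch of that double-checked pattern (field and helper names as in Cache above; the toAutoClose bookkeeping is omitted):

          private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException {
            FileSystem fs;
            synchronized (this) {            // first check under the lock
              fs = map.get(key);
            }
            if (fs != null) {
              return fs;
            }

            fs = createFileSystem(uri, conf);  // created outside the lock: another thread may race us
            synchronized (this) {
              FileSystem oldfs = map.get(key); // re-check: did someone else win the race?
              if (oldfs != null) {
                fs.close();                    // discard our redundant instance
                return oldfs;
              }
              fs.key = key;
              map.put(key, fs);
              return fs;
            }
          }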

       

      3. createFileSystem is a private static method of FileSystem, available only to FileSystem itself or to inner classes such as Cache

      private static FileSystem createFileSystem(URI uri, Configuration conf

          ) throws IOException {

        Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);

        FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);

        fs.initialize(uri, conf);

        return fs;

      }

       

       //FILE_SYSTEMS_LOADED defaults to false, so the file system classes are loaded first and then looked up from the cache

      public static Class<? extends FileSystem> getFileSystemClass(String scheme,

          Configuration conf) throws IOException {

        if (!FILE_SYSTEMS_LOADED) {

          loadFileSystems();

        }

        Class<? extends FileSystem> clazz = null;

        if (conf != null) {

          clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);

        }

        if (clazz == null) {

          clazz = SERVICE_FILE_SYSTEMS.get(scheme);

        }

        if (clazz == null) {

          throw new IOException("No FileSystem for scheme: " + scheme);

        }

        return clazz;

      }
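
      Note the lookup order this establishes: an explicit fs.<scheme>.impl entry in the Configuration takes precedence over the ServiceLoader registry, which makes it easy to pin or swap the implementation for a scheme. A small sketch (the class chosen is the stock one, purely for illustration):

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.hdfs.DistributedFileSystem;

      public class PinHdfsImpl {
        public static Configuration pinnedConf() {
          Configuration conf = new Configuration();
          // this entry is consulted before SERVICE_FILE_SYSTEMS in getFileSystemClass
          conf.setClass("fs.hdfs.impl", DistributedFileSystem.class, FileSystem.class);
          return conf;
        }
      }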

       

      //ServiceLoader is used here to load all FileSystem subclasses

      private static void loadFileSystems() {

        synchronized (FileSystem.class) {

          if (!FILE_SYSTEMS_LOADED) {

            ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);

            Iterator<FileSystem> it = serviceLoader.iterator();

            while (it.hasNext()) {

              FileSystem fs = null;

              try {

                fs = it.next();

                try {

                  SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());

                } catch (Exception e) {

                  LOG.warn("Cannot load: " + fs + " from " +

                      ClassUtil.findContainingJar(fs.getClass()), e);

                }

              } catch (ServiceConfigurationError ee) {

                LOG.warn("Cannot load filesystem", ee);

              }

            }

            FILE_SYSTEMS_LOADED = true;

          }

        }

      }

       

       

      For the details of how ServiceLoader performs the loading, see https://www.cnblogs.com/aspirant/p/10616704.html

      In the end, SERVICE_FILE_SYSTEMS caches all of the loaded concrete file system types; there are 9 of them:

      viewfs  swebhdfs  file  ftp  har  hsftp  hdfs  webhdfs  hftp
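
      ServiceLoader discovers these classes through provider-configuration files named META-INF/services/org.apache.hadoop.fs.FileSystem on the classpath; each jar lists the implementations it ships. As a hedged illustration (the exact entries vary by jar and version), the hadoop-hdfs jar carries a file along these lines:

      # META-INF/services/org.apache.hadoop.fs.FileSystem (illustrative excerpt)
      org.apache.hadoop.hdfs.DistributedFileSystem
      org.apache.hadoop.hdfs.web.WebHdfsFileSystem
      org.apache.hadoop.hdfs.web.SWebHdfsFileSystem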

      Below is a brief overview of some of the file systems Hadoop supports (all Java implementations live under the org.apache.hadoop package):

      File system | URI scheme | Java implementation         | Description
      Local       | file       | fs.LocalFileSystem          | A local file system with client-side checksums. The local file system without checksums is implemented in fs.RawLocalFileSystem.
      HDFS        | hdfs       | hdfs.DistributedFileSystem  | Hadoop's distributed file system.
      HFTP        | hftp       | hdfs.HftpFileSystem         | Read-only access to HDFS over HTTP; often used with distcp to copy data between HDFS clusters.
      HSFTP       | hsftp      | hdfs.HsftpFileSystem        | Read-only access to HDFS over HTTPS.
      FTP         | ftp        | fs.ftp.FTPFileSystem        | A file system backed by an FTP server.
      ViewFs      | viewfs     | fs.viewfs.ViewFileSystem    | Used in federation mode to manage the namespaces of multiple Hadoop clusters.
      SwebHdfs    | swebhdfs   | hdfs.web.SWebHdfsFileSystem | HTTPS-based file system used for web UI style (REST) access.
      WebHdfs     | webhdfs    | hdfs.web.WebHdfsFileSystem  | HTTP-based file system used for web UI style (REST) access.
      HAR         | har        | fs.HarFileSystem            | Layered on another Hadoop file system to archive files; Hadoop archives are mainly used to reduce NameNode memory usage.
      KFS         | kfs        | fs.kfs.KosmosFileSystem     | CloudStore (formerly the Kosmos file system) is a distributed file system similar to HDFS and Google's GFS, written in C++.

       

      Once the file systems are loaded, the class for the scheme can be resolved, here org.apache.hadoop.hdfs.DistributedFileSystem.

      From that class an instance FileSystem fs is created via reflection to serve as the HDFS entry point, but the crucial initialize step must still run before it can be used.

       

      4. Initializing fs mainly consists of creating the DFSClient object

      public void initialize(URI uri, Configuration conf) throws IOException {

        super.initialize(uri, conf);  //register the statistics observer for metrics

        setConf(conf);

       

        String host = uri.getHost();

        if (host == null) {

          throw new IOException("Incomplete HDFS URI, no host: "+ uri);

        }

        homeDirPrefix = conf.get(

            DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY,

            DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT);

       

        this.dfs = new DFSClient(uri, conf, statistics);

        this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());

        this.workingDir = getHomeDirectory();

      }
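
      As a side note on the last line: DFS_USER_HOME_DIR_PREFIX_DEFAULT is "/user", so with no override getHomeDirectory resolves to /user/<current user>. A quick way to confirm the initial working directory (address and expected output are illustrative):

      import java.net.URI;
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileSystem;

      public class HomeDirDemo {
        public static void main(String[] args) throws Exception {
          FileSystem fs = FileSystem.get(URI.create("hdfs://192.8.0.14:9000"), new Configuration());
          // for a user named tyxuan this should print something like:
          // hdfs://192.8.0.14:9000/user/tyxuan
          System.out.println(fs.getWorkingDirectory());
        }
      }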

       

      public DFSClient(URI nameNodeUri, Configuration conf,

                       FileSystem.Statistics stats)

        throws IOException {

        this(nameNodeUri, null, conf, stats);

      }

       

      5. The following is the heart of the DFSClient constructor, and should be the focus when reading the source

       

      public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,

          Configuration conf, FileSystem.Statistics stats)

        throws IOException {

        ... 

          //sample values: authority = 192.8.0.14:9000, clientName = DFSClient_NONMAPREDUCE_303849151_1

        this.authority = nameNodeUri == null? "null": nameNodeUri.getAuthority();

        this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" +

            DFSUtil.getRandom().nextInt()  + "_" + Thread.currentThread().getId();

        ... 

        if (proxyInfo != null) {

          this.dtService = proxyInfo.getDelegationTokenService();

          this.namenode = proxyInfo.getProxy();

        } else if (rpcNamenode != null) {

          // This case is used for testing.

          Preconditions.checkArgument(nameNodeUri == null);

          this.namenode = rpcNamenode;

          dtService = null;

        } else {

          Preconditions.checkArgument(nameNodeUri != null,

              "null URI");

          //create the proxy for the remote object here; calling methods on the local proxy lets the RPC framework invoke the real object's methods on the server

          proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,

              ClientProtocol.class, nnFallbackToSimpleAuth);

          this.dtService = proxyInfo.getDelegationTokenService();

          this.namenode = proxyInfo.getProxy();

        }

       

        ...

      }

       

      6. Creating the proxy for the remote object

      public static <T> ProxyAndInfo<T> createProxy(Configuration conf,

          URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth)

          throws IOException {

       

        //reflectively create the NN failover proxy provider from the configuration; under HA it switches from a failed NN to a healthy one (active/standby failover)

        AbstractNNFailoverProxyProvider<T> failoverProxyProvider =

            createFailoverProxyProvider(conf, nameNodeUri, xface, true,

              fallbackToSimpleAuth);

       

        if (failoverProxyProvider == null) {

          // Non-HA case: follow the non-HA path first; the HA path can be traced the same way in a debugger

          return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,

              UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);

        } else {

          // HA case

          ...

          return new ProxyAndInfo<T>(proxy, dtService, nnAddress);

        }

      }
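
      Whether failoverProxyProvider is null is decided purely by configuration: createFailoverProxyProvider looks for a provider class registered for the URI's nameservice. A hedged sketch of the client-side settings that would activate the HA branch (mycluster, nn1/nn2 and the hosts are made-up values):

      import org.apache.hadoop.conf.Configuration;

      public class HaClientConf {
        public static Configuration haConf() {
          Configuration conf = new Configuration();
          conf.set("fs.defaultFS", "hdfs://mycluster");
          conf.set("dfs.nameservices", "mycluster");
          conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
          conf.set("dfs.namenode.rpc-address.mycluster.nn1", "nn1-host:8020");
          conf.set("dfs.namenode.rpc-address.mycluster.nn2", "nn2-host:8020");
          // the key createFailoverProxyProvider reflects on; with it set, the HA branch is taken
          conf.set("dfs.client.failover.proxy.provider.mycluster",
              "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
          return conf;
        }
      }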

       

      public static <T> ProxyAndInfo<T> createNonHAProxy(

          Configuration conf, InetSocketAddress nnAddr, Class<T> xface,

          UserGroupInformation ugi, boolean withRetries,

          AtomicBoolean fallbackToSimpleAuth) throws IOException {

        Text dtService = SecurityUtil.buildTokenService(nnAddr);

       

        T proxy;

        if (xface == ClientProtocol.class) {

          proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,

              withRetries, fallbackToSimpleAuth);

        } else if (...) {

          ...

        } else {

          ...

        }

       

        return new ProxyAndInfo<T>(proxy, dtService, nnAddr);

      }

       

      private static ClientProtocol createNNProxyWithClientProtocol(

          InetSocketAddress address, Configuration conf, UserGroupInformation ugi,

          boolean withRetries, AtomicBoolean fallbackToSimpleAuth)

          throws IOException {

       

        //Hadoop 2.7.6 supports two RPC engines: WritableRpcEngine and ProtobufRpcEngine

        RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);

       

        final RetryPolicy defaultPolicy =

            RetryUtils.getDefaultRetryPolicy(

                conf,

                DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,

                DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,

                DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,

                DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,

                SafeModeException.class);

       

        final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);

       

        //use the RPC framework to obtain the proxy for the remote object

        ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(

            ClientNamenodeProtocolPB.class, version, address, ugi, conf,

            NetUtils.getDefaultSocketFactory(conf),

            org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,

            fallbackToSimpleAuth).getProxy();

       

          //either branch returns the proxy wrapped once more in a PB translator to the caller

        if (withRetries) { // create the proxy with retries

       

          Map<String, RetryPolicy> methodNameToPolicyMap

                     = new HashMap<String, RetryPolicy>();

       

          ClientProtocol translatorProxy =

            new ClientNamenodeProtocolTranslatorPB(proxy);

          return (ClientProtocol) RetryProxy.create(

              ClientProtocol.class,

              new DefaultFailoverProxyProvider<ClientProtocol>(

                  ClientProtocol.class, translatorProxy),

              methodNameToPolicyMap,

              defaultPolicy);

        } else {

          return new ClientNamenodeProtocolTranslatorPB(proxy);

        }

      }
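
      RetryProxy.create returns a JDK dynamic proxy whose invocation handler consults methodNameToPolicyMap and defaultPolicy before re-invoking the wrapped translator. The following is a stripped-down sketch of that idea, not Hadoop's actual RetryInvocationHandler:

      import java.lang.reflect.InvocationHandler;
      import java.lang.reflect.Method;
      import java.lang.reflect.Proxy;

      final class NaiveRetryHandler implements InvocationHandler {
        private final Object delegate;
        private final int maxRetries;

        NaiveRetryHandler(Object delegate, int maxRetries) {
          this.delegate = delegate;
          this.maxRetries = maxRetries;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
          Throwable last = null;
          for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
              return method.invoke(delegate, args);  // forward to the wrapped object
            } catch (Exception e) {
              // unwrap InvocationTargetException and retry on failure
              last = e.getCause() != null ? e.getCause() : e;
            }
          }
          throw last;
        }

        @SuppressWarnings("unchecked")
        static <T> T wrap(Class<T> iface, T delegate, int maxRetries) {
          return (T) Proxy.newProxyInstance(iface.getClassLoader(),
              new Class<?>[] { iface }, new NaiveRetryHandler(delegate, maxRetries));
        }
      }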

       

      7. The fs finally obtained is thus a multi-layer wrapper around the remote object proxy:

      DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_303849151_1, ugi=tyxuan (auth:SIMPLE)]]
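
      Put together, every client call descends through the layers assembled above. As a rough trace (the inner method names are the real ones; the flow annotations are mine):

      // fs.exists(new Path("/tmp"))
      //   -> FileSystem.exists -> DistributedFileSystem.getFileStatus  (the DFS wrapper)
      //   -> DFSClient.getFileInfo("/tmp")                             (clientName, retry policies live here)
      //   -> ClientNamenodeProtocolTranslatorPB.getFileInfo            (Java API translated to protobuf)
      //   -> ProtobufRpcEngine proxy -> RPC request to the NameNode at 192.8.0.14:9000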

