zoukankan      html  css  js  c++  java
  • FileSystem实例化过程

    HDFS案例代码

    Configuration configuration = new Configuration();
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop000:8020"), configuration);
        
    InputStream in = fileSystem.open(new Path(HDFS_PATH+"/hdfsapi/test/log4j.properties"));
    OutputStream out = new FileOutputStream(new File("log4j_download.properties"));
    IOUtils.copyBytes(in, out, 4096, true); //最后一个参数表示完成拷贝之后关闭输入/出流

    FileSystem.java

    static final Cache CACHE = new Cache();
    
    /**
     * Returns the FileSystem for the given URI by delegating to the shared CACHE.
     *
     * @param uri  URI identifying the file system, e.g. hdfs://hadoop000:8020
     * @param conf configuration handed through to the cache lookup
     * @return the FileSystem instance supplied by the cache
     * @throws IOException propagated from the cache lookup
     */
    public static FileSystem get(URI uri, Configuration conf) throws IOException {
        // Extracted for tracing only; neither value is used further in this excerpt.
        final String uriScheme = uri.getScheme();       // e.g. hdfs
        final String uriAuthority = uri.getAuthority(); // e.g. hadoop000:8020

        final FileSystem resolved = CACHE.get(uri, conf);
        return resolved;
    }
    
    /**
     * Builds the cache key for the URI/configuration pair and hands the
     * lookup over to getInternal.
     *
     * @param uri  URI of the requested file system
     * @param conf configuration used both for the key and for instance creation
     * @return the FileSystem returned by getInternal
     * @throws IOException propagated from getInternal
     */
    FileSystem get(URI uri, Configuration conf) throws IOException {
        return getInternal(uri, conf, new Key(uri, conf));
    }
    
    /**
     * Returns the FileSystem mapped to {@code key}, creating one when the map has no entry.
     *
     * <p>The map is first read while holding this object's monitor. On a miss, a new
     * instance is built by createFileSystem outside of that lock, and the monitor is
     * taken again before the map is consulted a second time. The statements between
     * that second lookup and the return are elided in this excerpt.
     *
     * @param uri  URI passed to createFileSystem on a cache miss
     * @param conf configuration passed to createFileSystem on a cache miss
     * @param key  key used for the map lookups
     * @return the cached instance on a hit, otherwise the newly created instance
     * @throws IOException propagated from createFileSystem
     */
    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
        FileSystem fs;
        synchronized (this) {
            fs = map.get(key);
        }
        
        // Cache hit: return the cached instance. Otherwise fall through and create a new one via createFileSystem.
        if (fs != null) { 
            return fs;
        }
        
        fs = createFileSystem(uri, conf);
        synchronized (this) { 
            FileSystem oldfs = map.get(key);
            ... // elided in this excerpt: put the instance into CACHE
            return fs;
        }
    }
    
    /**
     * Instantiates and initializes a new FileSystem for the given URI.
     *
     * @param uri  URI whose scheme selects the implementation class
     * @param conf configuration used for class lookup, instantiation and initialization
     * @return the freshly initialized FileSystem
     * @throws IOException propagated from class lookup or from initialize
     */
    private static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException {
        // In this walkthrough (scheme hdfs) the lookup yields
        // org.apache.hadoop.hdfs.DistributedFileSystem.
        final String scheme = uri.getScheme();
        final Class<?> implClass = getFileSystemClass(scheme, conf);

        final FileSystem instance = (FileSystem) ReflectionUtils.newInstance(implClass, conf);
        // Runs the implementation's own initialize, here DistributedFileSystem's.
        instance.initialize(uri, conf);
        return instance;
    }
    
    /**
     * Resolves the FileSystem implementation class for a URI scheme.
     *
     * <p>The property {@code fs.<scheme>.impl} from {@code conf} takes precedence;
     * if it yields nothing, the class registered in SERVICE_FILE_SYSTEMS is used.
     *
     * @param scheme URI scheme such as hdfs
     * @param conf   configuration to consult first; may be null
     * @return the implementation class for the scheme
     * @throws IOException if neither source knows the scheme
     */
    public static Class<? extends FileSystem> getFileSystemClass(String scheme,Configuration conf) throws IOException {
        // FILE_SYSTEMS_LOADED starts out false, so the first call triggers loading.
        if (!FILE_SYSTEMS_LOADED) {
            loadFileSystems();
        }

        // e.g. fs.hdfs.impl; in this walkthrough the property is set neither in
        // core-default.xml nor in core-site.xml, so this lookup yields null.
        final Class<? extends FileSystem> configured = (conf == null)
            ? null
            : (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
        if (configured != null) {
            return configured;
        }

        // For hdfs: class org.apache.hadoop.hdfs.DistributedFileSystem
        final Class<? extends FileSystem> registered = SERVICE_FILE_SYSTEMS.get(scheme);
        if (registered == null) {
            throw new IOException("No FileSystem for scheme: " + scheme);
        }
        return registered;
    }
    
    
    /**
     * Fills SERVICE_FILE_SYSTEMS with every FileSystem found through
     * ServiceLoader, keyed by scheme, and then sets FILE_SYSTEMS_LOADED.
     * The work runs under the FileSystem.class monitor and is skipped when the
     * flag is already set.
     */
    private static void loadFileSystems() {
        synchronized (FileSystem.class) {
            if (FILE_SYSTEMS_LOADED) {
                return; // already loaded, nothing to do
            }
            for (FileSystem provider : ServiceLoader.load(FileSystem.class)) {
                SERVICE_FILE_SYSTEMS.put(provider.getScheme(), provider.getClass());
            }
            // Mark the file systems as loaded.
            FILE_SYSTEMS_LOADED = true;
        }
    }

    loadFileSystems后SERVICE_FILE_SYSTEMS存在如下值:

    file=class org.apache.hadoop.fs.LocalFileSystem, 
    ftp=class org.apache.hadoop.fs.ftp.FTPFileSystem, 
    hdfs=class org.apache.hadoop.hdfs.DistributedFileSystem, 
    hftp=class org.apache.hadoop.hdfs.web.HftpFileSystem, 
    webhdfs=class org.apache.hadoop.hdfs.web.WebHdfsFileSystem, 
    s3n=class org.apache.hadoop.fs.s3native.NativeS3FileSystem, 
    viewfs=class org.apache.hadoop.fs.viewfs.ViewFileSystem, 
    swebhdfs=class org.apache.hadoop.hdfs.web.SWebHdfsFileSystem, 
    har=class org.apache.hadoop.fs.HarFileSystem, 
    s3=class org.apache.hadoop.fs.s3.S3FileSystem, 
    hsftp=class org.apache.hadoop.hdfs.web.HsftpFileSystem

    DistributedFileSystem.java

    DFSClient dfs; // Key field: client-to-server operations first need to obtain this DFSClient
    
    /**
     * Initializes this file system for the given URI: runs the superclass
     * initialization, stores the configuration, creates the DFSClient, and
     * records the scheme://authority URI and the working directory.
     *
     * @param uri  URI of the file system, e.g. hdfs://hadoop000:8020
     * @param conf configuration for this instance and its DFSClient
     * @throws IOException propagated from the superclass or from DFSClient creation
     */
    @Override
    public void initialize(URI uri, Configuration conf) throws IOException {
        super.initialize(uri, conf);
        setConf(conf);

        // Extracted for tracing only; unused in this excerpt.
        final String hostName = uri.getHost(); // e.g. hadoop000

        this.dfs = new DFSClient(uri, conf, statistics);

        // Rebuild the URI from scheme and authority only.
        final String schemeAndAuthority = uri.getScheme()+"://"+uri.getAuthority();
        this.uri = URI.create(schemeAndAuthority);
        this.workingDir = getHomeDirectory();
    }

    DFSClient.java

    final ClientProtocol namenode; // Key field: the RPC interface the client uses to communicate with the NameNode
    
    public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, FileSystem.Statistics stats)throws IOException {
        
        NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,ClientProtocol.class);
        this.dtService = proxyInfo.getDelegationTokenService();
        this.namenode = proxyInfo.getProxy(); //org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB
    }

    NameNodeProxies.java

    /**
     * Creates a NameNode proxy for the requested protocol interface. In this
     * excerpt the call always goes through createNonHAProxy with retries enabled.
     *
     * @param conf        configuration for proxy creation
     * @param nameNodeUri URI of the NameNode
     * @param xface       protocol interface the proxy must implement
     * @return the proxy together with its delegation-token service info
     * @throws IOException propagated from the lookups or from proxy creation
     */
    public static <T> ProxyAndInfo<T> createProxy(Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
        // Looked up but not consulted further in this excerpt.
        final Class<FailoverProxyProvider<T>> failoverProviderClass =
            getFailoverProxyProviderClass(conf, nameNodeUri, xface);

        final InetSocketAddress nnAddress = NameNode.getAddress(nameNodeUri);
        final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
        return createNonHAProxy(conf, nnAddress, xface, currentUser, true);
    }
    
    /**
     * Creates a proxy for the NameNode at {@code nnAddr} and pairs it with the
     * token service built from that address.
     *
     * <p>When {@code xface} is ClientProtocol the proxy comes from
     * createNNProxyWithClientProtocol. The handling after that branch is elided
     * in this excerpt.
     *
     * @param conf        configuration passed to the proxy factory
     * @param nnAddr      socket address of the NameNode
     * @param xface       protocol interface the proxy must implement
     * @param ugi         user information passed to the proxy factory
     * @param withRetries passed through to createNNProxyWithClientProtocol
     * @return a ProxyAndInfo wrapping the proxy and the token service
     * @throws IOException propagated from proxy creation
     */
    public static <T> ProxyAndInfo<T> createNonHAProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
        UserGroupInformation ugi, boolean withRetries) throws IOException {
        Text dtService = SecurityUtil.buildTokenService(nnAddr);
    
        T proxy;
        if (xface == ClientProtocol.class) {
          proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,withRetries);
        } ... // code elided in this excerpt
        return new ProxyAndInfo<T>(proxy, dtService);
    }
    
    /**
     * Builds the ClientProtocol implementation used to talk to the NameNode.
     *
     * <p>An RPC proxy for ClientNamenodeProtocolPB is created first. When
     * {@code withRetries} is set, that proxy is wrapped by RetryProxy.create.
     * The resulting proxy is then adapted by ClientNamenodeProtocolTranslatorPB.
     *
     * @param address     socket address of the NameNode
     * @param conf        configuration for the RPC layer
     * @param ugi         user information for the connection
     * @param withRetries whether to wrap the RPC proxy with RetryProxy
     * @return a translator around the (optionally retrying) RPC proxy
     * @throws IOException propagated from RPC proxy creation
     */
    private static ClientProtocol createNNProxyWithClientProtocol(
        InetSocketAddress address, Configuration conf, UserGroupInformation ugi,boolean withRetries) throws IOException {

        // RPC interface between the client and the NameNode.
        final long protocolVersion = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
        final ClientNamenodeProtocolPB rpcProxy = RPC.getProtocolProxy(
            ClientNamenodeProtocolPB.class, protocolVersion, address, ugi, conf,
            NetUtils.getDefaultSocketFactory(conf),
            org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy)
                .getProxy();

        if (!withRetries) {
            return new ClientNamenodeProtocolTranslatorPB(rpcProxy);
        }

        // The retrying instance is created through RetryProxy (a JDK dynamic proxy).
        final DefaultFailoverProxyProvider<ClientNamenodeProtocolPB> provider =
            new DefaultFailoverProxyProvider<ClientNamenodeProtocolPB>(
                ClientNamenodeProtocolPB.class, rpcProxy);
        final ClientNamenodeProtocolPB retryingProxy = (ClientNamenodeProtocolPB) RetryProxy.create(
            ClientNamenodeProtocolPB.class, provider, methodNameToPolicyMap, defaultPolicy);
        return new ClientNamenodeProtocolTranslatorPB(retryingProxy);
    }

    RetryProxy.java

    /**
     * Creates a JDK dynamic proxy that implements {@code iface} and routes every
     * call through a RetryInvocationHandler.
     *
     * @param iface         interface the returned proxy implements
     * @param proxyProvider provider handed to the invocation handler; its
     *                      interface's class loader defines the proxy class
     * @param retryPolicy   retry policy handed to the invocation handler
     * @return the dynamic proxy instance
     */
    public static <T> Object create(Class<T> iface,FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy) {
        final ClassLoader loader = proxyProvider.getInterface().getClassLoader();
        final Class<?>[] proxiedInterfaces = { iface };
        final RetryInvocationHandler<T> handler =
            new RetryInvocationHandler<T>(proxyProvider, retryPolicy);
        return Proxy.newProxyInstance(loader, proxiedInterfaces, handler);
    }

     获取FileSystem实例源码分析总结:

    1、FileSystem.get通过反射实例化了一个DistributedFileSystem;

    2、DistributedFileSystem中new DFSClient(),并把它作为自己的成员变量;

    3、在DFSClient构造方法里面,调用了createProxy使用RPC机制得到了一个NameNode的代理对象,就可以和NameNode进行通信;

    4、整个流程:FileSystem.get()--> DistributedFileSystem.initialize() --> DFSClient(RPC.getProtocolProxy()) --> NameNode的代理。

  • 相关阅读:
    VirtualBox Linux服务vboxservicetemplate
    oracle 11g常用命令
    haproxy dataplaneapi
    使用jproflier 分析dremio
    cube.js 支持oceanbase 的mysql driver
    fastdfs 集群异常修复实践
    使用jHiccup 分析java 应用性能
    dremio mysql arp 扩展
    cube.js graphql 支持
    apache kyuubi 参考架构集成
  • 原文地址:https://www.cnblogs.com/luogankun/p/4131684.html
Copyright © 2011-2022 走看看