  • The FileSystem instantiation process

    HDFS sample code

    Configuration configuration = new Configuration();
    FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop000:8020"), configuration);
        
    InputStream in = fileSystem.open(new Path(HDFS_PATH+"/hdfsapi/test/log4j.properties"));
    OutputStream out = new FileOutputStream(new File("log4j_download.properties"));
    IOUtils.copyBytes(in, out, 4096, true); // the last argument closes the input/output streams once the copy finishes

    FileSystem.java

    static final Cache CACHE = new Cache();
    
    public static FileSystem get(URI uri, Configuration conf) throws IOException {
        String scheme = uri.getScheme();   //hdfs
        String authority = uri.getAuthority();  //hadoop000:8020
    
        return CACHE.get(uri, conf);
    }
    
    // this method and getInternal below belong to the inner FileSystem.Cache class
    FileSystem get(URI uri, Configuration conf) throws IOException {
        Key key = new Key(uri, conf);
        return getInternal(uri, conf, key);
    }
    
    private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
        FileSystem fs;
        synchronized (this) {
            fs = map.get(key);
        }
        
        // Get a FileSystem instance for the URI: if caching is enabled, the instance
        // comes from the cache; otherwise createFileSystem creates a new one
        if (fs != null) { 
            return fs;
        }
        
        fs = createFileSystem(uri, conf);
        synchronized (this) { 
            FileSystem oldfs = map.get(key);
            ... // put the newly created instance into the CACHE
            return fs;
        }
    }
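
    A side note on what the CACHE buys you: repeated FileSystem.get calls with the same scheme and authority (for the same user) return the same cached instance. A minimal sketch to observe this, assuming fs.hdfs.impl.disable.cache has not been set to true:

    Configuration conf = new Configuration();
    URI uri = new URI("hdfs://hadoop000:8020");

    FileSystem fs1 = FileSystem.get(uri, conf);
    FileSystem fs2 = FileSystem.get(uri, conf);
    System.out.println(fs1 == fs2); // true: both calls hit the same CACHE entry

    // FileSystem.newInstance bypasses the shared entry and always creates a fresh instance
    FileSystem fs3 = FileSystem.newInstance(uri, conf);
    System.out.println(fs1 == fs3); // false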
    
    private static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException {
        Class<?> clazz = getFileSystemClass(uri.getScheme(), conf); // returns org.apache.hadoop.hdfs.DistributedFileSystem here
        FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
        fs.initialize(uri, conf); // initialize the DistributedFileSystem
        return fs;
    }
    
    public static Class<? extends FileSystem> getFileSystemClass(String scheme,Configuration conf) throws IOException {
        if (!FILE_SYSTEMS_LOADED) { // have the file systems been loaded yet? false on the first call
            loadFileSystems();
        }
        Class<? extends FileSystem> clazz = null;
        if (conf != null) {
            clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null); // fs.hdfs.impl - not configured in core-default.xml or core-site.xml here
        }
        if (clazz == null) {
            clazz = SERVICE_FILE_SYSTEMS.get(scheme); //class org.apache.hadoop.hdfs.DistributedFileSystem
        }
        if (clazz == null) {
            throw new IOException("No FileSystem for scheme: " + scheme);
        }
        return clazz;
    }
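
    Because of the conf.getClass lookup above, the implementation class for a scheme can be overridden in configuration. A hypothetical override (equivalent to setting fs.hdfs.impl in core-site.xml; when unset, the class comes from SERVICE_FILE_SYSTEMS instead):

    conf.setClass("fs.hdfs.impl",
        org.apache.hadoop.hdfs.DistributedFileSystem.class,
        org.apache.hadoop.fs.FileSystem.class);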
    
    
    private static void loadFileSystems() {
        synchronized (FileSystem.class) {
            if (!FILE_SYSTEMS_LOADED) {
                ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
                for (FileSystem fs : serviceLoader) {
                    SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
                }
                FILE_SYSTEMS_LOADED = true; // mark the file systems as loaded
            }
        }
    }

    After loadFileSystems runs, SERVICE_FILE_SYSTEMS contains the following entries:

    file=class org.apache.hadoop.fs.LocalFileSystem, 
    ftp=class org.apache.hadoop.fs.ftp.FTPFileSystem, 
    hdfs=class org.apache.hadoop.hdfs.DistributedFileSystem, 
    hftp=class org.apache.hadoop.hdfs.web.HftpFileSystem, 
    webhdfs=class org.apache.hadoop.hdfs.web.WebHdfsFileSystem, 
    s3n=class org.apache.hadoop.fs.s3native.NativeS3FileSystem, 
    viewfs=class org.apache.hadoop.fs.viewfs.ViewFileSystem, 
    swebhdfs=class org.apache.hadoop.hdfs.web.SWebHdfsFileSystem, 
    har=class org.apache.hadoop.fs.HarFileSystem, 
    s3=class org.apache.hadoop.fs.s3.S3FileSystem, 
    hsftp=class org.apache.hadoop.hdfs.web.HsftpFileSystem
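
    These mappings come from the standard java.util.ServiceLoader mechanism: each jar on the classpath (hadoop-common, hadoop-hdfs, ...) registers its FileSystem implementations in a META-INF/services/org.apache.hadoop.fs.FileSystem file. The listing above can be reproduced with a short sketch, assuming the Hadoop jars are on the classpath:

    ServiceLoader<FileSystem> loader = ServiceLoader.load(FileSystem.class);
    for (FileSystem fs : loader) {
        // the same scheme -> class mapping that loadFileSystems() stores in SERVICE_FILE_SYSTEMS
        System.out.println(fs.getScheme() + " = " + fs.getClass());
    }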

    DistributedFileSystem.java

    DFSClient dfs; // key field: client-side operations all go through this DFSClient
    
    @Override
    public void initialize(URI uri, Configuration conf) throws IOException {
        super.initialize(uri, conf);
        setConf(conf);
    
        String host = uri.getHost();  //hadoop000
    
        this.dfs = new DFSClient(uri, conf, statistics);
        this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
        this.workingDir = getHomeDirectory();
    }
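
    The dfs field is what every subsequent file operation goes through: DistributedFileSystem methods delegate to the DFSClient. A simplified sketch of open(), with symlink resolution and statistics bookkeeping omitted:

    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
        // dfs.open(...) asks the NameNode (through the ClientProtocol proxy) for the
        // block locations of the file, then returns a stream that reads from DataNodes
        return new FSDataInputStream(dfs.open(getPathName(fixRelativePart(f)), bufferSize, verifyChecksum));
    }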

    DFSClient.java

    final ClientProtocol namenode; // key field: the RPC interface the client uses to talk to the NameNode
    
    // the three-argument DFSClient(uri, conf, statistics) call above delegates here with rpcNamenode == null
    public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, FileSystem.Statistics stats) throws IOException {
        
        NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,ClientProtocol.class);
        this.dtService = proxyInfo.getDelegationTokenService();
        this.namenode = proxyInfo.getProxy(); //org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB
    }
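
    With the namenode proxy in hand, metadata operations become plain method calls on ClientProtocol, and the RPC layer turns each call into a protobuf request to the NameNode. For example, a simplified version of DFSClient.getLocatedBlocks (the real code also checks that the client is open and unwraps remote exceptions):

    public LocatedBlocks getLocatedBlocks(String src, long start, long length) throws IOException {
        // one round trip to the NameNode: which blocks cover [start, start + length)
        // of the file src, and which DataNodes hold replicas of each block
        return namenode.getBlockLocations(src, start, length);
    }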

    NameNodeProxies.java

    public static <T> ProxyAndInfo<T> createProxy(Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
        Class<FailoverProxyProvider<T>> failoverProxyProviderClass = getFailoverProxyProviderClass(conf, nameNodeUri, xface);
        // non-HA case (failoverProxyProviderClass == null); the HA branch is omitted here
        return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface, UserGroupInformation.getCurrentUser(), true);
    }
    
    public static <T> ProxyAndInfo<T> createNonHAProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
        UserGroupInformation ugi, boolean withRetries) throws IOException {
        Text dtService = SecurityUtil.buildTokenService(nnAddr);
    
        T proxy;
        if (xface == ClientProtocol.class) {
          proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,withRetries);
        } ... // the omitted branches handle the other protocol interfaces the same way
        return new ProxyAndInfo<T>(proxy, dtService);
    }
    
    private static ClientProtocol createNNProxyWithClientProtocol(
        InetSocketAddress address, Configuration conf, UserGroupInformation ugi,boolean withRetries) throws IOException {
     
        // the RPC interface between the Client and the NameNode
        final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
        ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
            ClientNamenodeProtocolPB.class, version, address, ugi, conf,
            NetUtils.getDefaultSocketFactory(conf),
            org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy)
                .getProxy();
    
        if (withRetries) {
            // wrap the proxy with retry logic, built on a JDK dynamic proxy
            proxy = (ClientNamenodeProtocolPB) RetryProxy.create(
              ClientNamenodeProtocolPB.class,new DefaultFailoverProxyProvider<ClientNamenodeProtocolPB>(
                  ClientNamenodeProtocolPB.class, proxy),methodNameToPolicyMap,defaultPolicy);
        }
        return new ClientNamenodeProtocolTranslatorPB(proxy);
    }

    RetryProxy.java

    public static <T> Object create(Class<T> iface,FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy) {
        return Proxy.newProxyInstance(
            proxyProvider.getInterface().getClassLoader(),
            new Class<?>[] { iface },
            new RetryInvocationHandler<T>(proxyProvider, retryPolicy)
        );
    }
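
    Proxy.newProxyInstance returns an object that implements ClientNamenodeProtocolPB, with every method call routed through RetryInvocationHandler.invoke, which applies the configured RetryPolicy (and can fail over to another proxy via the FailoverProxyProvider). A stripped-down illustration of the same JDK dynamic-proxy pattern, using hypothetical names and a fixed retry count instead of a RetryPolicy:

    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Proxy;

    public class SimpleRetryProxy {
        // wraps any interface implementation so that each method call is retried up to maxRetries times
        public static <T> T create(Class<T> iface, T target, int maxRetries) {
            InvocationHandler handler = (proxy, method, args) -> {
                Throwable last = null;
                for (int attempt = 0; attempt <= maxRetries; attempt++) {
                    try {
                        return method.invoke(target, args);
                    } catch (InvocationTargetException e) {
                        last = e.getCause(); // unwrap the failure thrown by the target
                    }
                }
                throw last;
            };
            return iface.cast(Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] { iface }, handler));
        }
    }

    The real RetryInvocationHandler is policy-driven: the RetryPolicy decides per exception whether to retry, fail over, or fail, rather than using a fixed count.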

    Summary of the FileSystem instantiation source walkthrough:

    1. FileSystem.get instantiates a DistributedFileSystem via reflection;

    2. DistributedFileSystem creates a new DFSClient and keeps it as a member field;

    3. The DFSClient constructor calls createProxy, which uses the RPC machinery to obtain a NameNode proxy object, through which the client can talk to the NameNode;

    4. Overall flow: FileSystem.get() --> DistributedFileSystem.initialize() --> DFSClient(RPC.getProtocolProxy()) --> NameNode proxy.

  • Original article: https://www.cnblogs.com/luogankun/p/4131684.html