- The FileSystem object is the entry point for all HDFS operations, so a client first obtains an fs through one of its static get methods (see the usage sketch right after this list); every get overload ultimately goes through the cache.
- The get method of the static inner class Cache actually calls getInternal(URI uri, Configuration conf, Key key).
- createFileSystem is a private static method of FileSystem, available only to FileSystem itself and its inner classes such as Cache.
- Initializing the fs mainly consists of creating a DFSClient object.
- The construction of DFSClient below is the core of the process and deserves the most attention when reading the source.
- It creates a proxy for the remote object.
- The fs we finally get back is a multi-layer wrapper around that remote object proxy.
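Before diving into the source, here is a minimal usage sketch of the entry point this article traces (the NameNode address is a made-up example matching the authority that appears later):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class HdfsEntryPoint {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // This one call triggers the whole chain traced below:
    // get -> CACHE.get -> createFileSystem -> initialize -> new DFSClient
    FileSystem fs = FileSystem.get(URI.create("hdfs://192.8.0.14:9000"), conf);
    System.out.println(fs); // prints the wrapped DFSClient, see the end of this article
  }
}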
get(URI uri, Configuration conf) performs basic validation of the configuration and then fetches the FileSystem from CACHE:
public static FileSystem get(URI uri, Configuration conf) throws IOException {
  String scheme = uri.getScheme();
  String authority = uri.getAuthority();
  ...
  return CACHE.get(uri, conf);
}
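(Caching can also be bypassed per scheme: when the fs.&lt;scheme&gt;.impl.disable.cache configuration key is set to true, get creates a fresh FileSystem on every call instead of consulting CACHE.)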
static class Cache {
  private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
  ...
  FileSystem get(URI uri, Configuration conf) throws IOException {
    Key key = new Key(uri, conf);
    return getInternal(uri, conf, key);
  }

  private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException {
    FileSystem fs;
    // If the map holds no fs for this key, create a new one from the
    // configuration and put it into the map
    synchronized (this) {
      fs = map.get(key);
    }
    if (fs != null) {
      return fs;
    }
    fs = createFileSystem(uri, conf);
    synchronized (this) { // refetch the lock again
      ...
      fs.key = key;
      map.put(key, fs);
      ...
      return fs;
    }
  }
  ...
}
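As an aside, the cache key combines the URI with the caller's identity, so the same user asking for the same cluster shares a single FileSystem instance. A simplified sketch of Key based on the Hadoop 2.x source (equals/hashCode, which compare all four fields, are omitted):

static class Key {
  final String scheme;            // e.g. "hdfs"
  final String authority;         // e.g. "192.8.0.14:9000"
  final UserGroupInformation ugi; // the calling user is part of the key
  final long unique;              // non-zero only for newInstance(), to defeat caching

  Key(URI uri, Configuration conf) throws IOException {
    this(uri, conf, 0);
  }

  Key(URI uri, Configuration conf, long unique) throws IOException {
    scheme = uri.getScheme() == null ? "" : uri.getScheme().toLowerCase();
    authority = uri.getAuthority() == null ? "" : uri.getAuthority().toLowerCase();
    this.unique = unique;
    this.ugi = UserGroupInformation.getCurrentUser();
  }
}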
private static FileSystem createFileSystem(URI uri, Configuration conf)
    throws IOException {
  Class<?> clazz = getFileSystemClass(uri.getScheme(), conf);
  FileSystem fs = (FileSystem) ReflectionUtils.newInstance(clazz, conf);
  fs.initialize(uri, conf);
  return fs;
}
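A side note on the reflection step: ReflectionUtils.newInstance constructs the class through its no-argument constructor and, because FileSystem is Configurable, also calls setConf(conf) on the new instance, so the object already carries its configuration before initialize runs.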
// FILE_SYSTEMS_LOADED defaults to false, so resolving a file system class
// first loads all file systems, then reads from the cache
public static Class<? extends FileSystem> getFileSystemClass(String scheme,
    Configuration conf) throws IOException {
  if (!FILE_SYSTEMS_LOADED) {
    loadFileSystems();
  }
  Class<? extends FileSystem> clazz = null;
  if (conf != null) {
    clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
  }
  if (clazz == null) {
    clazz = SERVICE_FILE_SYSTEMS.get(scheme);
  }
  if (clazz == null) {
    throw new IOException("No FileSystem for scheme: " + scheme);
  }
  return clazz;
}
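Note the lookup order: an explicit fs.&lt;scheme&gt;.impl entry in the Configuration takes precedence over whatever ServiceLoader discovered. A minimal sketch of forcing the mapping programmatically (equivalent to setting the same key in core-site.xml):

// Pin the "hdfs" scheme to a specific implementation; this overrides
// the ServiceLoader-discovered class in getFileSystemClass
conf.setClass("fs.hdfs.impl",
    org.apache.hadoop.hdfs.DistributedFileSystem.class,
    org.apache.hadoop.fs.FileSystem.class);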
// ServiceLoader is used here to load all FileSystem subclasses on the classpath
private static void loadFileSystems() {
  synchronized (FileSystem.class) {
    if (!FILE_SYSTEMS_LOADED) {
      ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
      Iterator<FileSystem> it = serviceLoader.iterator();
      while (it.hasNext()) {
        FileSystem fs = null;
        try {
          fs = it.next();
          try {
            SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
          } catch (Exception e) {
            LOG.warn("Cannot load: " + fs + " from " +
                ClassUtil.findContainingJar(fs.getClass()), e);
          }
        } catch (ServiceConfigurationError ee) {
          LOG.warn("Cannot load filesystem", ee);
        }
      }
      FILE_SYSTEMS_LOADED = true;
    }
  }
}
For the details of how ServiceLoader works, see https://www.cnblogs.com/aspirant/p/10616704.html
In the end SERVICE_FILE_SYSTEMS caches all nine concrete file system types that were loaded (discovered through the service descriptor files shown below):
viewfs swebhdfs file ftp har hsftp hdfs webhdfs hftp
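ServiceLoader discovers implementations through META-INF/services resources on the classpath. As an illustration (abridged, and package locations vary across Hadoop versions), the hadoop-hdfs jar ships a descriptor along these lines:

META-INF/services/org.apache.hadoop.fs.FileSystem:
org.apache.hadoop.hdfs.DistributedFileSystem
org.apache.hadoop.hdfs.web.WebHdfsFileSystem
org.apache.hadoop.hdfs.web.SWebHdfsFileSystem

while hadoop-common contributes entries such as org.apache.hadoop.fs.LocalFileSystem, org.apache.hadoop.fs.viewfs.ViewFileSystem, and org.apache.hadoop.fs.HarFileSystem.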
Below is a brief overview of some of the file systems Hadoop supports:

| File system | URI scheme | Java implementation (under org.apache.hadoop) | Description |
| --- | --- | --- | --- |
| Local | file | fs.LocalFileSystem | A local file system with client-side checksums. The checksum-free local file system is implemented in fs.RawLocalFileSystem. |
| HDFS | hdfs | hdfs.DistributedFileSystem | Hadoop's distributed file system. |
| HFTP | hftp | hdfs.HftpFileSystem | Read-only access to HDFS over HTTP; distcp often uses it to copy data between different HDFS clusters. |
| HSFTP | hsftp | hdfs.HsftpFileSystem | Read-only access to HDFS over HTTPS. |
| FTP | ftp | fs.ftp.FTPFileSystem | A file system backed by an FTP server. |
| ViewFs | viewfs | fs.viewfs.ViewFileSystem | Used with federation; manages the namespaces of multiple Hadoop clusters. |
| SWebHdfs | swebhdfs | hdfs.web.SWebHdfsFileSystem | HTTPS-based access to HDFS via the WebHDFS REST interface. |
| WebHdfs | webhdfs | hdfs.web.WebHdfsFileSystem | HTTP-based access to HDFS via the WebHDFS REST interface. |
| HAR | har | fs.HarFileSystem | Layered on another Hadoop file system, used for archiving files. Hadoop archives mainly serve to reduce NameNode memory usage. |
| KFS | kfs | fs.kfs.KosmosFileSystem | CloudStore (formerly the Kosmos file system) is a distributed file system similar to HDFS and Google's GFS, written in C++. |
Once the file systems are loaded, the class name for the scheme can be resolved, here org.apache.hadoop.hdfs.DistributedFileSystem.
From that name an instance FileSystem fs is created by reflection; as the entry point for HDFS operations it still needs the crucial initialize call before it can be used:
public void initialize(URI uri, Configuration conf) throws IOException {
  super.initialize(uri, conf); // registers the statistics observer for metrics
  setConf(conf);
  String host = uri.getHost();
  if (host == null) {
    throw new IOException("Incomplete HDFS URI, no host: " + uri);
  }
  homeDirPrefix = conf.get(
      DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_KEY,
      DFSConfigKeys.DFS_USER_HOME_DIR_PREFIX_DEFAULT);
  this.dfs = new DFSClient(uri, conf, statistics);
  this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
  this.workingDir = getHomeDirectory();
}
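For example, with the default home-directory prefix /user and the ugi shown in the toString() output at the end of this article, workingDir resolves to /user/tyxuan.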
public DFSClient(URI nameNodeUri, Configuration conf,
    FileSystem.Statistics stats) throws IOException {
  this(nameNodeUri, null, conf, stats);
}

public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode,
    Configuration conf, FileSystem.Statistics stats) throws IOException {
  ...
  // example values: authority = 192.8.0.14:9000, clientName = DFSClient_NONMAPREDUCE_303849151_1
  this.authority = nameNodeUri == null ? "null" : nameNodeUri.getAuthority();
  this.clientName = "DFSClient_" + dfsClientConf.taskId + "_" +
      DFSUtil.getRandom().nextInt() + "_" + Thread.currentThread().getId();
  ...
  if (proxyInfo != null) {
    this.dtService = proxyInfo.getDelegationTokenService();
    this.namenode = proxyInfo.getProxy();
  } else if (rpcNamenode != null) {
    // This case is used for testing.
    Preconditions.checkArgument(nameNodeUri == null);
    this.namenode = rpcNamenode;
    dtService = null;
  } else {
    Preconditions.checkArgument(nameNodeUri != null, "null URI");
    // Create the proxy for the remote object here: calling a method on the
    // local proxy makes the RPC framework invoke the real object's method
    // on the server side
    proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,
        ClientProtocol.class, nnFallbackToSimpleAuth);
    this.dtService = proxyInfo.getDelegationTokenService();
    this.namenode = proxyInfo.getProxy();
  }
  ...
}
public static <T> ProxyAndInfo<T> createProxy(Configuration conf,
    URI nameNodeUri, Class<T> xface, AtomicBoolean fallbackToSimpleAuth)
    throws IOException {
  // Reflectively create the NN failover proxy provider from the configuration;
  // under HA it switches from a failed NN to a healthy one (active/standby failover)
  AbstractNNFailoverProxyProvider<T> failoverProxyProvider =
      createFailoverProxyProvider(conf, nameNodeUri, xface, true,
          fallbackToSimpleAuth);
  if (failoverProxyProvider == null) {
    // Non-HA case; we trace the non-HA path first and will debug-trace
    // the HA path the same way later
    return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,
        UserGroupInformation.getCurrentUser(), true, fallbackToSimpleAuth);
  } else {
    // HA case
    ...
    return new ProxyAndInfo<T>(proxy, dtService, nnAddress);
  }
}
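Which branch runs is decided by configuration: createFailoverProxyProvider looks up the provider class under the dfs.client.failover.proxy.provider.&lt;nameservice&gt; key; when no provider is configured for the target URI (the ordinary single-NameNode setup), it returns null and the non-HA branch above executes.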
public static <T> ProxyAndInfo<T> createNonHAProxy(
    Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
    UserGroupInformation ugi, boolean withRetries,
    AtomicBoolean fallbackToSimpleAuth) throws IOException {
  Text dtService = SecurityUtil.buildTokenService(nnAddr);
  T proxy;
  if (xface == ClientProtocol.class) {
    proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,
        withRetries, fallbackToSimpleAuth);
  } else if (...) {
    ...
  } else {
    ...
  }
  return new ProxyAndInfo<T>(proxy, dtService, nnAddr);
}
private static ClientProtocol createNNProxyWithClientProtocol(
    InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
    boolean withRetries, AtomicBoolean fallbackToSimpleAuth)
    throws IOException {
  // Hadoop 2.7.6 supports two RPC engines: WritableRpcEngine and ProtobufRpcEngine
  RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine.class);
  final RetryPolicy defaultPolicy =
      RetryUtils.getDefaultRetryPolicy(
          conf,
          DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY,
          DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
          DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
          DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
          SafeModeException.class);
  final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
  // Obtain the proxy for the remote object from the RPC framework
  ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
      ClientNamenodeProtocolPB.class, version, address, ugi, conf,
      NetUtils.getDefaultSocketFactory(conf),
      org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
      fallbackToSimpleAuth).getProxy();
  // Either way, the raw PB proxy is wrapped once more before being
  // returned to the caller
  if (withRetries) { // create the proxy with retries
    Map<String, RetryPolicy> methodNameToPolicyMap =
        new HashMap<String, RetryPolicy>();
    ClientProtocol translatorProxy =
        new ClientNamenodeProtocolTranslatorPB(proxy);
    return (ClientProtocol) RetryProxy.create(
        ClientProtocol.class,
        new DefaultFailoverProxyProvider<ClientProtocol>(
            ClientProtocol.class, translatorProxy),
        methodNameToPolicyMap,
        defaultPolicy);
  } else {
    return new ClientNamenodeProtocolTranslatorPB(proxy);
  }
}
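Tracing back up the call chain confirms the claim at the top of this article: the fs handed to user code is a multi-layer wrapper around the remote object proxy:

DistributedFileSystem
  -> DFSClient (its namenode field)
  -> retry proxy over ClientProtocol (RetryProxy + DefaultFailoverProxyProvider)
  -> ClientNamenodeProtocolTranslatorPB
  -> ClientNamenodeProtocolPB dynamic proxy (ProtobufRpcEngine)
  -> network -> NameNode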
The outer layers are visible in the fs object's toString() output:
DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_303849151_1, ugi=tyxuan (auth:SIMPLE)]]