写在前面:关于HDFS的方方面面在网上可以看到很多,因为hadoop算是比较新的技术,所以网上的很多hadoop作者都是在计算机行业叱咤多年的老手,对于hadoop的分析也相对高神,所以最开始时候对于我这个小清新来说简直是如天书一般(尤其是在没有详细了解java语言和linux系统的前提下),一个简单的操作可能都对我来说是云里雾里。因此在自己慢慢累积后,想从自己理解的角度来分析下hdfs,当然也是在学习前人的基础上自己的总结。
HDFS的基础知识:
1、HDFS通过流水线形式进行副本文件的复制,当某个客户端向HDFS文件写数据的时候,一开始是写入本地临时文件,假如该文件的Replication因子设置为3,那么客户端会从Namenode获取一个DataNode列表来存储副本。然后客户端开始向第一个Datanode传输数据,第一个Datanode一小部分一小部分(4K)来接收数据,将每个部分写入本地仓库,并同时传输该部分至第二个Datanode节点,第二个也这样传输至第三个Datanode节点。
2、当默认的数据块存储是64MB,但是一般可以通过设置为128MB,这个根据相应的实际情况进行设置,默认的路径在hdfs文件夹中的hdfs-default.xml中的dfs.block.size下
3、文件删除后其实并未从HDFS中删除掉,HDFS只是将这个文件重新命名并保存在/trash目录,同时有个默认的自动删除时间(6h)
4、NameNode通过DataNode传递的心跳包来分析DataNode的情况,并判断是否重新调整namespace中的信息,心跳包其实就是在一定时间内从DataNode返回的各种状态信息。
HDFS的源码分析:
首先看下Hadoop源码解压目录结构
然后说明下hadoop源代码的顶级包的结构、功能与位置(以上图的路径为基础)
1、org.apache.hadoop.tools 提供一些命令行工具 hadoop-0.20.2/src/tools/org/apache/hadoop/tools
2、org.apache.hadoop.record 根据数据描述语言自动生成其编解码函数 hadoop-0.20.2/src/core/org/apache/hadoop/record
3、org.apache.hadoop.http 用户可通过浏览器查看文件系统的状态信息 hadoop-0.20.2/src/core/org/apache/hadoop/http
4、org.apache.hadoop.net 封装DNS,Socket等网络功能 hadoop-0.20.2/src/core/org/apache/hadoop/net
5、org.apache.hadoop.fs(重要) 文件系统的抽象,为支持多种文件系统实现的统一访问接口 hadoop-0.20.2/src/core/org/apache/hadoop/fs
6、org.apache.hadoop.hdfs(重要) Hadoop分布式文件系统的实现 hadoop-0.20.2/src/hdfs/org/apache/hadoop/hdfs
7、org.apache.hadoop.ipc(重要) 协议接口 hadoop-0.20.2/src/core/org/apache/hadoop/ipc
8、org.apache.hadoop.mapreduce(重要) hadoop MR计算框架的实现 hadoop-0.20.2/src/mapred/org/apache/hadoop/mapreduce
9、org.apache.hadoop.io(重要) 数据的编/解码 hadoop-0.20.2/src/core/org/apache/hadoop/io
10、org.apache.hadoop.filecache 提供HDFS文件的本地缓存,用于加快MapReduce的数据访问速度 hadoop-0.20.2/src/core/org/apache/hadoop/filecache
11、org.apache.hadoop.log 提供HTTP访问日志的Servlet hadoop-0.20.2/src/core/org/apache/hadoop/log
12、org.apache.hadoop.metries 系统统计数据的收集 hadoop-0.20.2/src/core/org/apache/hadoop/metries
13、org.apache.hadoop.security 用户和用户组信息维护 hadoop-0.20.2/src/core/org/apache/hadoop/security
14、org.apache.hadoop.util 工具类 hadoop-0.20.2/src/core/org/apache/hadoop/util
以上14个包的依赖关系:
源码包 依赖包
tools mapreduce,fs,hdfs,ipc,io,security,conf,util
fs hdfs,ipc,io,http,net,metrics,security,conf,util
mapreduce filecache,fs,hdfs,ipc,io,net,metrics,security,conf,util
io ipc,fs,conf,util
record io
filecache fs,conf,util
http log,conf,util
log util
metrics util
net ipc,fs,conf,util
security io,conf,util
util mapred,fs,io,conf
HDFS主要作用是用于存储和读取数据,那么这两个过程中必然涉及到数据的读/写操作,因此分析HDFS的机制从读写过程来实现更好。
1、任何读写过程首先需要一定的通讯机制,尤其是在HDFS的分布式这样架构下。其中有几个协议接口很重要,其继承关系如下:
<1> VersionedProtocol接口位于org.apache.hadoop.ipc包中,是Hadoop的最顶层协议接口的抽象,是使用Hadoop RPC机制的所有协议的超类。
<2> ClientProtocol接口位于org.apache.hadoop.hdfs.protocol包中,是用户进程(包括客户端进程与Datanode进程)与Namenode进程之间进行通讯所使用的协议。例如:(1)客户端进程需要向Datanode数据节点复制数据块,需要与Namenode进程通信,获取Datanode节点列表
(2)Datanode进程向Namenode进程发送心跳状态报告和块状态报告需要与Namenode进程交互
<3> NamenodeProtocol接口位于org.apache.hadoop.server.protocol包中,定义了Secondary Namenode与Namenode进行通信所需要的操作。
<4> ClientDatanodeProtocol接口定义了数据恢复的方法,当客户端进程需要与Datanode进程进行通信时,需要给予该协议。
<5> DatanodeProtocol接口位于org.apache.hadoop.hdfs.server.protocol包中,当Datanode进程与Namenode进程进行通信时需要基于此协议,比如发送心跳报告和块状态报告。而一般来说,Namenode不直接与Datanode进行RPC调用,如果一个Namenode需要与Datanode通信,唯一的方式就是通过调用该协议接口定义的方法
<6> InterDatanodeProtocol接口位于org.apache.hadoop.hdfs.server.protocol包中,该协议接口用于Datanode进程之间进行通信,比如客户端进程启动复制数据块,此时可能需要在Datanode节点之间进行块副本的流水线复制操作。
了解相应协议接口的使用,则开始从读写操作来详细分析HDFS的操作机制。
读操作:
1、通过DistributedFileSystem中的Open方法打开文件,返回的是FsDataInputStream对象,对象的实例化时调用的DFSClient中的Open方法进行具体文件的打开
2、通过DFSClient的内部类DFSInputStream的OpenInfo方法中的getLocatedBlocks得到相应数据块在Datanode的位置
3、通过FSDataInputStream类的Seek和getPos方法获取读取请求
4、然后再利用FSDataInputStream的read方法进行数据的读取,并将Datanode相应数据块读取值Client中。
5、read数据块的过程中是根据DFSInputStream与Datanode新建连接的顺序读取的,并按距离由近及远进行数据块的读取
因此在读的过程要设置dfs.read.prefetch.size数据块的大小(在hfds-site.xml设置)
写操作:
1、通过DistributedFileSystem类中的create创建文件,返回的是FsDataOutputStream对象,对象的实例化时调用的DFSClient中的Create方法进行具体文件的打开
2、通过DFSClient类的内部类DFSOutputStream的构造函数中的namenode.create函数来创建Namenode中的所存储的数据位置相应信息
3、通过FsDataOutputStream类中的write方法和PositionCache类来进行写操作(即是说明相应写数据的位置)
4、通过DFSOutputStream构造函数中的streamer.start()将数据包写入HDFS中,而写的过程中不断的从Packet队列取出待发送的Packet包给Datanode
因此可以通过dfs.block.size写入的块大小(默认64MB),dfs.write.packet.size(通过DFSOutputStream类中内部类设置每次写packet包时的大小),io.bytes.per.checksum校验数据块的大小,dfs.replication.num复制块的个数。
总结:读写过程是利用Java的反射原理返回一个文件系统的实例,实例化相应的文件系统,大致代码如下

1 private static FileSytem createFileSytem(URI uri,Configuration conf) throws IOException{ 2 Class<?> class = conf.getClass("fs." + uri.getScheme() + ".impl",null); 3 if(class == null){ 4 throw new IOException("No FileSystem for scheme:" + uri.getScheme()); 5 } 6 FileSystem fs = (FileSytem)RelectionUtils.newInstance(class,conf); 7 fs.initialize(uri,conf); 8 return fs; 9 }
只要实现了org.apache.hadoop.fs.FileSytem接口,就可以增加一种hadoop能够访问的文件系统。而本文中的DistributedFileSytem正是继承自FileSystem
所以由以上可见,HDFS中的关键类有FileSystem,DistributedFileSytem,DFSClient,FsDataOutputStream,DFSOutputStream,FsDataInputStream,DFSInputStream,Configuration这几个类。
最后再贴出HDFS编程的大致模板结构:

//1、HDFS文件的创建,将本地文件系统的文件复制到HDFS上 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); Path srcPath = new Path(srcFilePath); path dstPath = new Path(dstFilePath); fs.copyFromLocalFile(srcFilePath,dstFilePath); //2、在HDFS上创建一个文件 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); Path srcPath = new Path(hdfsFileName); FSDataOutputStream outputStream = fs.create(path); outputStream.write(buff,0,buff.length); //3、删除HDFS上的一个文件 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); Path path = new Path(hdfsFileName); boolean isDeleted = hs.deleteOnExit(path); //4、获取一个文件在HDFS的存储位置 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); Path path = new Path(hdfsFileName); FileStatus fstatus = fs.getFileStatus(path); BlockLocation[] blkLocation = fs.getFileBlockLocations(fstatus,0,fstatus.getLen()); for(int i=0;i<blkLocation.length;++i){ String[] hosts = blkLocation[i].getHosts(); } //5、获取集群中所有节点的主机名 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); DistributedFileSystem dfs = (DistributedFileSystem)fs; DatanodeInfo[] datanodeStatus = dfs.getDatanodeStats(); String[] names = new String[datanodeStatus.length]; for(int i=o;i<datanodeStatus.length;++i){ names[i] = datanodeStatus[i].getHostName(); } //6、在原HDFS的一个文件中追加内容 public void appendToHdfs(String hdfsFileName,String content) throws Exception{ Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(hdfsFileName),conf); Path path = new Path(hdfsFileName); FSDataOutputStream outputStream = fs.append(path); int readlen = content.getBytes().length; while(-1 != readlen){ outputStream.write(content.getBytes(),0,readlen); } } //7、输出指定文件目录的所有文件的文件名及文件大小等信息 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(hdfsFileName),conf); Path path = new Path(hdfsFileName); FileStatus fstatus[] = fs.listStatus(path); int size = fstatus.length; for(int i=0;i<size;++i){ String FileName = fstatus[i].getPath().getName(); int FileSize = fstatus[i].getLen(); }