HDFS:Hadoop分布式文件系统(HDFS)被设计成适合运行在通用硬件(commodity hardware)上的分布式文件系统。它和现有的分布式文件系统有很多共同点。但同时,它和其他的分布式文件系统的区别也是很明显的。HDFS是一个高度容错性的系统,适合部署在廉价的机器上。HDFS能提供高吞吐量的数据访问,非常适合大规模数据集上的应用。HDFS放宽了一部分POSIX约束,来实现流式读取文件系统数据的目的。HDFS在最开始是作为Apache Nutch搜索引擎项目的基础架构而开发的。HDFS是Apache Hadoop Core项目的一部分。
HDFS是一个的主从结构,一个HDFS集群是由一个名字节点,它是一个管理文件命名空间和调节客户端访问文件的主服务器,当然还有一些数据节点,通常是一个节点一个机器,它来管理对应节点的存储。HDFS对外开放文件命名空间并允许用户数据以文件形式存储。 内部机制是将一个文件分割成一个或多个块,这些块被存储在一组数据节点中。名字节点用来操作文件命名空间的文件或目录操作,如打开,关闭,重命名等等。它同时确定块与数据节点的映射。数据节点来负责来自文件系统客户的读写请求。数据节点同时还要执行块的创建,删除,和来自名字节点的块复制指令。
1、获取文件系统 2、通过文件系统打开文件 3、将文件内容输出
public static void read(Path path) throws IOException{
FileSystem hdfs = HdfsUtils.getFilesystem(); //步骤 1
FSDataInputStream fsDataInputStream = hdfs.open(path); //步骤 2
IOUtils.copyBytes(fsDataInputStream, System.out, 4096,false); //步骤 3
}
获取文件系统对象
要从HDFS上读取文件,必须先得到一个FileSystem。HDFS本身就是一个文件系统,所以,我们得到一个文件系统后就可以对HDFS进行相关操作。获取文件系统的步骤可以分为以下2步。 1、读取配置文件。 2、获取文件系统。 读取配置文件:Configuration类有三个构造器,无参数的构造器表示直接加载默认资源,也可以指定一个boolean参数来关闭加载默认值,或直接使用另外一个Configuration对象来初始化。
package com.yq.common;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
public class HdfsUtils {
public static FileSystem getFilesystem(){
FileSystem hdfs=null;
Configuration conf=new Configuration();
try{
URI uri = new URI("hdfs://localhost:9000");
hdfs = FileSystem.get(uri,conf);
}
catch(Exception ex){
//
}
return hdfs;
}
}
打开文件
FSDataInputStream fsDataInputStream = hdfs.open(path);
打开文件其实就是创建一个文件输入流,跟踪文件系统的open方法,可以找到源码
public FSDataInputStream open(Path f) throws IOException {
return open(f, getConf().getInt("io.file.buffer.size", 4096));
}
再跟踪open方法,找到以下抽象方法。
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException;
//这个方法在DistributedFileSystem类有实现,如下
@Override
public FSDataInputStream open(Path f, final int bufferSize) throws IOException {
statistics.incrementReadOps(1);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<FSDataInputStream>() {
@Override
public FSDataInputStream doCall(final Path p)
throws IOException, UnresolvedLinkException {
return new HdfsDataInputStream(
dfs.open(getPathName(p), bufferSize, verifyChecksum));
}
@Override
public FSDataInputStream next(final FileSystem fs, final Path p)
throws IOException {
return fs.open(p, bufferSize);
}
}.resolve(this, absF);
}
在返回结果的时候,创建了一个FileSystemLinkResolver对象,并实现了此类的两个抽象方法。doCall方法和next方法都在resolve方法里用到了,而next方法只是在resolve方法异常捕获时才调用。 跟踪doCall方法,doCall方法里的open()方法有3个参数,src表示要打开的文件路径,buffersize表示缓冲大小,verifyChecksum表示是否校验和,的源代码如下。
public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
throws IOException, UnresolvedLinkException {
checkOpen();
// Get block info from namenode
return new DFSInputStream(this, src, buffersize, verifyChecksum);
}
checkOpen方法表示检查文件系统是否已经打开,如果没有打开,则抛出异常(FileSystemclosed)。 然后返回一个分布式文件系统输入流(DFSInputStream),此处调用的构造方法源代码如下。
DFSInputStream(DFSClient dfsClient, String src, int buffersize, boolean verifyChecksum
) throws IOException, UnresolvedLinkException {
this.dfsClient = dfsClient;
this.verifyChecksum = verifyChecksum;
this.buffersize = buffersize;
this.src = src;
this.cachingStrategy =
dfsClient.getDefaultReadCachingStrategy();
openInfo();
}
这个方法先是做了一些准备工作,然后调用openInfo()方法,openInfo()方法是一个线程安全的方法,作用是从namenode获取已打开的文件信息。其源代码如下。
/**
* Grab the open-file info from namenode
*/
synchronized void openInfo() throws IOException, UnresolvedLinkException {
lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
while (retriesForLastBlockLength > 0) {
// Getting last block length as -1 is a special case. When cluster
// restarts, DNs may not report immediately. At this time partial block
// locations will not be available with NN for getting the length. Lets
// retry for 3 times to get the length.
if (lastBlockBeingWrittenLength == -1) {
DFSClient.LOG.warn("Last block locations not available. "
+ "Datanodes might not have reported blocks completely."
+ " Will retry for " + retriesForLastBlockLength + " times");
waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
} else {
break;
}
retriesForLastBlockLength--;
}
if (retriesForLastBlockLength == 0) {
throw new IOException("Could not obtain the last block locations.");
}
}
此方法有调用fetchLocatedBlocksAndGetLastBlockLength()方法获取块的位置信息。
private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("newInfo = " + newInfo);
}
if (newInfo == null) {
throw new IOException("Cannot open filename " + src);
}
if (locatedBlocks != null) {
Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
while (oldIter.hasNext() && newIter.hasNext()) {
if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
throw new IOException("Blocklist for " + src + " has changed!");
}
}
}
locatedBlocks = newInfo;
long lastBlockBeingWrittenLength = 0;
if (!locatedBlocks.isLastBlockComplete()) {
final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
if (last != null) {
if (last.getLocations().length == 0) {
if (last.getBlockSize() == 0) {
// if the length is zero, then no data has been written to
// datanode. So no need to wait for the locations.
return 0;
}
return -1;
}
final long len = readBlockLength(last);
last.getBlock().setNumBytes(len);
lastBlockBeingWrittenLength = len;
}
}
currentNode = null;
return lastBlockBeingWrittenLength;
}
getLocatedBlocks方法可以获取块的位置信息。LocatedBlocks类是许多块的位置信息的集合。因为从此类的源码可以发现有这个一个私有属性:
private final List<LocatedBlock> blocks; // array of blocks with prioritized locations
通过文件名,FSDataInputStream类可以获取相文件内容,也可以充当namenode与datanode桥梁。
将文件内容在标准输出显示
因为之前已经获得了一个FSDataInputStream,所以,我们可以调用方法copyBytes将FSDataInputStream拷贝到标准输出流System.out显示。
public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close)
throws IOException {
try {
copyBytes(in, out, buffSize);
if(close) {
out.close();
out = null;
in.close();
in = null;
}
} finally {
if(close) {
closeStream(out);
closeStream(in);
}
}
}
此方法里又调用了另外一个copyBytes方法,作用同样是从一个流拷贝到另外一个流。
public static void copyBytes(InputStream in, OutputStream out, int buffSize)
throws IOException {
PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
byte buf[] = new byte[buffSize];
int bytesRead = in.read(buf);
while (bytesRead >= 0) {
out.write(buf, 0, bytesRead);
if ((ps != null) && ps.checkError()) {
throw new IOException("Unable to write to output stream.");
}
bytesRead = in.read(buf);
}
}
先从输入流中读取buffSize大小的数据到缓冲里面,然后将缓冲里的数据写入到输出流out里。一直循环,直到从输入流中读到缓冲里的字节长度为0,表示输入流里的数据已经读取完毕