对于MapReduce等框架来说,需要有一套更底层的API来获取某个指定文件中的一部分数据,而不是一整个文件
因此使用流的方式来操作 HDFS上的文件,可以实现读取指定偏移量范围的数据
1.客户端测试类代码:
package cn.bigdata.hdfs;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.Before;

/**
 * Demonstrates stream-based access to HDFS files (reading a slice of a file
 * by offset/length) — the lower-level API that frameworks such as MapReduce
 * rely on instead of whole-file transfers.
 */
public class HdfsStreamAcess {

    // Client handle for HDFS operations; initialized before each test.
    private FileSystem fs = null;
    // Hadoop configuration; private for consistency with fs.
    private Configuration conf = null;

    /**
     * Obtains an HDFS client instance before each test.
     * JUnit invokes this through the {@code @Before} annotation, so renaming
     * it from the original typo "inin" does not affect any caller.
     *
     * @throws IOException if the file system cannot be reached
     * @throws InterruptedException if the connection attempt is interrupted
     * @throws URISyntaxException if the HDFS URI is malformed
     */
    @Before
    public void init() throws IOException, InterruptedException, URISyntaxException {
        conf = new Configuration();
        // Last argument is the user to act as ("root") on the cluster.
        fs = FileSystem.get(new URI("hdfs://shizhan2:9000"), conf, "root");
    }
}
2.流式上传文件:
//流式上传文件 @Test public void testUploadWithStream() throws IllegalArgumentException, IOException{ //true:该文件夹存在就覆盖 IOUtils:工具类 FSDataOutputStream outputstream = fs.create(new Path("/angelababy.love"), true); FileInputStream input = new FileInputStream("c:/xxx.txt"); IOUtils.copy(input, outputstream); }
3.流式下载文件:
//流式下载文件 @Test public void testDownloadWithStream() throws Exception{ FSDataInputStream in = fs.open(new Path("/angelababy.love")); FileOutputStream out = new FileOutputStream("d:/access_stream.log"); IOUtils.copy(in, out); }
4.流式读取指定长度的文件:
//文件的随机读写 @Test public void testRandomAccess() throws Exception{ FSDataInputStream in = fs.open(new Path("/regist-copy.log")); FileOutputStream out = new FileOutputStream("d:/random_stream.log"); IOUtils.copyLarge(in, out, 1*1024*1024, 1*1024*1024); // 从1M位置开始读,读1M }
hdfs支持随机定位进行文件读取,而且可以方便地读取指定长度,用于上层分布式运算框架并发处理数据
5.控制台打印HDFS文件内容:
/**
 * Prints the content of an HDFS file to the console.
 *
 * @throws Exception if opening or copying fails
 */
@Test
public void testCat() throws Exception {
    // Only the HDFS stream goes in the resource list: System.out must
    // NOT be closed. The original version leaked the input stream.
    try (FSDataInputStream in = fs.open(new Path("/angelababy.love"))) {
        IOUtils.copy(in, System.out);
    }
}
6.递归列出指定目录下所有子文件夹中的文件:
@Test public void testLs() throws Exception { RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true); while(listFiles.hasNext()){ LocatedFileStatus fileStatus = listFiles.next(); System.out.println("blocksize: " +fileStatus.getBlockSize()); System.out.println("owner: " +fileStatus.getOwner()); System.out.println("Replication: " +fileStatus.getReplication()); System.out.println("Permission: " +fileStatus.getPermission()); System.out.println("Name: " +fileStatus.getPath().getName()); System.out.println("------------------"); BlockLocation[] blockLocations = fileStatus.getBlockLocations(); for(BlockLocation b:blockLocations){ System.out.println("块起始偏移量: " +b.getOffset()); System.out.println("块长度:" + b.getLength()); //块所在的datanode节点 String[] datanodes = b.getHosts(); for(String dn:datanodes){ System.out.println("datanode:" + dn); } } } }
7.获取文件块信息:
/**
 * Prints the block layout (length, offset, hosting datanodes) of one
 * HDFS file, queried over the file's entire byte range.
 *
 * @throws Exception if the metadata lookup fails
 */
@Test
public void testGetFileBlock() throws Exception {
    FileStatus status = fs.getFileStatus(new Path("/pcre-8.35.tar.gz"));
    // Ask for block locations covering the whole file: [0, length).
    BlockLocation[] locations = fs.getFileBlockLocations(status, 0, status.getLen());
    for (BlockLocation location : locations) {
        System.out.println("block-length:" + location.getLength() + "--"
                + "block-offset:" + location.getOffset());
        for (String host : location.getHosts()) {
            System.out.println(host);
        }
    }
}