除了上一篇提到的通过在客户端调用封装好的API可以直接堆HDFS进行读写操作,那如果想要自己实现上述的API操作也是可以的,就是通过I/O流的方式实现对HDFS的读和写,其中涉及的几个类有:
org.apache.hadoop.fs.FSDataOutputStream
org.apache.hadoop.fs.FSDataInputStream
java.io.FileInputStream
java.io.FileOutputStream
前两个类是hadoop自己所拥有的I/O输入输出流类,它们面向的是HDFS文件系统,主要用于向HDFS写入和读取;后两个是Java的标准输入输出流类,它们面向的是我们本地的文件系统,主要用于从本地读取文件信息或者向本地的文件写入信息。当我们想要从本地上传到hdfs或者从hdfs下载文件到本地,都需要这两种输入输出流相结合才能完成。那我们看具体的操作。
首先需要注意的是,FSDataOutputStream和FSDataInputputStream进行实例化并不是通过new关键字来实现,而是通过FileStream这个类对象和HDFS完成连接以后,通过传递参数Path来返回一个具体的实例。
一、文件的上传操作
@Test public void putFileHDFS() throws URISyntaxException, IOException, InterruptedException { // 获取文件系统 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://192.168.182.101:9000"), conf, "fym000"); // 创建输入流 FileInputStream fis = new FileInputStream(new File("F:/ProgramTest/testfile/fym.txt")); // 获取输出流 FSDataOutputStream fos = fs.create(new Path("/fym.txt")); // 流对拷 IOUtils.copyBytes(fis,fos,conf); // 关闭资源 IOUtils.closeStream(fos); IOUtils.closeStream(fis); fs.close(); }
二、向HDFS中存储的文件进行写操作
此时需要注意,FSDataOutputStream获取实例的方法有两个,即:
1、create方法,该方法会创建一个空文件在hdfs上,然后可以向该文件进行顺序写入内容,但是,如果之前要创建的文件已经存在了,该方法默认的是覆盖已有的文件,重新写入现有的内容,之前的信息都会被抹掉;
2、append方法,该方法会打开一个已有文件,并在文件的末尾进行追加数据,而不会覆盖之前已有的内容。
我们来看一下两种写操作:
@Test public void writeFile() throws URISyntaxException, IOException, InterruptedException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://192.168.182.101:9000"), conf, "fym000"); FSDataOutputStream fos = fs.create(new Path("/jaja.txt")); fos.write(123); fos.write("/tkaka/t".getBytes("UTF-8")); fos.writeBytes("sdada"); fos.flush(); fos.close(); }
@Test public void appendFile() throws URISyntaxException, IOException, InterruptedException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://192.168.182.101:9000"), conf, "fym000"); Path path = new Path("/kaka.txt"); FSDataOutputStream fos = null; if (!fs.exists(path)){ fos = fs.create(path, false); }else { fos = fs.append(path); } fos.writeUTF("我是中国人"); fos.write("哈哈".getBytes("UTF-8")); fos.writeBytes(" "); fos.close(); fs.close(); }
三、下载文件
@Test public void downloadFile() throws URISyntaxException, IOException, InterruptedException { // 获取文件系统 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://192.168.182.101:9000"), conf, "fym000"); // 获取输入流 FSDataInputStream fis = fs.open(new Path("/kaka.txt")); // 获取输出流 FileOutputStream fos = new FileOutputStream(new File("F:/ProgramTest/testfile/kaka.txt")); // 流的对拷 IOUtils.copyBytes(fis,fos,conf); IOUtils.closeStream(fos); IOUtils.closeStream(fis); fs.close(); }
四、定位文件读取
在hdfs中,存储块的默认大小是128M,如果存储的文件大于128M,那么该文件在集群中就要分块存储了,那我们现在就来分块读取HDFS上的大文件(jdk安装包)
@Test public void readFile_seek_01() throws URISyntaxException, IOException, InterruptedException { // 获取文件系统 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://192.168.182.101:9000"), conf, "fym000"); // 获取输入流 FSDataInputStream fis = fs.open(new Path("/fym/pyx/kak/jdk-8u144-linux-x64.tar.gz")); // 获取输出流 FileOutputStream fos = new FileOutputStream(new File("F:/ProgramTest/testfile/jdk-8u144-linux-x64.tar.gz.part1")); // 流的拷贝 byte[] bytes = new byte[1024]; for (int i = 0; i < 1024 * 128; i++) { fis.read(bytes); fos.write(bytes); } // 关闭资源 IOUtils.closeStream(fis); IOUtils.closeStream(fos); fs.close(); } @Test public void readFile_seek_02() throws URISyntaxException, IOException, InterruptedException { // 获取文件系统 Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://192.168.182.101:9000"), conf, "fym000"); // 获取输入流 FSDataInputStream fis = fs.open(new Path("/fym/pyx/kak/jdk-8u144-linux-x64.tar.gz")); // 定位输入数据位置 fis.seek(1024*1024*128); // 创建输出流 FileOutputStream fos = new FileOutputStream(new File("F:/ProgramTest/testfile/jdk-8u144-linux-x64.tar.gz.part2")); // 流的对拷 IOUtils.copyBytes(fis,fos,conf); IOUtils.closeStream(fos); IOUtils.closeStream(fis); fs.close(); }
下载完成之后,在cmd窗口下,进入到两块文件的路径下,对这两个块文件进行合并,命令如下:
type jdk-8u144-linux-x64.tar.gz.part2 >> jdk-8u144-linux-x64.tar.gz.part1
然后会发现,part1的大小发生了变化,将其重命名为jdk-8u144-linux-x64.tar.gz,然后解压,会发现这个安装包十分的完整。