上面我们学的API操作HDFS系统都是框架封装好的。那么如果我们想自己实现上述API的操作该怎么实现呢?我们可以采用IO流的方式实现数据的上传和下载。
1、上传本地文件到 HDFS ;
public static Configuration conf = new Configuration(); // 1、 本地文件上传 HDFS 系统 @Test public void putIOTest() throws IOException, InterruptedException, URISyntaxException { // 1、获取 HDFS 客户端对象 FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "hui"); // 2、获取 I/O 流 FileInputStream fis = new FileInputStream(new File("E:/workspaced/data/zhouzhiruo.txt")); // 3、获取输出流 FSDataOutputStream fos = fs.create(new Path("/yttlj/gmd/wyx.txt")); // 4、流的对拷 IOUtils.copyBytes(fis, fos, conf); // 5、关闭资源 IOUtils.closeStream(fos); IOUtils.closeStream(fis); fs.close(); System.out.println(" ok "); }
2、从 HDFS 文件系统获取数据到本地
@Test public void getIOTest() throws IOException, InterruptedException, URISyntaxException { // 1、获取 HDFS 客户端对象 FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "hui"); // 2、获取 I/O 流 FSDataInputStream fis = fs.open(new Path("/yttlj/gmd/wyx.txt")); // 3、获取输出流 FileOutputStream fos = new FileOutputStream(new File("E:/workspaced/data/wyx_fw.txt")); // 4、流的对拷 IOUtils.copyBytes(fis, fos, conf); // 5、关闭资源 IOUtils.closeStream(fos); IOUtils.closeStream(fis); fs.close(); System.out.println(" ok "); }
3、从 HDFS 文件系统定位读取数据(只读取 128 M)
@Test public void getIO1blockTest() throws IOException, InterruptedException, URISyntaxException { // 1、获取 HDFS 客户端对象 FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "hui"); // 2、获取 I/O 流 FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz")); // 3、获取输出流 FileOutputStream fos = new FileOutputStream(new File("E:/workspaced/data/hadoop-2.7.2.tar.gz.part1")); // 4、使用最传统方式读取 128 M 数据 byte[] buf = new byte[1014]; for (int i = 0; i < 1024 * 128; i++) { fis.read(buf); fos.write(buf); } // 5、关闭资源 IOUtils.closeStream(fos); IOUtils.closeStream(fis); fs.close(); System.out.println(" ok "); }
4、从 HDFS 文件系统定位读取数据(从128 M 开始读取)
@Test public void getIO2blockTest() throws IOException, InterruptedException, URISyntaxException { // 1、获取 HDFS 客户端对象 FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "hui"); // 2、获取 I/O 流 FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz")); // ***** 设置指定读取起点 ****** fis.seek(1024 * 1024 * 128); // 3、获取输出流 FileOutputStream fos = new FileOutputStream(new File("E:/workspaced/data/hadoop-2.7.2.tar.gz.part2")); // 4、流数据 copy IOUtils.copyBytes(fis, fos, conf); // 5、关闭资源 IOUtils.closeStream(fos); IOUtils.closeStream(fis); fs.close(); System.out.println(" ok "); }