zoukankan      html  css  js  c++  java
  • HDFS学习之客户端I/O流操作

    除了上一篇提到的通过在客户端调用封装好的API可以直接堆HDFS进行读写操作,那如果想要自己实现上述的API操作也是可以的,就是通过I/O流的方式实现对HDFS的读和写,其中涉及的几个类有:

    org.apache.hadoop.fs.FSDataOutputStream
    org.apache.hadoop.fs.FSDataInputStream
    
    java.io.FileInputStream
    java.io.FileOutputStream

    前两个类是hadoop自己所拥有的I/O输入输出流类,它们面向的是HDFS文件系统,主要用于向HDFS写入和读取;后两个是Java的标准输入输出流类,它们面向的是我们本地的文件系统,主要用于从本地读取文件信息或者向本地的文件写入信息。当我们想要从本地上传到hdfs或者从hdfs下载文件到本地,都需要这两种输入输出流相结合才能完成。那我们看具体的操作。

    首先需要注意的是,FSDataOutputStream和FSDataInputputStream进行实例化并不是通过new关键字来实现,而是通过FileStream这个类对象和HDFS完成连接以后,通过传递参数Path来返回一个具体的实例。

    一、文件的上传操作

        @Test
        public void putFileHDFS() throws URISyntaxException, IOException, InterruptedException {
            // 获取文件系统
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://192.168.182.101:9000"), conf, "fym000");
            // 创建输入流
            FileInputStream fis = new FileInputStream(new File("F:/ProgramTest/testfile/fym.txt"));
            // 获取输出流
            FSDataOutputStream fos = fs.create(new Path("/fym.txt"));
            // 流对拷
            IOUtils.copyBytes(fis,fos,conf);
            // 关闭资源
            IOUtils.closeStream(fos);
            IOUtils.closeStream(fis);
            fs.close();
        }

    二、向HDFS中存储的文件进行写操作

    此时需要注意,FSDataOutputStream获取实例的方法有两个,即:

    1、create方法,该方法会创建一个空文件在hdfs上,然后可以向该文件进行顺序写入内容,但是,如果之前要创建的文件已经存在了,该方法默认的是覆盖已有的文件,重新写入现有的内容,之前的信息都会被抹掉;

    2、append方法,该方法会打开一个已有文件,并在文件的末尾进行追加数据,而不会覆盖之前已有的内容。

    我们来看一下两种写操作:

        @Test
        public void writeFile() throws URISyntaxException, IOException, InterruptedException {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://192.168.182.101:9000"), conf, "fym000");
            FSDataOutputStream fos = fs.create(new Path("/jaja.txt"));
            fos.write(123);
            fos.write("/tkaka/t".getBytes("UTF-8"));
            fos.writeBytes("sdada");
            fos.flush();
            fos.close();
        }
        @Test
        public void appendFile() throws URISyntaxException, IOException, InterruptedException {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://192.168.182.101:9000"), conf, "fym000");
            Path path = new Path("/kaka.txt");
            FSDataOutputStream fos = null;
            if (!fs.exists(path)){
                fos = fs.create(path, false);
            }else {
                fos = fs.append(path);
            }
            fos.writeUTF("我是中国人");
            fos.write("哈哈".getBytes("UTF-8"));
            fos.writeBytes("
    ");
            fos.close();
            fs.close();
        }

    三、下载文件

        @Test
        public void downloadFile() throws URISyntaxException, IOException, InterruptedException {
            // 获取文件系统
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://192.168.182.101:9000"), conf, "fym000");
    
            // 获取输入流
            FSDataInputStream fis = fs.open(new Path("/kaka.txt"));
            // 获取输出流
            FileOutputStream fos = new FileOutputStream(new File("F:/ProgramTest/testfile/kaka.txt"));
    
            // 流的对拷
            IOUtils.copyBytes(fis,fos,conf);
            IOUtils.closeStream(fos);
            IOUtils.closeStream(fis);
            fs.close();
        }

    四、定位文件读取

    在hdfs中,存储块的默认大小是128M,如果存储的文件大于128M,那么该文件在集群中就要分块存储了,那我们现在就来分块读取HDFS上的大文件(jdk安装包)

        @Test
        public void readFile_seek_01() throws URISyntaxException, IOException, InterruptedException {
            // 获取文件系统
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://192.168.182.101:9000"), conf, "fym000");
            // 获取输入流
            FSDataInputStream fis = fs.open(new Path("/fym/pyx/kak/jdk-8u144-linux-x64.tar.gz"));
            // 获取输出流
            FileOutputStream fos = new FileOutputStream(new File("F:/ProgramTest/testfile/jdk-8u144-linux-x64.tar.gz.part1"));
            // 流的拷贝
            byte[] bytes = new byte[1024];
            for (int i = 0; i < 1024 * 128; i++) {
                fis.read(bytes);
                fos.write(bytes);
            }
            // 关闭资源
            IOUtils.closeStream(fis);
            IOUtils.closeStream(fos);
            fs.close();
        }
        @Test
        public void readFile_seek_02() throws URISyntaxException, IOException, InterruptedException {
            // 获取文件系统
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://192.168.182.101:9000"), conf, "fym000");
            // 获取输入流
            FSDataInputStream fis = fs.open(new Path("/fym/pyx/kak/jdk-8u144-linux-x64.tar.gz"));
            // 定位输入数据位置
            fis.seek(1024*1024*128);
            // 创建输出流
            FileOutputStream fos = new FileOutputStream(new File("F:/ProgramTest/testfile/jdk-8u144-linux-x64.tar.gz.part2"));
    
            // 流的对拷
            IOUtils.copyBytes(fis,fos,conf);
            IOUtils.closeStream(fos);
            IOUtils.closeStream(fis);
            fs.close();
        }

    下载完成之后,在cmd窗口下,进入到两块文件的路径下,对这两个块文件进行合并,命令如下:

    type jdk-8u144-linux-x64.tar.gz.part2 >> jdk-8u144-linux-x64.tar.gz.part1

    然后会发现,part1的大小发生了变化,将其重命名为jdk-8u144-linux-x64.tar.gz,然后解压,会发现这个安装包十分的完整。

  • 相关阅读:
    五层原理体系结构的简单分析
    Simple Factory 简单工厂模式(静态工厂)
    css一个图片包含多个图片|网站侧栏导航
    百度地图、高德地图的数据从哪里得到的?
    浏览器开发
    开发一款浏览器内核需要学习哪些方面的知识?
    使用PowerDesigner进行数据库建模入门
    How to create a search engine
    合并两个有序数组
    STL中的algorithm
  • 原文地址:https://www.cnblogs.com/yxym2016/p/12951153.html
Copyright © 2011-2022 走看看