zoukankan      html  css  js  c++  java
  • Hadoop 核心编程之 HDFS 的文件操作

    前言

    本文并没有打算介绍 HDFS 的读写流程,虽然这是一块比较重要的内容。如果你感兴趣,可以去搜索相关资料。如果一遍没有看懂,请看第二遍。
    本文还是以代码为主,并附以简短的说明,帮助你理解代码的逻辑,以及一些注意事项。你可以将本文的代码封装成一个工具类,这样以后需要调用时候,就可以复用了。


    版权说明

    著作权归作者所有。
    商业转载请联系作者获得授权,非商业转载请注明出处。
    本文作者:Q-WHai
    发表日期: 2016年6月21日
    本文链接:http://blog.csdn.net/lemon_tree12138/article/details/51728359
    来源:CSDN
    更多内容:分类 >> 大数据之 Hadoop


    HDFS 读写 API

    上传本地文件到 HDFS

    public static void uploadFileFromLocal(String localPath, String hdfsPath) throws IOException {
        InputStream in = new BufferedInputStream(new FileInputStream(localPath));
        FileSystem fileSystem = FileSystem.get(URI.create(hdfsPath), new Configuration());
        OutputStream out = fileSystem.create(new Path(hdfsPath));
    
        IOUtils.copyBytes(in, out, 4096, true);
        fileSystem.close();
    }

    此处使用了一个十分方便的方法 IOUtils.copyBytes()。调用这个文件,你可以很方便地将输入流写入到输出流,而且不需要你人为去控制缓冲区,也不需要人为控制循环读取输入源。IOUtils.copyBytes() 中的第 4 个参数表示是否关闭流对象,也就是输入输出流对象。一般来说打成 true 就好了。


    从 HDFS 下载文件到本地

    通过上面上传文件的例子,我们可以很容易地写出下载文件的代码。如下:

    public static void downloadFileToLocal (String hdfsPath, String localPath) throws IOException {
        FileSystem fileSystem = FileSystem.get(URI.create(hdfsPath), new Configuration());
    
        FSDataInputStream in = fileSystem.open(new Path(hdfsPath));
        OutputStream out = new FileOutputStream(localPath);
    
        IOUtils.copyBytes(in, out, 4096, true);
        fileSystem.close();
    }

    从 HDFS 下载文件到本地

    上面是的下载文件已经很好用了,现在再来看看另外一种下载文件的方法。调用的是 FileUtil.copy() 方法。

    public static void downloadFileToLocalNew (String hdfsSourceFileFullName, String localFileFullName) throws IOException {
        Configuration config = new Configuration();
        FileSystem fileSystem = FileSystem.get(URI.create(hdfsSourceFileFullName), config);
        FileUtil.copy(fileSystem, new Path(hdfsSourceFileFullName), new File(localFileFullName), false, config);
    
        fileSystem.close();
    }

    按行读取 HDFS 文件内容

    在 HDFS 里面应该是没有直接提供按行读取文件的 API(如果有,后面我们再更新),但是 JDK 中提供相关的 API,那就是 BufferedReader。这里你可以结合刚学 Java 时使用的用户在控制台向程序输入,当时除了 Scanner 就是 BufferedReader 了,很方便。

    public static List<String> readFileHDFSByLine (Configuration config, String hdfsFileFullName) throws IOException {
        List<String> result = new ArrayList<>();
        FileSystem fileSystem = FileSystem.get(URI.create(hdfsFileFullName), config);
        FSDataInputStream dataInputStream = fileSystem.open(new Path(hdfsFileFullName));  
        BufferedReader reader = null;
        String line;
        try {
            reader = new BufferedReader(new InputStreamReader(dataInputStream, "UTF-8"));
            while ((line = reader.readLine()) != null) {
                result.add(line);
            }
        } finally {
            if (reader != null) {
                reader.close();
            }
        }
    
        return result;
    }

    向 HDFS 中的文件追加内容

    public static void appendLabelToHDFS(String hdfsPath, String content) throws IOException {
        Configuration config = new Configuration();
        config.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        config.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
    
        FileSystem fileSystem = FileSystem.get(URI.create(hdfsPath), config);
        FSDataOutputStream out = fileSystem.append(new Path(hdfsPath));
    
        int readLen = content.getBytes().length;
        if (-1 != readLen) {
            out.write(content.getBytes(), 0, readLen);
        }
        out.close();
        fileSystem.close();
    }

    此处,如果你不想动态设置 Configuration,那么你就需要在配置文件中配置此两项内容。
    补充说明
    如果你需要对文件进行追加内容操作,那么在 hdfs-site.xml 配置文件中需要设置如下属性。

    <property>
        <name>dfs.support.append</name>
        <value>true</value>
    </property>

    向 HDFS 中的文件追加文件

    通过上面追加字符串的操作,你可能会想到这里可以先读取文件内容到字符串,再进行追加字符串操作。这样的确是可以的。不过可以看到上的输出是一个输出流,那么这里就不需要再读取到字符串了。文件是可以直接对到文件流上的嘛。所以向 HDFS 文件中追加文件的操作如下:

    public static void appendFileToHDFS(String hdfsPath, String localFilePath) throws IOException {
        Configuration config = new Configuration();
        config.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        config.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
    
        FileSystem fileSystem = FileSystem.get(URI.create(hdfsPath), config);
    
        InputStream in = new BufferedInputStream(new FileInputStream(localFilePath));
        FSDataOutputStream out = fileSystem.append(new Path(hdfsPath));
    
        IOUtils.copyBytes(in, out, 4096, true);
        fileSystem.close();
    }

    向 HDFS 文件中写入内容

    此处对 HDFS 文件的更改是覆盖式的,也就是会把之前的内容全部删除。

    public static void writeLabelToHDFS(String hdfsPath, String content) throws IOException {
        FileSystem fileSystem = FileSystem.get(URI.create(hdfsPath), new Configuration());
        FSDataOutputStream out = fileSystem.create(new Path(hdfsPath));
    
        int readLen = content.getBytes().length;
        if (-1 != readLen) {
            out.write(content.getBytes(), 0, readLen);
        }
        out.close();
        fileSystem.close();
    }

    删除 HDFS 中文件

    此删除操作是删除一个已存在的文件,从代码中的方法命名就可以看出来。不过,如果 HDFS 中不存在此文件,也不会抛出异常。

    public static void deleteFileFromHDFS(String hdfsPath) throws IOException {
        FileSystem fileSystem = FileSystem.get(URI.create(hdfsPath), new Configuration());
        fileSystem.deleteOnExit(new Path(hdfsPath));
        fileSystem.close();
    }

    读取 HDFS 某一目录下的所有文件

    这里只是读取目录下的文件,并不包含目录。

    public static void readFilesOnlyInDirectoryFromHDFS(String hdfsFolderName) throws IOException {
        FileSystem fileSystem = FileSystem.get(URI.create(hdfsFolderName), new Configuration());
        FileStatus fileList[] = fileSystem.listStatus(new Path(hdfsFolderName));
    
        for (FileStatus fileStatus : fileList) {
            if (fileStatus.isDirectory()) {
                continue;
            }
            System.out.println("FileName: " + fileStatus.getPath().getName() + "		Size: " + fileStatus.getLen());
        }
    
        fileSystem.close();
    }

    读取 HDFS 某一目录下的所有文件

    此方法是参考上面的 readFilesOnlyInDirectoryFromHDFS() 方法来的,只是这里也会去读取子目录下的所有文件。所以使用了一个递归,并且为了更好地封装,这里将递归的逻辑与调用分开了,这样做的目的是避免产生过多的 Configuration 对象。

    public static void listHDFSFiles (String hdfsFileFullName) throws IOException {
        Configuration config = new Configuration();
        listHDFSFiles(config, hdfsFileFullName);
    }
    
    private static void listHDFSFiles (Configuration config, String hdfsFileFullName) throws IOException {
        FileSystem fileSystem = FileSystem.get(URI.create(hdfsFileFullName), config);
    
        FileStatus[] fileStatus = fileSystem.listStatus(new Path(hdfsFileFullName));
        for (FileStatus statusItem : fileStatus) {
            if (statusItem.isDirectory()) {
                listHDFSFiles(config, statusItem.getPath().toString());
            }
            System.out.println("FileName: " + statusItem.getPath() + "		Size: " + statusItem.getLen());
        }
        fileSystem.close();
    }

    获取某一文件在 HDFS 中实际保存的节点

    此方法可以展示 HDFS 中的某一个文件在 HDFS 文件系统中被保存的所有 DataNode。

    public static void getFileLocal(String hdfsFileFullName) throws IOException {
        FileSystem fileSystem = FileSystem.get(URI.create(hdfsFileFullName), new Configuration());
        FileStatus status = fileSystem.getFileStatus(new Path(hdfsFileFullName));
        BlockLocation[] locations = fileSystem.getFileBlockLocations(status, 0, status.getLen());
    
        for (int i = 0; i < locations.length; i++) {
            String[] hosts = locations[i].getHosts();
            for (String host : hosts) {
                System.out.println("block_" + i + "_location:" + host);
            }
        }
    }

    获得 HDFS 中所有的节点信息

    如果你不知道 HDFS 文件系统中有哪些文件,单纯的想知道我的 HDFS 文件系统中有哪些 DataNode。那么可以把上面的 hdfsFileFullName 写成 HDFS 的根目录就可以了。比如我的设置如下:

    public static void getHDFSNode() throws IOException {
        FileSystem fileSystem = FileSystem.get(URI.create("hdfs://master:9000/"), new Configuration());
    
        DistributedFileSystem distributedFileSystem = (DistributedFileSystem) fileSystem;
        DatanodeInfo[] dataNodeStats = distributedFileSystem.getDataNodeStats();
    
        for (int i = 0; i < dataNodeStats.length; i++) {
            System.out.println("DataNode_" + i + "_Node:" + dataNodeStats[i].getHostName());
        }
    }

  • 相关阅读:
    表的简单增删改查
    数据库基础入门语句
    exports与module.exports的区别
    Spring入门——简介
    Mybatis之动态SQL揭秘
    Mybatis的核心组成部分-SQL映射文件揭秘
    Mybatis框架简介、搭建及核心元素揭秘
    实战讲解:SSM+Maven开发APP信息管理平台-developer版
    OpenCV结构简介
    在Linux服务器上安装lxml
  • 原文地址:https://www.cnblogs.com/fengju/p/6335975.html
Copyright © 2011-2022 走看看