zoukankan      html  css  js  c++  java
  • Hadoop 核心编程之 HDFS 的文件操作

    前言

    本文并没有打算介绍 HDFS 的读写流程,虽然这是一块比较重要的内容。如果你感兴趣,可以去搜索相关资料。如果一遍没有看懂,请看第二遍。
    本文还是以代码为主,并附以简短的说明,帮助你理解代码的逻辑,以及一些注意事项。你可以将本文的代码封装成一个工具类,这样以后需要调用时候,就可以复用了。


    版权说明

    著作权归作者所有。
    商业转载请联系作者获得授权,非商业转载请注明出处。
    本文作者:Q-WHai
    发表日期: 2016年6月21日
    本文链接:http://blog.csdn.net/lemon_tree12138/article/details/51728359
    来源:CSDN
    更多内容:分类 >> 大数据之 Hadoop


    HDFS 读写 API

    上传本地文件到 HDFS

    public static void uploadFileFromLocal(String localPath, String hdfsPath) throws IOException {
        InputStream in = new BufferedInputStream(new FileInputStream(localPath));
        FileSystem fileSystem = FileSystem.get(URI.create(hdfsPath), new Configuration());
        OutputStream out = fileSystem.create(new Path(hdfsPath));
    
        IOUtils.copyBytes(in, out, 4096, true);
        fileSystem.close();
    }

    此处使用了一个十分方便的方法 IOUtils.copyBytes()。调用这个文件,你可以很方便地将输入流写入到输出流,而且不需要你人为去控制缓冲区,也不需要人为控制循环读取输入源。IOUtils.copyBytes() 中的第 4 个参数表示是否关闭流对象,也就是输入输出流对象。一般来说打成 true 就好了。


    从 HDFS 下载文件到本地

    通过上面上传文件的例子,我们可以很容易地写出下载文件的代码。如下:

    public static void downloadFileToLocal (String hdfsPath, String localPath) throws IOException {
        FileSystem fileSystem = FileSystem.get(URI.create(hdfsPath), new Configuration());
    
        FSDataInputStream in = fileSystem.open(new Path(hdfsPath));
        OutputStream out = new FileOutputStream(localPath);
    
        IOUtils.copyBytes(in, out, 4096, true);
        fileSystem.close();
    }

    从 HDFS 下载文件到本地

    上面是的下载文件已经很好用了,现在再来看看另外一种下载文件的方法。调用的是 FileUtil.copy() 方法。

    public static void downloadFileToLocalNew (String hdfsSourceFileFullName, String localFileFullName) throws IOException {
        Configuration config = new Configuration();
        FileSystem fileSystem = FileSystem.get(URI.create(hdfsSourceFileFullName), config);
        FileUtil.copy(fileSystem, new Path(hdfsSourceFileFullName), new File(localFileFullName), false, config);
    
        fileSystem.close();
    }

    按行读取 HDFS 文件内容

    在 HDFS 里面应该是没有直接提供按行读取文件的 API(如果有,后面我们再更新),但是 JDK 中提供相关的 API,那就是 BufferedReader。这里你可以结合刚学 Java 时使用的用户在控制台向程序输入,当时除了 Scanner 就是 BufferedReader 了,很方便。

    public static List<String> readFileHDFSByLine (Configuration config, String hdfsFileFullName) throws IOException {
        List<String> result = new ArrayList<>();
        FileSystem fileSystem = FileSystem.get(URI.create(hdfsFileFullName), config);
        FSDataInputStream dataInputStream = fileSystem.open(new Path(hdfsFileFullName));  
        BufferedReader reader = null;
        String line;
        try {
            reader = new BufferedReader(new InputStreamReader(dataInputStream, "UTF-8"));
            while ((line = reader.readLine()) != null) {
                result.add(line);
            }
        } finally {
            if (reader != null) {
                reader.close();
            }
        }
    
        return result;
    }

    向 HDFS 中的文件追加内容

    public static void appendLabelToHDFS(String hdfsPath, String content) throws IOException {
        Configuration config = new Configuration();
        config.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        config.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
    
        FileSystem fileSystem = FileSystem.get(URI.create(hdfsPath), config);
        FSDataOutputStream out = fileSystem.append(new Path(hdfsPath));
    
        int readLen = content.getBytes().length;
        if (-1 != readLen) {
            out.write(content.getBytes(), 0, readLen);
        }
        out.close();
        fileSystem.close();
    }

    此处,如果你不想动态设置 Configuration,那么你就需要在配置文件中配置此两项内容。
    补充说明
    如果你需要对文件进行追加内容操作,那么在 hdfs-site.xml 配置文件中需要设置如下属性。

    <property>
        <name>dfs.support.append</name>
        <value>true</value>
    </property>

    向 HDFS 中的文件追加文件

    通过上面追加字符串的操作,你可能会想到这里可以先读取文件内容到字符串,再进行追加字符串操作。这样的确是可以的。不过可以看到上的输出是一个输出流,那么这里就不需要再读取到字符串了。文件是可以直接对到文件流上的嘛。所以向 HDFS 文件中追加文件的操作如下:

    public static void appendFileToHDFS(String hdfsPath, String localFilePath) throws IOException {
        Configuration config = new Configuration();
        config.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        config.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
    
        FileSystem fileSystem = FileSystem.get(URI.create(hdfsPath), config);
    
        InputStream in = new BufferedInputStream(new FileInputStream(localFilePath));
        FSDataOutputStream out = fileSystem.append(new Path(hdfsPath));
    
        IOUtils.copyBytes(in, out, 4096, true);
        fileSystem.close();
    }

    向 HDFS 文件中写入内容

    此处对 HDFS 文件的更改是覆盖式的,也就是会把之前的内容全部删除。

    public static void writeLabelToHDFS(String hdfsPath, String content) throws IOException {
        FileSystem fileSystem = FileSystem.get(URI.create(hdfsPath), new Configuration());
        FSDataOutputStream out = fileSystem.create(new Path(hdfsPath));
    
        int readLen = content.getBytes().length;
        if (-1 != readLen) {
            out.write(content.getBytes(), 0, readLen);
        }
        out.close();
        fileSystem.close();
    }

    删除 HDFS 中文件

    此删除操作是删除一个已存在的文件,从代码中的方法命名就可以看出来。不过,如果 HDFS 中不存在此文件,也不会抛出异常。

    public static void deleteFileFromHDFS(String hdfsPath) throws IOException {
        FileSystem fileSystem = FileSystem.get(URI.create(hdfsPath), new Configuration());
        fileSystem.deleteOnExit(new Path(hdfsPath));
        fileSystem.close();
    }

    读取 HDFS 某一目录下的所有文件

    这里只是读取目录下的文件,并不包含目录。

    public static void readFilesOnlyInDirectoryFromHDFS(String hdfsFolderName) throws IOException {
        FileSystem fileSystem = FileSystem.get(URI.create(hdfsFolderName), new Configuration());
        FileStatus fileList[] = fileSystem.listStatus(new Path(hdfsFolderName));
    
        for (FileStatus fileStatus : fileList) {
            if (fileStatus.isDirectory()) {
                continue;
            }
            System.out.println("FileName: " + fileStatus.getPath().getName() + "		Size: " + fileStatus.getLen());
        }
    
        fileSystem.close();
    }

    读取 HDFS 某一目录下的所有文件

    此方法是参考上面的 readFilesOnlyInDirectoryFromHDFS() 方法来的,只是这里也会去读取子目录下的所有文件。所以使用了一个递归,并且为了更好地封装,这里将递归的逻辑与调用分开了,这样做的目的是避免产生过多的 Configuration 对象。

    public static void listHDFSFiles (String hdfsFileFullName) throws IOException {
        Configuration config = new Configuration();
        listHDFSFiles(config, hdfsFileFullName);
    }
    
    private static void listHDFSFiles (Configuration config, String hdfsFileFullName) throws IOException {
        FileSystem fileSystem = FileSystem.get(URI.create(hdfsFileFullName), config);
    
        FileStatus[] fileStatus = fileSystem.listStatus(new Path(hdfsFileFullName));
        for (FileStatus statusItem : fileStatus) {
            if (statusItem.isDirectory()) {
                listHDFSFiles(config, statusItem.getPath().toString());
            }
            System.out.println("FileName: " + statusItem.getPath() + "		Size: " + statusItem.getLen());
        }
        fileSystem.close();
    }

    获取某一文件在 HDFS 中实际保存的节点

    此方法可以展示 HDFS 中的某一个文件在 HDFS 文件系统中被保存的所有 DataNode。

    public static void getFileLocal(String hdfsFileFullName) throws IOException {
        FileSystem fileSystem = FileSystem.get(URI.create(hdfsFileFullName), new Configuration());
        FileStatus status = fileSystem.getFileStatus(new Path(hdfsFileFullName));
        BlockLocation[] locations = fileSystem.getFileBlockLocations(status, 0, status.getLen());
    
        for (int i = 0; i < locations.length; i++) {
            String[] hosts = locations[i].getHosts();
            for (String host : hosts) {
                System.out.println("block_" + i + "_location:" + host);
            }
        }
    }

    获得 HDFS 中所有的节点信息

    如果你不知道 HDFS 文件系统中有哪些文件,单纯的想知道我的 HDFS 文件系统中有哪些 DataNode。那么可以把上面的 hdfsFileFullName 写成 HDFS 的根目录就可以了。比如我的设置如下:

    public static void getHDFSNode() throws IOException {
        FileSystem fileSystem = FileSystem.get(URI.create("hdfs://master:9000/"), new Configuration());
    
        DistributedFileSystem distributedFileSystem = (DistributedFileSystem) fileSystem;
        DatanodeInfo[] dataNodeStats = distributedFileSystem.getDataNodeStats();
    
        for (int i = 0; i < dataNodeStats.length; i++) {
            System.out.println("DataNode_" + i + "_Node:" + dataNodeStats[i].getHostName());
        }
    }

  • 相关阅读:
    hdu acm 2844 Coins 解题报告
    hdu 1963 Investment 解题报告
    codeforces 454B. Little Pony and Sort by Shift 解题报告
    广大暑假训练1 E题 Paid Roads(poj 3411) 解题报告
    hdu acm 2191 悼念512汶川大地震遇难同胞——珍惜现在,感恩生活
    hdu acm 1114 Piggy-Bank 解题报告
    poj 2531 Network Saboteur 解题报告
    数据库范式
    ngnix 配置CI框架 与 CI的简单使用
    Vundle的安装
  • 原文地址:https://www.cnblogs.com/fengju/p/6335975.html
Copyright © 2011-2022 走看看