zoukankan      html  css  js  c++  java
  • HDFS文件目录操作代码

    分布式文件系统HDFS中对文件/目录的相关操作代码,整理了一下,大概包括以下部分:

    • 文件夹的新建、删除、重命名
    • 文件夹中子文件和目录的统计
    • 文件的新建及显示文件内容
    • 文件在local和remote间的相互复制
    • 定位文件在HDFS中的位置,以及副本存放的主机
    • HDFS资源使用情况

    1. 新建文件夹

    public void mkdirs(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        if (!fs.exists(path)) {
            fs.mkdirs(path);
            System.out.println("Create: " + folder);
        }
        fs.close();
    }

    2. 删除文件夹

    public void rmr(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.deleteOnExit(path);
        System.out.println("Delete: " + folder);
        fs.close();
    }

    3. 文件重命名

    public void rename(String src, String dst) throws IOException {
        Path name1 = new Path(src);
        Path name2 = new Path(dst);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.rename(name1, name2);
        System.out.println("Rename: from " + src + " to " + dst);
        fs.close();
    }

    4. 列出文件夹中的子文件及目录

    public void ls(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        FileStatus[] list = fs.listStatus(path);
        
        System.out.println("ls: " + folder);
        System.out.println("==========================================================");
        for (FileStatus f : list) {
            System.out.printf("name: %s, folder: %s, size: %d
    ", f.getPath(), f.isDirectory(), f.getLen());
        }
        System.out.println("==========================================================");
        fs.close();
    }

    5. 创建文件,并添加内容

    public void createFile(String file, String content) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        byte[] buff = content.getBytes();
        FSDataOutputStream os = null;
        try {
            os = fs.create(new Path(file));
            os.write(buff, 0, buff.length);
            System.out.println("Create: " + file);
        } finally {
            if (os != null)
                os.close();
        }
        fs.close();
    }

    6. 将local数据复制到remote

    public void copyFile(String local, String remote) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.copyFromLocalFile(new Path(local), new Path(remote));
        System.out.println("copy from: " + local + " to " + remote);
        fs.close();
    }

    7. 将remote数据下载到local

    public void download(String remote, String local) throws IOException {
        Path path = new Path(remote);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.copyToLocalFile(path, new Path(local));
        System.out.println("download: from" + remote + " to " + local);
        fs.close();
    }

    8. 显示文件内容

        public String cat(String remoteFile) throws IOException {
            Path path = new Path(remoteFile);
            FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
            FSDataInputStream fsdis = null;
            System.out.println("cat: " + remoteFile);
    
            OutputStream baos = new ByteArrayOutputStream();
            String str = null;
    
            try {
                fsdis = fs.open(path);
                IOUtils.copyBytes(fsdis, baos, 4096, false);
                str = baos.toString();
            } finally {
                IOUtils.closeStream(fsdis);
                fs.close();
            }
            System.out.println(str);
            return str;
        }

    9. 定位一个文件在HDFS中存储的位置,以及多个副本存储在集群哪些节点上

    public void location() throws IOException {
        String folder = hdfsPath + "create/";
        String file = "t2.txt";
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), new Configuration());
        FileStatus f = fs.getFileStatus(new Path(folder + file));
        BlockLocation[] list = fs.getFileBlockLocations(f, 0, f.getLen());
    
        System.out.println("File Location: " + folder + file);
        for (BlockLocation bl : list) {
            String[] hosts = bl.getHosts();
            for (String host : hosts) {
                System.out.println("host:" + host);
            }
        }
        fs.close();
    }

    10. 获取HDFS集群存储资源使用情况

    public void getTotalCapacity() {
        try {
            FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
            FsStatus fsStatus = fs.getStatus();
            System.out.println("总容量:" + fsStatus.getCapacity());
            System.out.println("使用容量:" + fsStatus.getUsed());
            System.out.println("剩余容量:" + fsStatus.getRemaining());
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    完整代码

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.ContentSummary;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FsStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.mapred.JobConf;
    
    /*
    * HDFS工具类
    * 
    */
    public class Hdfs {
    
        private static final String HDFS = "hdfs://10.20.14.47:8020/";
    
        public Hdfs(Configuration conf) {
            this(HDFS, conf);
        }
    
        public Hdfs(String hdfs, Configuration conf) {
            this.hdfsPath = hdfs;
            this.conf = conf;
        }
    
        private String hdfsPath;
        private Configuration conf;
    
        public static void main(String[] args) throws IOException {
            JobConf conf = config();
            Hdfs hdfs = new Hdfs(conf);
            hdfs.createFile("/create/t2.txt", "12");
            hdfs.location();
        }
    
        public static JobConf config() {
            JobConf conf = new JobConf(Hdfs.class);
            conf.setJobName("HdfsDAO");
            conf.addResource("classpath:/hadoop/core-site.xml");
            conf.addResource("classpath:/hadoop/hdfs-site.xml");
            conf.addResource("classpath:/hadoop/mapred-site.xml");
            return conf;
        }
        
        /*
         * 创建文件夹
         */
        public void mkdirs(String folder) throws IOException {
            Path path = new Path(folder);
            FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
            if (!fs.exists(path)) {
                fs.mkdirs(path);
                System.out.println("Create: " + folder);
            }
            fs.close();
        }
    
        /*
         * 删除文件夹
         */
        public void rmr(String folder) throws IOException {
            Path path = new Path(folder);
            FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
            fs.deleteOnExit(path);
            System.out.println("Delete: " + folder);
            fs.close();
        }
    
        /*
         * 文件重命名
         */
        public void rename(String src, String dst) throws IOException {
            Path name1 = new Path(src);
            Path name2 = new Path(dst);
            FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
            fs.rename(name1, name2);
            System.out.println("Rename: from " + src + " to " + dst);
            fs.close();
        }
    
        /*
         * 列出文件夹中的子文件及目录
         */
        public void ls(String folder) throws IOException {
            Path path = new Path(folder);
            FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
            FileStatus[] list = fs.listStatus(path);
            
            System.out.println("ls: " + folder);
            System.out.println("==========================================================");
            for (FileStatus f : list) {
                System.out.printf("name: %s, folder: %s, size: %d
    ", f.getPath(), f.isDirectory(), f.getLen());
            }
            System.out.println("==========================================================");
            fs.close();
        }
    
        /*
         * 创建文件,并添加内容
         */
        public void createFile(String file, String content) throws IOException {
            FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
            byte[] buff = content.getBytes();
            FSDataOutputStream os = null;
            try {
                os = fs.create(new Path(file));
                os.write(buff, 0, buff.length);
                System.out.println("Create: " + file);
            } finally {
                if (os != null)
                    os.close();
            }
            fs.close();
        }
    
        /*
         * 将local的数据复制到remote
         */
        public void copyFile(String local, String remote) throws IOException {
            FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
            fs.copyFromLocalFile(new Path(local), new Path(remote));
            System.out.println("copy from: " + local + " to " + remote);
            fs.close();
        }
    
        /*
         * 将remote数据下载到local
         */
        public void download(String remote, String local) throws IOException {
            Path path = new Path(remote);
            FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
            fs.copyToLocalFile(path, new Path(local));
            System.out.println("download: from" + remote + " to " + local);
            fs.close();
        }
    
        /*
         * 显示文件内容
         */
        public String cat(String remoteFile) throws IOException {
            Path path = new Path(remoteFile);
            FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
            FSDataInputStream fsdis = null;
            System.out.println("cat: " + remoteFile);
    
            OutputStream baos = new ByteArrayOutputStream();
            String str = null;
    
            try {
                fsdis = fs.open(path);
                IOUtils.copyBytes(fsdis, baos, 4096, false);
                str = baos.toString();
            } finally {
                IOUtils.closeStream(fsdis);
                fs.close();
            }
            System.out.println(str);
            return str;
        }
    
        /*
         * 定位一个文件在HDFS中存储的位置,以及多个副本存储在集群哪些节点上
         */
        public void location() throws IOException {
            String folder = hdfsPath + "create/";
            String file = "t2.txt";
            FileSystem fs = FileSystem.get(URI.create(hdfsPath), new Configuration());
            FileStatus f = fs.getFileStatus(new Path(folder + file));
            BlockLocation[] list = fs.getFileBlockLocations(f, 0, f.getLen());
    
            System.out.println("File Location: " + folder + file);
            for (BlockLocation bl : list) {
                String[] hosts = bl.getHosts();
                for (String host : hosts) {
                    System.out.println("host:" + host);
                }
            }
            fs.close();
        }
        
        /*
         * 获取HDFS资源使用情况
         */
        public void getTotalCapacity() {
            try {
                FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
                FsStatus fsStatus = fs.getStatus();
                System.out.println("总容量:" + fsStatus.getCapacity());
                System.out.println("使用容量:" + fsStatus.getUsed());
                System.out.println("剩余容量:" + fsStatus.getRemaining());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        
        /*
         * 获取某文件中包含的目录数,文件数,及占用空间大小
         */
        public void getContentSummary(String path) {
            ContentSummary cs = null;
            try {
                FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
                cs = fs.getContentSummary(new Path(path));
            } catch (Exception e) {
                e.printStackTrace();
            }
    
            // 目录数
            Long directoryCount = cs.getDirectoryCount();
            // 文件数
            Long fileCount = cs.getFileCount();
            // 占用空间
            Long length = cs.getLength();
            
            System.out.println("目录数:" + directoryCount);
            System.out.println("文件数:" + fileCount);
            System.out.println("占用空间:" + length);
        }
    }
    View Code
  • 相关阅读:
    图像的剪切
    DOS指令大全(二)
    扫描进程
    数据库名、数据库实例、全局数据库名、服务名、SID等的区别
    ORA29807: specified operator does not exist
    TCP/IP网络编程的几个网站
    漂在等待离职的日子(三)
    入职第一天
    入职一周
    漂在等待离职的日子(八)
  • 原文地址:https://www.cnblogs.com/lemonu/p/9768834.html
Copyright © 2011-2022 走看看