zoukankan      html  css  js  c++  java
  • HDFSjava API实验

    package com.gw;

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FsStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.io.IOUtils;

    /**
     * @author wangweifeng
     * @dercription: 实现往HDFS中新建文件夹、删除文件夹、创建文件、删除文件 获取HDFS集群上所有节点名称信息
     *               、文件重命名、读取HDFS文件中的内容、 上传本地文件到HDFS中、下载HDFS中的文件本地文件系统、列出目录下所有文件
     *               (如果是目录则显示,层次显示目录内的文件)
     */

    public class HdfsUtil {
        
        // 初始化配置参数
        static Configuration conf = new Configuration();
        static {
            String path = "/home/Hadoop/hadoop/etc/hadoop/";
            conf.addResource(new Path(path + "core-site.xml"));
            conf.addResource(new Path(path + "hdfs-site.xml"));
            conf.addResource(new Path(path + "mapred-site.xml"));
        }
        //获取FileSystem
        public static FileSystem getFs() throws IOException {
            FileSystem fs = FileSystem.get(conf);
            return fs;
        }
        
        

        // 实现往HDFS中新建文件夹
        public static void mkDir(String path) throws IOException {
            FileSystem fs = getFs();
            Path srcPath = new Path(path);
            boolean isok = fs.mkdirs(srcPath);
            if (isok) {
                System.out.println("create dir Ok!");
            } else {
                System.out.println("create dir failure!");
            }
            fs.close();
        }

        // 删除HDFS中的文件夹或是文件
        public static void delete(String path) throws IOException {
            FileSystem fs = getFs();
            Path srcPath = new Path(path);
            boolean isok = fs.delete(srcPath, true);
            if (isok) {
                System.out.println("delete ok!");
            } else {
                System.out.println("delete failure!");
            }
            fs.close();
        }

        // 在HDFS中创建文件并写入内容
        public static void createFile(String path, byte[] contents)
                throws IOException {
            FileSystem fs = getFs();
            Path dstPath = new Path(path); // 目标路径

            // 打开一个输出流
            FSDataOutputStream outputStream = fs.create(dstPath);
            outputStream.write(contents);
            outputStream.close();
            fs.close();
            System.out.println("file is created sucess!");

        }

        // 在HDFS中创建文件并写入内容
        public static void createFile(String path, String contents)
                throws IOException {
            createFile(path, contents.getBytes("UTF-8"));
        }

        // 对HDFS中的文件重命名
        public static void reNameFile(String oldName, String newName)
                throws IOException {
            FileSystem fs = getFs();
            Path oldNamePath = new Path(oldName);
            Path newNamePath = new Path(newName);
            boolean isok = fs.rename(oldNamePath, newNamePath);
            if (isok) {
                System.out.println("rename ok!");
            } else {
                System.out.println("rename failure");
            }
            fs.close();
        }

        // 读取HDFS中的文件内容并打印到标准输出
        public static void readFilePrint(String path) throws IOException {
            FileSystem fs = getFs();
            Path srcPath = new Path(path);
            // 打开一个输入流
            InputStream in = fs.open(srcPath);
            try {
                fs.open(srcPath);
                IOUtils.copyBytes(in, System.out, 4096, false); // 复制到标准输出流
            } finally {
                IOUtils.closeStream(in);
            }
        }

        // 读取文件内容
        public static byte[] readFile(String path) throws IOException {
            FileSystem fs = getFs();
            if (isExist(path)) {
                Path srcPath = new Path(path);
                FSDataInputStream is = fs.open(srcPath);
                FileStatus stat = fs.getFileStatus(srcPath);
                byte[] buffer = new byte[(int) stat.getLen()];
                is.readFully(0, buffer);
                is.close();
                fs.close();
                return buffer;
            } else {
                throw new IOException("the file is not found .");
            }
        }

        // 上传
        public static void upLoadFromLoacl(String HDFS_Path, String Loacl_Path)
                throws IOException {
            FileSystem fs = getFs();
            Path srcPath = new Path(Loacl_Path);
            Path desPath = new Path(HDFS_Path);
            fs.copyFromLocalFile(srcPath, desPath);

            // 打印文件路径
            System.out.println("Upload to " + conf.get("fs.default.name"));
            System.out.println("------------list files------------" + " ");

            FileStatus[] fileStatus = fs.listStatus(desPath);
            for (FileStatus status : fileStatus) {
                System.out.println(status.getPath());
            }
            fs.close();
        }

        // 下载
        public static void downLoadToLoacl(String HDFS_Path, String Loacl_Path)
                throws IOException {
            FileSystem fs = getFs();
            Path srcPath = new Path(HDFS_Path);
            Path desPath = new Path(Loacl_Path);
            fs.copyToLocalFile(srcPath, desPath);
            fs.close();
            System.out.println("download to sucess! ");
        }

        // 判断文件或文件夹是否存在,如果存在则返回true,否在返回false
        public static boolean isExist(String path) throws IOException {
            FileSystem fs = getFs();
            Path srcPath = new Path(path);
            boolean isExist = false;
            if (fs.isDirectory(srcPath)) {
                isExist = true;
            } else if (fs.isFile(srcPath)) {
                isExist = true;
            }
            return isExist;
        }

        // 获取集群中所有数据节点的状态
        public static void getDateNodeInfo() throws IOException {
            FileSystem fs = getFs();
            DistributedFileSystem hdfs = (DistributedFileSystem) fs;

            DatanodeInfo[] dataNodeInfo = hdfs.getDataNodeStats();
            for (int i = 0; i < dataNodeInfo.length; i++) {
                System.out.println("DataNode_" + i + "_Name:"
                        + dataNodeInfo[i].getHostName() + "DataNode_" + i + "Ip:"
                        + dataNodeInfo[i].getInfoAddr());
            }
        }

        public static void main(String[] args) throws IOException {
            System.out.println("........开始测试HDFS......");

            // 1、创建一个文件夹命名为HDTest2,并在该文件夹中创建一个文件夹test以便测试删除
            //mkDir("/HDTest2");
            //mkDir("/HDTest2/test");
            
            // 2、删除/HDTest2/test
            //delete("/HDTest2/test");

            // 3、在HDFS中创建一个文件,并写入"helloworld"
            //createFile("/HDTest2/helloworld.txt", "helloworld of HDFS!");

            // 4、将/HDTest2/helloworld.txt 命名为/HDTest2/helloworld.ini
            //reNameFile("/HDTest2/helloworld.txt", "/HDTest2/helloworld.ini");

            // 5、将文件内容打印出来
         //readFilePrint("/HDTest2/helloworld.ini");

            // 6、将文件内容读出来,并创建 /HDTest2/test.txt文件
            //createFile("/HDTest2/test.txt", readFile("/HDTest2/helloworld.ini"));

            // 7、将本机桌面my.cnf文件上传到/HDTest2
            //upLoadFromLoacl("/HDTest2/", "/home/Hadoop/Desktop/my.cnf");

            // 8、将/HDTest2/test.txt下载到桌面
            //downLoadToLoacl("/HDTest2/test.txt", "/home/Hadoop/Desktop");

            // 9、打印DataNode的信息
            //getDateNodeInfo();

            
        }
            
    }

  • 相关阅读:
    网站添加share.js一键分享
    tp5利用phpExecl导出
    项目可能需要用到的公共方法
    拖拽文件上传
    推荐Alipay和Watch 支付 yansongda SDK
    在画布中添加二维码加文字 和 压缩多图片到一个压缩包中
    redis使用
    微信公众号网页授权登录
    第三方登入及详细操作
    订单并发问题及解决方案
  • 原文地址:https://www.cnblogs.com/richelle009/p/4456029.html
Copyright © 2011-2022 走看看