zoukankan      html  css  js  c++  java
  • HDFS FileSystem(二) 读写操作

    1. 读取HDFS文件

    1.1 hdfs文件字节数组读取

     public void readFileByAPI() throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.default.name", "hdfs://192.168.8.156:9000/");
            FileSystem fileSystem = FileSystem.get(conf);
            Path path = new Path("/user/compass/readme.txt");
     
            FSDataInputStream fsDataInputStream = fileSystem.open(path);
            byte[] bytes = new byte[1024];
            int len = -1;
            ByteArrayOutputStream stream = new ByteArrayOutputStream();
     
            while ((len = fsDataInputStream.read(bytes)) != -1) {
                stream.write(bytes, 0, len);
            }
            fsDataInputStream.close();
            stream.close();
            System.out.println(new String(stream.toByteArray()));
     
        }

    1.2 下载hdfs文件到本地

    public void getFile(String dst, String local) {
         try {
             if (!filesystem.exists(new Path(dst))) {
                 System.out.println("文件不存在!");
             } else {
                 filesystem.copyToLocalFile(new Path(dst), new Path(local));
                 System.out.println("下载成功!");
             }
         } catch (IOException e) {
             System.out.println("下载失败!");
             e.printStackTrace();
         }
     }

    2. 写入HDFS文件

    2.1 将字符串写入hdfs新文件中

    def wirteToHdfs(hdfs: FileSystem, json: String, blockPath: String): Unit = {
        val path = new Path(blockPath)
        val fileOutputStream: FSDataOutputStream = hdfs.create(path)
        fileOutputStream.writeBytes(json)
        if (fileOutputStream != null) {
          fileOutputStream.close()
        }
      }

    2.2 上传本地到文件hdfs

    
    
    @Test
        public void copyFromLocalFile() throws Exception{
            Path localpath = new Path("/home/pumbaa/Downloads/Anaconda3-5.0.1-Linux-x86_64.sh");
            Path destpath = new Path("/hdfsapi/test");
            fileSystem.copyFromLocalFile(localpath, destpath);
        }
     
    @Test
        public void copyFromLocalBigFile() throws Exception{
            Path localpath = new Path("/home/pumbaa/Downloads/Anaconda3-5.0.1-Linux-x86_64.sh");
            Path destpath = new Path("/hdfsapi/test");
            fileSystem.copyFromLocalFile(localpath, destpath);
            InputStream in = new BufferedInputStream(
              new FileInputStream(
                      new File("/home/pumbaa/Downloads/ideaIU-2017.3.4.tar.gz")));
    
            FSDataOutputStream output = fileSystem.create(new Path("/hdfsapi/test/ideaIU-2017.3.4.tar.gz"),
                    new Progressable() {
                        public void progress() {
                            System.out.print("*");
                        }
                    });
            IOUtils.copyBytes(in, output, 4096);
        }

    3.文件操作

    3.1创建目录

    /**
     * 创建目录
     * @param path 创建目录的地址(例:/hadoop/)
     * @throws IOException
     */
    public void mkdir(String path) throws IOException {
        //创建hdfs目录
        if(filesystem.exists(new Path(path)))
        {
            System.out.println("目录已存在");
        }
        else
        {
            boolean result=filesystem.mkdirs(new Path(path));
            System.out.println(result);
        }
    }

    3.2创建文件

    /**
     * 创建文件
     * @param path hdfs文件地址(例:/hadoop/abc.txt)
     * @throws IOException
     */
    public  void create(String path) throws IOException{
        //创建文件
        if(filesystem.exists(new Path(path)))
        {
            System.out.println("文件已存在");
        }
        else
        {
            FSDataOutputStream outputStream=  filesystem.create(new Path(path));
            System.out.println("文件创建成功");
        }
    }

    3.3查看文件内容

    /**
     * 查看文件内容
     * @param dst hdfs文件地址(例:/hadoop/abc.txt)
     * @throws IOException
     */
    public void read(String dst) throws IOException {
        if(filesystem.exists(new Path(dst)))
        {
            FSDataInputStream inputstream=filesystem.open(new Path(dst));
            InputStreamReader isr=new InputStreamReader(inputstream);
            BufferedReader br=new BufferedReader(isr);
            String str=br.readLine();
            while(str!=null){
                System.out.println(str);
                str=br.readLine();
            }
            br.close();
            isr.close();
            inputstream.close();
        }
       else
        {
            System.out.println("文件不存在");
        }
    }

    3.4文件重命名,移动文件

    /**
     * 将dst1重命名为dst2,也可以进行文件的移动
     * @param oldpath 旧名
     * @param newpath 新名
     */
    public void moveFile(String oldpath, String newpath) {
        Path path1 = new Path(oldpath);
        Path path2 = new Path(newpath);
        try {
            if (!filesystem.exists(path1)) {
                System.out.println(oldpath + " 文件不存在!");
                return;
            }
            if (filesystem.exists(path2)) {
                System.out.println(newpath + "已存在!");
                return;
            }
            // 将文件进行重命名,可以起到移动文件的作用
            filesystem.rename(path1, path2);
            System.out.println("文件已重命名!");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    3.5显示目录下所有文件

    /**
     * 显示目录下所有文件
     * @param dst
     */
    public void listStatus(String dst) {
        try {
            if (!filesystem.exists(new Path(dst))) {
                System.out.println("目录不存在!");
                return;
            }
            // 得到文件的状态
            FileStatus[] status = filesystem.listStatus(new Path(dst));
            for (FileStatus s : status) {
                System.out.println(s.getPath().getName());
            }
    
        } catch (IllegalArgumentException | IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    3.6删除hdfs文件

    /**
     * 删除hdfs中的文件
     * @param dst
     */
    public void deleteFile(String dst) {
        try {
            if (!filesystem.exists(new Path(dst))) {
                System.out.println("文件不存在!");
            } else {
                filesystem.delete(new Path(dst), true);
                System.out.println("删除成功!");
            }
        } catch (IOException e) {
            System.out.println("删除失败!");
            e.printStackTrace();
        }
    }
  • 相关阅读:
    sqlserver中递归写法
    keytools命令生成证书
    java中sql语句快速处理
    select * 替换写法
    oracle行转列
    oracle中查看当前用户的表结构、主键、索引
    Servlet三种实现方式
    【python之旅】python的面向对象
    【python之旅】python的模块
    【python之旅】python的基础三
  • 原文地址:https://www.cnblogs.com/chong-zuo3322/p/13181069.html
Copyright © 2011-2022 走看看