zoukankan      html  css  js  c++  java
  • Java API操作 上传文件

    因为Hadoop中关于文件操作类基本上都在“org.apache.hadoop.fs”包中,这些API的主要作用主要体现在以下操作上:打开文件、读写文件、删除文件。并且,Hadoop类库中最终面向用户提供的接口类是FileSystem,该类是一个抽象类,只能通过get方法得到。
    
    下面,笔者就逐一的对这11个比较常用的API进行讲解。
    
    1.上传本地文件
    通过“FileSystem.copyFromLocalFile(Path src,Path dst)”可以将本地文件上传到HDFS的指定位置上。具体代码如下:
    
    package HDFS.learnself;
    
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class Upload_Files {
     public static void main(String[] args) throws IOException {
      //1.加载hdfs的配置文件
      Configuration conf=new Configuration();
      conf.set("fs.defaultFS", "hdfs://hdp02:9000");
      //2.获取hdfs的操作对象,得到一个FileSystem对象
      FileSystem fs=FileSystem.get(conf);
      //3.创建源目的文件路径和文件上传操作
      Path src=new Path("D:\hdfs.txt");
      Path dst=new Path("/");
      fs.copyFromLocalFile(src, dst);
      //4.关闭流
      fs.close();
      System.out.println("文件上传成功!");
    
          }
     }
    
    2.创建HDFS目录
    通过“fs.mkdirs”方法进行目录的创建。具体代码如下:
    
    package HDFS.learnself;
    
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class mkdir_list {
     public static void main(String[] args) throws IOException {
        //1.加载hdfs的配置信息
         Configuration conf=new Configuration();
         conf.set("fs.defaultFS", "hdfs://hdp02:9000");
        //2..获取hdfs的操作对象
         FileSystem fs=FileSystem.get(conf);
        //3.创建目录
         Path path=new Path("/list");
         fs.mkdirs(path);
        //4.关闭流
         fs.close();
         System.out.println("目录创建成功!");
    
           }
      }
    
    3.写文件
    通过“writeUTF()”方法可以实现对指定文件进行写操作。具体代码如下:
    
    package HDFS.learnself;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Hdfs;
    import org.apache.hadoop.fs.Path;
    
    public class Write {
     public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
      //1.加载hdfs的配置文件
      Configuration conf=new Configuration();
      //2.获取hdfs的操作对象
      FileSystem fs=FileSystem.get(new URI("hdfs://hdp02:9000"), conf, "hdp02");
      //3.文件路径
      Path File=new Path("hdfs://hdp02:9000/test/cao.txt");
      //4.创建FSDataOutputStream对象
      FSDataOutputStream out=fs.create(File);
      //5.写入数据
      out.writeUTF("Hello world!");
      out.close();
      System.out.println("数据写入成功!");
        }
     }
    
    4.读文件
    通过“ReadUTF()”方法可以实现对指定文件进行读操作。具体代码如下:
    
    package HDFS.learnself;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class Read {
     public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
      //1.加载hdfs的配置文件
      Configuration conf=new Configuration();
      //2.获取hdfs的操作对象
      FileSystem fs=FileSystem.get(new URI("hdfs://hdp01:9000"), conf, "hdp02");
      //3.读取文件的路径
      Path File=new Path("hdfs://hdp01:9000/test/cao.txt");
      //4.创建FSDataInputStream对象
      FSDataInputStream in=fs.open(File);
      String info = in.readUTF();
      System.out.println("数据读取成功!");
      System.out.println(info);
      in.close();
          }
     }
    
    5.重命名
    通过“fs.rename()”方法可以实现对指定文件进行重命名。具体代码如下:
    
    package HDFS.learnself;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class ChangeName {
     public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
      //1.加载hdfs的配置文件
      Configuration conf=new Configuration();
      //2.获取hdfs的操作对象
      FileSystem fs=FileSystem.get(new URI("hdfs://hdp01:9000"), conf, "hdp02");
      //3.对文件名进行操作
      Path old=new Path("/test/fur.txt");
      Path now=new Path("/test/fur01.txt");
      //4.调用hdfs的rename重命名方法,返回值为boolean类型
      boolean isRename=fs.rename(old, now);
      System.out.println("重命名成功!");
           }
     }
    
    6.删除文件
    通过“fs.delete()”方法可以实现对指定文件进行删除。具体代码如下:
    
    package HDFS.learnself;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class DeleteFile {
     public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
      //1.加载hdfs的配置文件
      Configuration conf=new Configuration();
      //2.获取hdfs的操作对象
      FileSystem fs=FileSystem.get(new URI("hdfs://hdp01:9000"), conf, "hdp02");
      //3.将要删除的文件路径
      String File="/test/cao.txt";
      //4.删除文件
      fs.delete(new Path(File), true);
      fs.close();
      System.out.println("文件删除成功!");
          }
     }
    
    7.删除目录
    通过“fs.delete()”方法可以实现对目录的(递归)删除。具体代码如下:
    
    package HDFS.learnself;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class DeleteList {
     public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
      //1.加载hdfs的配置文件
      Configuration conf=new Configuration();
      //2.获取hdfs的操作对象
      FileSystem fs=FileSystem.get(new URI("hdfs://hdp01:9000"), conf, "hdp02");
      //3.将要删除的目录路径
      Path list=new Path("/list");
      //4.递归删除
      boolean isDelete=fs.delete(list, true);
      fs.close();
      System.out.println("目录删除成功!");
          }
     }
    
    8.读取某个目录下的所有文件
    通过“FileStatus.getPath()”方法可以查看指定目录下的所有文件。具体代码如下:
    
    package HDFS.learnself;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class CatFiles {
     public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
      //1.加载hdfs的配置文件
      Configuration conf=new Configuration();
      //2.获取hdfs的操作对象
      FileSystem fs=FileSystem.get(new URI("hdfs://hdp01:9000"), conf, "hdp02");
      //3.创建要读取的文件路径
      Path listf=new Path("/tmp");
      //4.创建FileStatus对象,调用listStatus方法
      FileStatus stats[]=fs.listStatus(listf);
      for(int i=0;i<stats.length;i++){
       System.out.println(stats[i].getPath().toString());
      }
      fs.close();
      System.out.println("该目录下的文件查找完毕!");
          }
    }
    9.查看文件是否存在
    通过“FileSystem.exists(Path f)”可以查看指定文件是否存在。具体代码如下:
    
    package HDFS.learnself;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class ifExists {
     public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
      //1.加载hdfs的配置文件
      Configuration conf=new Configuration();
      //2.获取hdfs的操作对象
      FileSystem fs=FileSystem.get(new URI("hdfs://hdp01:9000"), conf, "hdp02");
      //3.要查找的文件路径
      Path File=new Path("/test/fur01.txt");
      //4.调用exists方法
      boolean isExists=fs.exists(File);
      System.out.println(isExists);
           }
      }
    
    10.查看文件最后修改时间
    通过“FileSystem.getModificationTime()”可以查看制定文件的最会修改时间(注意:这里的时间是以时间戳的形式显示的)。具体代码如下:
    
    package HDFS.learnself;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class Modify_Time {
     public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
      //1.加载hdfs的配置文件
      Configuration conf=new Configuration();
      //2.获取hdfs的操作对象
      FileSystem fs=FileSystem.get(new URI("hdfs://hdp01:9000"), conf, "hdp02");
      //3.指定文件的路径
      Path File=new Path("/test/fur01.txt");
      //4.创建FileStatus对象,调用listStatus方法
      FileStatus filestatus=fs.getFileStatus(File);
      //5.调用getModificationTime方法
      long time = filestatus.getModificationTime();
      System.out.println(time);
      fs.close();
            }
      }
    
    11.查看某个文件在HDFS集群中的位置
    通过“FileSystem.getFileBlockLocation(FileStatus file,long start,long len)”可以查看指定文件所在位置。具体代码如下:
    
    package HDFS.learnself;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    
    public class WhereFile {
     public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
      //1.加载hdfs的配置文件
      Configuration conf=new Configuration();
      //2.获取hdfs的操作对象
      FileSystem fs=FileSystem.get(new URI("hdfs://hdp01:9000"), conf, "hdp02");
      //3.需要查找文件的路径
      Path File=new Path("/test/fur01.txt");
      //4.创建FileStatus对象,调用getFileStatus方法
      FileStatus filestatus=fs.getFileStatus(File);
      //5.创建一个BlockLocation数组对象,调用getFileBlockLocations方法
      BlockLocation [] blkLocations=fs.getFileBlockLocations(filestatus, 0, filestatus.getLen());
      int blockLen=blkLocations.length;
      for(int i=0;i<blockLen;i++){
       String [] hosts=blkLocations[i].getHosts();
       System.out.println("blockID:"+i+"
    "+"location:"+hosts[0]);
          }
     }
    
    
    说明:
    1.在最开始的两个API(文件上传和创建目录)的时候,采用的是conf.set()的方法,但在后面的API操作中,统一的改为了FileSystem.get(uri,conf,user)的形式进行HDFS的配置的。主要是因为按照第一种方式,经常容易出现权限不足的异常,而采用后面的格式,一来防止出现权限异常问题,而来也改进了代码冗余问题。(推荐读者使用后面的编写方式)
    2.在FileSystem.get(uri,conf,user)中指定URI的时候,既写了“hdfs://hdp02:9000”,也写了“hdfs://hdp01:9000”。主要是因为搭建的是高可用集群,有两个主节点(hdp01和hdp02),而不巧的是,在编程的过程中,开始的hdp02主节点宕掉了,所以后面处于Active状态的是hdp01主节点。(请读者根据自己的实际情况编写)
    3.在FileSystem.get(uri,conf,user)中user是指集群登录的用户名。
    View Code

    Java API操作 上传文件

    package com.hdfs;
    
    
    
    import java.net.URI;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.junit.Before;
    import org.junit.Test;
    
    
    
    public class HdfsTest2 {
        
        //获取 FileSystem 对象
        private FileSystem fs=null;
        @SuppressWarnings("unused")
        private List<String> hdfsPathsLists;
        
        
       @Before
       
     
        public void init() throws Exception {
            Configuration con = new Configuration();
            con.setBoolean("dfs.support.append",true);
            fs = FileSystem.get(new URI("hdfs://192.168.119.128:9000"),
                    con,"root");
            
        }
        @Test
       public void testUploadFile() throws Exception{
            String src = "/home/cai/123.cfg";
            String hdfsDst="/javaApi/mk/dir1";
            fs.copyFromLocalFile(new Path(src),new Path(hdfsDst));
            System.out.println("upload success");
        }
    }
  • 相关阅读:
    mysql的root用户无法建库的问题
    jmeter连库获取sql结果存为参数给下一接口用
    webservice接口问题
    今日杂记-20190623
    接口测试
    pyenv和virtualenv管理python的版本(多个版本同时用)
    安装禅道
    经典题,类中的变量
    CSS简介
    python单元测试unittest
  • 原文地址:https://www.cnblogs.com/cainiao-chuanqi/p/12795793.html
Copyright © 2011-2022 走看看