HDFS文件的基本操作:
package wjn; import java.io.BufferedInputStream; import java.io.BufferedReader; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.net.URI; import java.text.SimpleDateFormat; import java.util.Scanner; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.io.IOUtils; import org.apache.log4j.BasicConfigurator; public class ww { public static Scanner sc = new Scanner(System.in); /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { // TODO Auto-generated method stub main9(); } public static void main10() throws IOException{ String p1 = "/user/hadoop/text/text1"; String p2 = "/user/hadoop"; if(mv(p1,p2)){ System.out.println("文件移动成功"); }else{ System.out.println("文件移动失败"); } } public static void main9() throws IOException{ String hdfspath = "/user/hadoop/text2"; if(deleteFileFromHDFS(hdfspath)){ System.out.println("已删除文件"+hdfspath); }else{ System.out.println("删除文件失败"); } } public static void main7() throws IOException{ String hdfspath1 = "/user/hadoop"; boolean forcedelete = false; if(!ifex(hdfspath1)){ mkdir(hdfspath1); System.out.println("创建目录"+hdfspath1); }else{ if(isempty(hdfspath1)||forcedelete){ rmDir(hdfspath1); System.out.println("删除目录"+hdfspath1); }else{ System.out.println("目录不为空,不删除"); } } } public static void main6() throws IOException{ String hdfspath = "/user/hadoop/text2"; String hdfspath1 = "/user/hadoop"; if(ifex(hdfspath)){ deleteFileFromHDFS(hdfspath); System.out.println("该路径存在,删除路径"+hdfspath); }else{ if(!ifex(hdfspath1)){ mkdir(hdfspath1); System.out.println("创建文件夹"+hdfspath1); } touchz(hdfspath); System.out.println("创建路径"+hdfspath); } } public static void main5() throws IOException{ String hdfspath = "/user/hadoop"; System.out.println("所有文件信息如下"); lsDir(hdfspath); } public static void main4() throws IOException{ String hdfspath = "/user/hadoop/text2"; System.out.println("文件信息如下"); ls(hdfspath); } public static void main3() throws IOException{ String hdfspath = "/user/hadoop/text2"; cat(hdfspath); System.out.println("读取完成"); } public static void main2() throws IOException{ String localpath = "/home/hadoop/1234.txt"; String hdfspath = "/user/hadoop/text2"; download(hdfspath,localpath); System.out.println("文件下载成功"); } public static void main1() throws IOException{ String localpath = "/home/hadoop/123.txt"; String hdfspath = "/user/hadoop/text2"; if(ifex(hdfspath)){ System.out.println("文件存在,请选择追加(1)还是覆盖(2)"); int i = sc.nextInt(); if(i==1){ appendFileToHDFS(hdfspath,localpath); System.out.println("文件追加成功"); }else if(i==2){ deleteFileFromHDFS(hdfspath); update(localpath,hdfspath); System.out.println("文件覆盖成功"); }else{ System.out.println("输入有误"); } }else{ update(localpath,hdfspath); System.out.println("文件不存在,上传成功"); } } public static void update(String localpath , String hdfspath) throws IOException{ InputStream in = new BufferedInputStream(new FileInputStream(localpath)); FileSystem fileSystem = FileSystem.get(URI.create(hdfspath), new Configuration()); OutputStream out = fileSystem.create(new Path(hdfspath)); IOUtils.copyBytes(in, out, 4096, true); fileSystem.close(); } //判断hdfs中文件是否存在 public static boolean ifex(String hdfspath) throws IOException{ Configuration conf = new Configuration(); conf.set("fs.defaultFS","hdfs://localhost:9000"); conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem"); FileSystem fs = FileSystem.get(conf); if (fs.exists(new Path(hdfspath))){ fs.close(); return true; }else{ fs.close(); return false; } } //创建目录 public static void mkdir(String hdfspath) throws IOException{ FileSystem fileSystem = FileSystem.get(URI.create(hdfspath), new Configuration()); Path dirPath = new Path(hdfspath); fileSystem.mkdirs(dirPath); fileSystem.close(); } //创建文件 public static void touchz(String hdfspath) throws IOException{ FileSystem fileSystem = FileSystem.get(URI.create(hdfspath), new Configuration()); Path dirPath = new Path(hdfspath); FSDataOutputStream outputStream = fileSystem.create(dirPath); outputStream.close(); fileSystem.close(); } public static void appendFileToHDFS(String hdfsPath, String localFilePath) throws IOException { Configuration config = new Configuration(); config.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER"); config.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true"); FileSystem fileSystem = FileSystem.get(URI.create(hdfsPath), config); InputStream in = new BufferedInputStream(new FileInputStream(localFilePath)); FSDataOutputStream out = fileSystem.append(new Path(hdfsPath)); IOUtils.copyBytes(in, out, 4096, true); fileSystem.close(); } //删除文件 public static boolean deleteFileFromHDFS(String hdfsPath) throws IOException { FileSystem fileSystem = FileSystem.get(URI.create(hdfsPath), new Configuration()); boolean result = fileSystem.deleteOnExit(new Path(hdfsPath)); fileSystem.close(); return result; } //删除目录 public static void rmDir(String hdfspath) throws IOException{ FileSystem fileSystem = FileSystem.get(URI.create(hdfspath), new Configuration()); fileSystem.delete(new Path(hdfspath),true); fileSystem.close(); } //判断目录是否为空 public static boolean isempty(String hdfspath) throws IOException{ FileSystem fileSystem = FileSystem.get(URI.create(hdfspath), new Configuration()); RemoteIterator<LocatedFileStatus> remoteIterator = fileSystem.listFiles(new Path(hdfspath), true); return !remoteIterator.hasNext(); } public static void download (String hdfsPath, String localPath) throws IOException { FileSystem fileSystem = FileSystem.get(URI.create(hdfsPath), new Configuration()); FSDataInputStream in = fileSystem.open(new Path(hdfsPath)); OutputStream out = new FileOutputStream(localPath); IOUtils.copyBytes(in, out, 4096, true); fileSystem.close(); } //根据hdfs路径输出其内容到终端 public static void cat(String hdfspath) throws IOException{ FileSystem fileSystem = FileSystem.get(URI.create(hdfspath), new Configuration()); FSDataInputStream in = fileSystem.open(new Path(hdfspath)); BufferedReader d = new BufferedReader(new InputStreamReader(in)); String line = null; while((line = d.readLine())!=null){ System.out.println(line); } d.close(); in.close(); } //显示hdfs中指定文件的信息 public static void ls(String hdfspath) throws IOException{ FileSystem fileSystem = FileSystem.get(URI.create(hdfspath), new Configuration()); Path path = new Path(hdfspath); FileStatus[] fileStatus = fileSystem.listStatus(path); for (FileStatus s : fileStatus){ System.out.println("路径:"+s.getPath().toString()); System.out.println("权限:"+s.getPermission().toString()); System.out.println("大小:"+s.getLen()); Long time = s.getModificationTime(); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String date = format.format(time); System.out.println("时间:"+date); } } //显示文件夹下所有文件的信息 public static void lsDir(String hdfspath) throws IOException{ FileSystem fileSystem = FileSystem.get(URI.create(hdfspath), new Configuration()); Path dirPath = new Path(hdfspath); RemoteIterator<LocatedFileStatus> remoteIterator = fileSystem.listFiles(dirPath, true); while(remoteIterator.hasNext()){ FileStatus s = remoteIterator.next(); System.out.println("路径:"+s.getPath().toString()); System.out.println("权限:"+s.getPermission().toString()); System.out.println("大小:"+s.getLen()); Long time = s.getModificationTime(); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String date = format.format(time); System.out.println("时间:"+date); System.out.println(); } fileSystem.close(); } //移动文件 public static boolean mv(String path1,String path2) throws IOException{ FileSystem fileSystem = FileSystem.get(URI.create(path1), new Configuration()); Path p1 = new Path(path1); Path p2 = new Path(path2); boolean result = fileSystem.rename(p1, p2); fileSystem.close(); return result; } }