下面整理了分布式文件系统 HDFS 中对文件/目录进行操作的相关代码，大致包括以下几部分：
- 文件夹的新建、删除、重命名
- 文件夹中子文件和目录的统计
- 文件的新建及显示文件内容
- 文件在local和remote间的相互复制
- 定位文件在HDFS中的位置,以及副本存放的主机
- HDFS资源使用情况
1. 新建文件夹
/**
 * Creates {@code folder} (and any missing parents) on HDFS if it does not exist yet.
 *
 * @param folder path of the directory to create
 * @throws IOException if the file system cannot be reached or the directory cannot be created
 */
public void mkdirs(String folder) throws IOException {
    Path path = new Path(folder);
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    try {
        if (!fs.exists(path)) {
            fs.mkdirs(path);
            System.out.println("Create: " + folder);
        }
    } finally {
        // Close even when exists()/mkdirs() throws; previously the handle leaked in that case.
        fs.close();
    }
}
2. 删除文件夹
/**
 * Recursively deletes {@code folder} on HDFS, like {@code hadoop fs -rm -r}.
 *
 * @param folder path of the file or directory to delete
 * @throws IOException if the file system cannot be reached or the delete fails
 */
public void rmr(String folder) throws IOException {
    Path path = new Path(folder);
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    try {
        // Delete right away and check the result. deleteOnExit() only schedules the delete
        // for when the FileSystem is closed and gives the caller nothing to check, so
        // "Delete:" used to be printed whether or not anything was removed.
        if (fs.delete(path, true)) {
            System.out.println("Delete: " + folder);
        } else {
            System.out.println("Delete failed (path may not exist): " + folder);
        }
    } finally {
        fs.close();
    }
}
3. 文件重命名
/**
 * Renames (moves) {@code src} to {@code dst} on HDFS.
 *
 * @param src existing path
 * @param dst new path
 * @throws IOException if the file system cannot be reached or the rename throws
 */
public void rename(String src, String dst) throws IOException {
    Path name1 = new Path(src);
    Path name2 = new Path(dst);
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    try {
        // FileSystem.rename reports some failures through its return value rather than
        // an exception, so only claim success when it actually returned true.
        if (fs.rename(name1, name2)) {
            System.out.println("Rename: from " + src + " to " + dst);
        } else {
            System.out.println("Rename failed: from " + src + " to " + dst);
        }
    } finally {
        fs.close();
    }
}
4. 列出文件夹中的子文件及目录
/**
 * Prints the direct children of {@code folder}: path, whether it is a directory, and length.
 *
 * @param folder directory to list
 * @throws IOException if the file system cannot be reached or the listing fails
 */
public void ls(String folder) throws IOException {
    Path path = new Path(folder);
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    try {
        FileStatus[] list = fs.listStatus(path);
        System.out.println("ls: " + folder);
        System.out.println("==========================================================");
        for (FileStatus f : list) {
            // %n terminates each entry; without it every entry ran together on one line.
            System.out.printf("name: %s, folder: %s, size: %d%n", f.getPath(), f.isDirectory(), f.getLen());
        }
        System.out.println("==========================================================");
    } finally {
        fs.close();
    }
}
5. 创建文件,并添加内容
/**
 * Creates (or overwrites) {@code file} on HDFS and writes {@code content} to it as UTF-8.
 *
 * @param file    path of the file to create
 * @param content text to write
 * @throws IOException if the file system cannot be reached or the write/close fails
 */
public void createFile(String file, String content) throws IOException {
    // Explicit charset: getBytes() without one depends on the JVM's platform default.
    byte[] buff = content.getBytes("UTF-8");
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    FSDataOutputStream os = null;
    try {
        os = fs.create(new Path(file));
        os.write(buff, 0, buff.length);
        System.out.println("Create: " + file);
    } finally {
        // Nested so the FileSystem is closed even if closing the output stream fails.
        try {
            if (os != null) {
                os.close();
            }
        } finally {
            fs.close();
        }
    }
}
6. 将local数据复制到remote
/**
 * Uploads the local path {@code local} to the HDFS path {@code remote}.
 *
 * @param local  source path on the local file system
 * @param remote destination path on HDFS
 * @throws IOException if the file system cannot be reached or the copy fails
 */
public void copyFile(String local, String remote) throws IOException {
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    try {
        fs.copyFromLocalFile(new Path(local), new Path(remote));
        System.out.println("copy from: " + local + " to " + remote);
    } finally {
        // Close even when the copy throws; previously the handle leaked in that case.
        fs.close();
    }
}
7. 将remote数据下载到local
/**
 * Downloads the HDFS path {@code remote} to the local path {@code local}.
 *
 * @param remote source path on HDFS
 * @param local  destination path on the local file system
 * @throws IOException if the file system cannot be reached or the copy fails
 */
public void download(String remote, String local) throws IOException {
    Path path = new Path(remote);
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    try {
        fs.copyToLocalFile(path, new Path(local));
        // Space after "from" so the message does not read "fromhdfs://...".
        System.out.println("download: from " + remote + " to " + local);
    } finally {
        fs.close();
    }
}
8. 显示文件内容
/**
 * Reads {@code remoteFile} from HDFS, prints its contents and returns them as a string.
 *
 * @param remoteFile path of the file on HDFS
 * @return the file contents, decoded as UTF-8
 * @throws IOException if the file system cannot be reached or the file cannot be read
 */
public String cat(String remoteFile) throws IOException {
    Path path = new Path(remoteFile);
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    FSDataInputStream fsdis = null;
    System.out.println("cat: " + remoteFile);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    String str = null;
    try {
        fsdis = fs.open(path);
        IOUtils.copyBytes(fsdis, baos, 4096, false);
        // Decode explicitly as UTF-8; toString() with no argument uses the platform charset.
        str = baos.toString("UTF-8");
    } finally {
        IOUtils.closeStream(fsdis);
        fs.close();
    }
    System.out.println(str);
    return str;
}
9. 定位一个文件在HDFS中存储的位置,以及多个副本存储在集群哪些节点上
/**
 * Prints the hosts storing each block (and its replicas) of the demo file
 * {@code create/t2.txt} under {@link #hdfsPath}.
 *
 * @throws IOException if the file system cannot be reached or the file does not exist
 */
public void location() throws IOException {
    location(hdfsPath + "create/t2.txt");
}

/**
 * Prints the hosts storing each block (and its replicas) of {@code remoteFile}.
 *
 * @param remoteFile path (or full HDFS URI) of the file to locate
 * @throws IOException if the file system cannot be reached or the file does not exist
 */
public void location(String remoteFile) throws IOException {
    // Use the instance configuration; the old code built a fresh Configuration and so
    // ignored whatever resources had been loaded into conf.
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    try {
        FileStatus f = fs.getFileStatus(new Path(remoteFile));
        BlockLocation[] list = fs.getFileBlockLocations(f, 0, f.getLen());
        System.out.println("File Location: " + remoteFile);
        for (BlockLocation bl : list) {
            for (String host : bl.getHosts()) {
                System.out.println("host:" + host);
            }
        }
    } finally {
        fs.close();
    }
}
10. 获取HDFS集群存储资源使用情况
/**
 * Prints the cluster's total, used and remaining storage as reported by
 * {@code FileSystem.getStatus()} (presumably in bytes — see the FsStatus docs).
 * Failures are printed rather than thrown.
 */
public void getTotalCapacity() {
    try {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            FsStatus fsStatus = fs.getStatus();
            System.out.println("总容量:" + fsStatus.getCapacity());
            System.out.println("使用容量:" + fsStatus.getUsed());
            System.out.println("剩余容量:" + fsStatus.getRemaining());
        } finally {
            // The handle was never closed before, unlike in every other operation.
            fs.close();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
完整代码
import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapred.JobConf; /* * HDFS工具类 * */ public class Hdfs { private static final String HDFS = "hdfs://10.20.14.47:8020/"; public Hdfs(Configuration conf) { this(HDFS, conf); } public Hdfs(String hdfs, Configuration conf) { this.hdfsPath = hdfs; this.conf = conf; } private String hdfsPath; private Configuration conf; public static void main(String[] args) throws IOException { JobConf conf = config(); Hdfs hdfs = new Hdfs(conf); hdfs.createFile("/create/t2.txt", "12"); hdfs.location(); } public static JobConf config() { JobConf conf = new JobConf(Hdfs.class); conf.setJobName("HdfsDAO"); conf.addResource("classpath:/hadoop/core-site.xml"); conf.addResource("classpath:/hadoop/hdfs-site.xml"); conf.addResource("classpath:/hadoop/mapred-site.xml"); return conf; } /* * 创建文件夹 */ public void mkdirs(String folder) throws IOException { Path path = new Path(folder); FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); if (!fs.exists(path)) { fs.mkdirs(path); System.out.println("Create: " + folder); } fs.close(); } /* * 删除文件夹 */ public void rmr(String folder) throws IOException { Path path = new Path(folder); FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); fs.deleteOnExit(path); System.out.println("Delete: " + folder); fs.close(); } /* * 文件重命名 */ public void rename(String src, String dst) throws IOException { Path name1 = new Path(src); Path name2 = new Path(dst); FileSystem fs = FileSystem.get(URI.create(hdfsPath), 
conf); fs.rename(name1, name2); System.out.println("Rename: from " + src + " to " + dst); fs.close(); } /* * 列出文件夹中的子文件及目录 */ public void ls(String folder) throws IOException { Path path = new Path(folder); FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); FileStatus[] list = fs.listStatus(path); System.out.println("ls: " + folder); System.out.println("=========================================================="); for (FileStatus f : list) { System.out.printf("name: %s, folder: %s, size: %d ", f.getPath(), f.isDirectory(), f.getLen()); } System.out.println("=========================================================="); fs.close(); } /* * 创建文件,并添加内容 */ public void createFile(String file, String content) throws IOException { FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); byte[] buff = content.getBytes(); FSDataOutputStream os = null; try { os = fs.create(new Path(file)); os.write(buff, 0, buff.length); System.out.println("Create: " + file); } finally { if (os != null) os.close(); } fs.close(); } /* * 将local的数据复制到remote */ public void copyFile(String local, String remote) throws IOException { FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); fs.copyFromLocalFile(new Path(local), new Path(remote)); System.out.println("copy from: " + local + " to " + remote); fs.close(); } /* * 将remote数据下载到local */ public void download(String remote, String local) throws IOException { Path path = new Path(remote); FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); fs.copyToLocalFile(path, new Path(local)); System.out.println("download: from" + remote + " to " + local); fs.close(); } /* * 显示文件内容 */ public String cat(String remoteFile) throws IOException { Path path = new Path(remoteFile); FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); FSDataInputStream fsdis = null; System.out.println("cat: " + remoteFile); OutputStream baos = new ByteArrayOutputStream(); String str = null; try { fsdis = fs.open(path); IOUtils.copyBytes(fsdis, baos, 
4096, false); str = baos.toString(); } finally { IOUtils.closeStream(fsdis); fs.close(); } System.out.println(str); return str; } /* * 定位一个文件在HDFS中存储的位置,以及多个副本存储在集群哪些节点上 */ public void location() throws IOException { String folder = hdfsPath + "create/"; String file = "t2.txt"; FileSystem fs = FileSystem.get(URI.create(hdfsPath), new Configuration()); FileStatus f = fs.getFileStatus(new Path(folder + file)); BlockLocation[] list = fs.getFileBlockLocations(f, 0, f.getLen()); System.out.println("File Location: " + folder + file); for (BlockLocation bl : list) { String[] hosts = bl.getHosts(); for (String host : hosts) { System.out.println("host:" + host); } } fs.close(); } /* * 获取HDFS资源使用情况 */ public void getTotalCapacity() { try { FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); FsStatus fsStatus = fs.getStatus(); System.out.println("总容量:" + fsStatus.getCapacity()); System.out.println("使用容量:" + fsStatus.getUsed()); System.out.println("剩余容量:" + fsStatus.getRemaining()); } catch (IOException e) { e.printStackTrace(); } } /* * 获取某文件中包含的目录数,文件数,及占用空间大小 */ public void getContentSummary(String path) { ContentSummary cs = null; try { FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); cs = fs.getContentSummary(new Path(path)); } catch (Exception e) { e.printStackTrace(); } // 目录数 Long directoryCount = cs.getDirectoryCount(); // 文件数 Long fileCount = cs.getFileCount(); // 占用空间 Long length = cs.getLength(); System.out.println("目录数:" + directoryCount); System.out.println("文件数:" + fileCount); System.out.println("占用空间:" + length); } }