package study.bigdata.hdfs;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

/**
 * Created by lq on 2017/7/5.
 */
public class HDFSUtils {

    // mini1 is the cluster's NameNode host
    //public static String hdfsURI = "hdfs://mini1:9000";
    //public static String localURI = "D:/test/hadoop-2.6.5/hadoopData";
    //public static String uri = "D:/test/hadoop-2.6.5";
    //public static String URI = "D:/test/hadoop-2.6.5/hadoopData";
    public static String URI = "hdfs://mini1:9000";

    /**
     * Get the HDFS client configuration.
     * @return configuration pointing at the cluster NameNode
     */
    public static Configuration getConf() {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", URI);
        //conf.set("hadoop.home.dir", "/home/hadoop/apps/hadoop-2.6.5/tmp");
        //conf.set("hadoop.home.dir", "D:/test/hadoop-2.6.5");
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        return conf;
    }

    /**
     * Create a directory.
     * @param dir full path of the directory
     * @return true if the directory was created
     * @throws IOException
     */
    public static boolean mkdir(String dir) throws IOException {
        if (StringUtils.isBlank(dir)) {
            return false;
        }
        FileSystem fs = FileSystem.get(getConf());
        try {
            if (!fs.exists(new Path(dir))) {
                return fs.mkdirs(new Path(dir));
            }
            return false;
        } finally {
            // close on every path, not only when the directory already exists
            fs.close();
        }
    }

    /**
     * Delete a directory recursively.
     * @param dir full path of the directory
     * @return true if the delete succeeded
     * @throws IOException
     */
    public static boolean deleteDir(String dir) throws IOException {
        if (StringUtils.isBlank(dir)) return false;
        FileSystem fs = FileSystem.get(getConf());
        boolean res = fs.delete(new Path(dir), true);
        fs.close();
        return res;
    }

    /**
     * List the directories and files under the given path.
     * @param dir full path to list
     * @return fully qualified paths of the children
     * @throws IOException
     */
    public static List<String> listAll(String dir) throws IOException {
        List<String> names = new ArrayList<String>();
        if (StringUtils.isBlank(dir)) {
            return names;
        }
        FileSystem fs = FileSystem.get(getConf());
        FileStatus[] statuses = fs.listStatus(new Path(dir));
        for (int i = 0; i < statuses.length; i++) {
            if (statuses[i].isFile()) {
                // regular file
                names.add(statuses[i].getPath().toString());
            } else if (statuses[i].isDirectory()) {
                // directory
                names.add(statuses[i].getPath().toString());
            } else if (statuses[i].isSymlink()) {
                // symbolic link
                names.add(statuses[i].getPath().toString());
            }
        }
        System.out.println(names);
        fs.close();
        return names;
    }

    /**
     * Copy a local file to HDFS.
     * @param localFile local file path
     * @param hdfsFile HDFS target path
     * @return true if the copy succeeded
     * @throws IOException
     */
    public static boolean copyFromLocalFile(String localFile, String hdfsFile) throws IOException {
        if (StringUtils.isBlank(localFile) || StringUtils.isBlank(hdfsFile)) {
            return false;
        }
        FileSystem hdfs = FileSystem.get(getConf());
        Path src = new Path(localFile);
        Path dst = new Path(hdfsFile);
        hdfs.copyFromLocalFile(src, dst);
        hdfs.close();
        return true;
    }

    /**
     * Create a file on HDFS with the given content.
     * @param newFile new file path
     * @param content file content (may be empty, but not null)
     * @return true if the file was created
     * @throws IOException
     */
    public static boolean create(String newFile, String content) throws IOException {
        //if (StringUtils.isBlank(newFile) || StringUtils.isBlank(content)) {
        if (StringUtils.isBlank(newFile) || content == null) {
            return false;
        }
        FileSystem hdfs = FileSystem.get(getConf());
        FSDataOutputStream os = hdfs.create(new Path(newFile));
        os.write(content.getBytes("UTF-8"));
        os.close();
        hdfs.close();
        return true;
    }
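    /*
     * Note: FileSystem.get() returns a cached instance shared per URI and user, so
     * the fs.close() calls in these helpers also close the instance for any other
     * caller in the same JVM. A minimal sketch of a safer pattern (the existsSafe
     * method is not part of the original class, just an illustration): use
     * FileSystem.newInstance(), which bypasses the cache and is safe to close.
     */
    public static boolean existsSafe(String path) throws IOException {
        if (StringUtils.isBlank(path)) {
            return false;
        }
        // newInstance() creates a private FileSystem object; closing it does not
        // affect instances obtained elsewhere via FileSystem.get()
        FileSystem fs = FileSystem.newInstance(getConf());
        try {
            return fs.exists(new Path(path));
        } finally {
            fs.close();
        }
    }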
    /**
     * Delete a file on HDFS.
     * @param hdfsFile
     * @param recursive true: delete recursively; false: delete only the given path
     * @return true if the delete succeeded
     * @throws IOException
     */
    public static boolean delete(String hdfsFile, boolean recursive) throws IOException {
        if (StringUtils.isBlank(hdfsFile)) {
            return false;
        }
        FileSystem hdfs = FileSystem.get(getConf());
        boolean res = hdfs.delete(new Path(hdfsFile), recursive);
        hdfs.close();
        return res;
    }

    /**
     * Read an HDFS file into a byte array.
     * @param hdfsFile
     * @return the file content
     * @throws Exception
     */
    public static byte[] readHDFS(String hdfsFile) throws Exception {
        if (StringUtils.isBlank(hdfsFile)) {
            return null;
        }
        FileSystem fs = FileSystem.get(getConf());
        // check if the file exists
        Path path = new Path(hdfsFile);
        if (fs.exists(path)) {
            FSDataInputStream is = fs.open(path);
            // size the buffer from the file status
            FileStatus stat = fs.getFileStatus(path);
            byte[] buffer = new byte[(int) stat.getLen()];
            is.readFully(0, buffer);
            is.close();
            fs.close();
            return buffer;
        } else {
            throw new Exception("the file is not found.");
        }
    }

    /**
     * Append content to the given file, creating it if it does not exist.
     * @param hdfsFile
     * @param content
     * @return true if the append succeeded
     * @throws Exception
     */
    public static boolean append(String hdfsFile, String content) throws Exception {
        if (StringUtils.isBlank(hdfsFile)) return false;
        if (content == null) return false;
        // workaround for append failures on a single-datanode hadoop env:
        //conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        //conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
        FileSystem fs = FileSystem.get(getConf());
        // check if the file exists
        Path path = new Path(hdfsFile);
        if (fs.exists(path)) {
            try {
                InputStream in = new ByteArrayInputStream(content.getBytes("UTF-8"));
                OutputStream out = fs.append(path);
                // copyBytes with close=true already closes both streams,
                // so no extra out.close()/in.close() afterwards
                IOUtils.copyBytes(in, out, 4096, true);
                fs.close();
                return true;
            } catch (Exception ex) {
                fs.close();
                throw ex;
            }
        } else {
            return HDFSUtils.create(hdfsFile, content);
        }
    }

    @Test
    public void testappend() throws Exception {
        append(URI + "/study/input/qq.dat", "hello kitty hello kitty");
    }

    @Test
    public void testreadHDFS() throws Exception {
        System.out.println(new String(readHDFS(URI + "/study/input/qq.dat")));
    }

    @Test
    public void testdelete() throws IOException {
        delete(URI + "/study/inputx", true);
    }

    @Test
    public void testcreate() throws IOException {
        create(URI + "/input/word2.txt", "");
    }

    @Test
    public void testcopyFromLocalFile() throws IOException {
        // forward slashes avoid invalid escape sequences in the Windows path
        copyFromLocalFile("D:/test/hadoop-2.6.5/hadoopData/word.txt", URI + "/inputx");
    }

    @Test
    public void testlistAll() throws IOException {
        listAll(URI + "/");
    }

    @Test
    public void testdeldir() throws IOException {
        deleteDir(URI + "/input/test2");
    }

    @Test
    public void testmkdir() throws IOException {
        mkdir(URI + "/input");
    }
}
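The commented-out properties in append() work around append failures on clusters with fewer datanodes than the replication factor: when a datanode in the write pipeline fails (or cannot be replaced because there is no spare node), the client tries to replace it and the append aborts. A minimal sketch of a client configuration that applies them, using the property keys from the commented-out lines above and the same mini1 URI assumed throughout; the class and method names here are illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import java.io.IOException;

public class AppendConfSketch {
    // Returns a FileSystem whose appends tolerate a small/pseudo-distributed cluster.
    public static FileSystem openForAppend() throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://mini1:9000");
        // NEVER tells the client to keep writing with the remaining datanodes
        // instead of trying to replace a failed one mid-pipeline
        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
        return FileSystem.get(conf);
    }
}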
Reference: http://blog.csdn.net/daxiang12092205/article/details/52717470