import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; //利用JavaAPI来访问HDFS的文件与目录 public class HDFSTest { public static void main(String[] args) throws Exception { try { getHDFSNode(); makeDir(); writeToHDFS(); uploadToHdfs(); listHdfs(); readFromHdfs(); deleteFromHdfs(); deleteDir(); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("SUCCESS"); } } // HDFS集群上所有节点名称信息 public static void getHDFSNode() throws IOException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000/"), conf); DistributedFileSystem dfs = (DistributedFileSystem) fs; DatanodeInfo[] dataNodeStats = dfs.getDataNodeStats(); for (int i = 0; i < dataNodeStats.length; i++) { System.out.println("DataNode_" + i + "_Node:" + dataNodeStats[i].getHostName()); } } /** 遍历HDFS上的文件和目录 */ private static void listHdfs() throws FileNotFoundException, IOException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000/"), conf); FileStatus fileList[] = fs.listStatus(new Path("/")); for (int i = 0; i < fileList.length; i++) { System.out.println("name:" + fileList[i].getPath().getName() + " size:" + fileList[i].getLen()); } fs.close(); } public static void writeToHDFS() throws IOException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf); Path path = new Path("/test/write"); FSDataOutputStream out = fs.create(path); out.writeUTF("Hello HDFS!"); out.close(); fs.close(); } // 创建HDFS目录 public static void makeDir() throws IOException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf); Path path = new Path("/test"); fs.mkdirs(path); fs.close(); } // 删除HDFS目录 public static void deleteDir() throws IOException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); Path path = new Path("/test"); fs.delete(path, true); fs.close(); } /** 上传文件到HDFS上去 */ private static void uploadToHdfs() throws FileNotFoundException, IOException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000/"), conf); String localSrc = "/etc/profile"; String hdfsDst = "/profile"; Path src = new Path(localSrc); Path dst = new Path(hdfsDst); fs.copyFromLocalFile(src, dst); fs.close(); } /** 从HDFS上读取文件 */ private static void readFromHdfs() throws FileNotFoundException, IOException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000/"), conf); String dst = "/profile"; FSDataInputStream hdfsInStream = fs.open(new Path(dst)); OutputStream out = new FileOutputStream("/home/manhua/profile"); byte[] ioBuffer = new byte[1024]; int readLen = hdfsInStream.read(ioBuffer); if (fs.exists(new Path(dst))) { while (-1 != readLen) { out.write(ioBuffer, 0, readLen); readLen = hdfsInStream.read(ioBuffer); } out.close(); hdfsInStream.close(); fs.close(); } } /** 从HDFS上删除文件 */ private static void deleteFromHdfs() throws FileNotFoundException, IOException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), conf); String dst = "/profile"; fs.deleteOnExit(new Path(dst)); fs.close(); } }