package HDFS; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.IOUtils; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; /** * @Author:Dapeng * @Discription: * @Date:Created in 上午 10:37 2018/10/25 0025 */ public class HDFSAPI { /** * 生成文件系统 * */ public static FileSystem getHadoopFileSystem() { Configuration conf = new Configuration(); conf.set("fs.defaultFS","hdfs://192.168.52.135:9000"); // conf.set("dfs.replication","1"); FileSystem fileSystem = null; try { fileSystem = FileSystem.get(conf); } catch (IOException e) { e.printStackTrace(); } return fileSystem; } /** * 创建目录 * */ public static boolean createPath(String pathName) { boolean result = false; //获取文件系统 FileSystem fileSystem = getHadoopFileSystem(); //调用 makdirs创建目录 Path path = new Path(pathName); try { result = fileSystem.mkdirs(path); } catch (IOException e) { e.printStackTrace(); }finally { //将文件系统关闭 close(fileSystem); } return result; } /** * 创建文件写数据 * */ public static boolean createFile(String fileName,String content){ boolean result = false; FileSystem fileSystem = getHadoopFileSystem(); Path path = new Path(fileName); try { //创建文件的函数 FSDataOutputStream fsDataOutputStream = fileSystem.create(path); //写入数据 fsDataOutputStream.writeUTF(content); result = true; } catch (IOException e) { e.printStackTrace(); }finally { close(fileSystem); } return result; } /** * 上传已有的文件 * * */ public static boolean putFileToHDFS2(String srcPathName,String dstPathName){ boolean result = false; //获得文件系统 FileSystem fileSystem = getHadoopFileSystem(); //调用copyFromLocalFile try { fileSystem.copyFromLocalFile(new Path(srcPathName),new Path(dstPathName)); result = true; } catch (IOException e) { e.printStackTrace(); }finally { //关闭文件系统 close(fileSystem); } return result; } /** * @Author:Dapeng * @Description: 上传文件通过输入输出流 */ public static boolean putFileToHDFS(String srcPathName , String dstPathName){ boolean result = false; //获取文件系统 FileSystem fileSystem = getHadoopFileSystem(); //上传 try { //创建一个到HDFS的输出流 FSDataOutputStream out = fileSystem.create(new Path(dstPathName)); //创建一个本地java的输入流 FileInputStream in = new FileInputStream(srcPathName); //通过一个IOUtilsd的copyBytes方法传递数据流 IOUtils.copyBytes(in,out,4096,true); result = true; } catch (IOException e) { e.printStackTrace(); }finally { //关闭文件系统 close(fileSystem); } return result; } /** * @Author:Dapeng * @Description: 文件元数据 */ public static FileStatus[] list(String dstPathName){ FileStatus[] list = null; //获取文件系统 FileSystem fileSystem = getHadoopFileSystem(); //获取元数据信息 try { list = fileSystem.listStatus(new Path(dstPathName)); for(FileStatus fileStatus : list) { //可以返回相关的一些信息 // fileStatus.isDirectory(); // fileStatus.isFile(); // fileStatus.getPath().toString(); // fileStatus.getReplication(); System.out.println(fileStatus); } } catch (IOException e) { e.printStackTrace(); }finally{ //关闭文件系统 close(fileSystem); } return list; } /** * @Author:Dapeng * @Description: 下载文件通过输入输出流 */ public static boolean getFileFromHDFS(String srcPathName , String dstPathName){ boolean result = false; //获取文件系统 FileSystem fileSystem = getHadoopFileSystem(); //下载文件 try { //读入到内存 FSDataInputStream in = fileSystem.open(new Path(srcPathName)); //写入到文件 FileOutputStream out = new FileOutputStream(dstPathName); //通过工具类链接两者 IOUtils.copyBytes(in,out,4096,true); result = true; } catch (IOException e) { e.printStackTrace(); }finally{ //关闭文件系统 close(fileSystem); } return result; } /** * @Author:Dapeng * @Description: 删除文件 */ public static boolean delete(String path){ boolean result = false; //获取文件系统 FileSystem fileSystem = getHadoopFileSystem(); //删除,第二个参数是否递归的删除 try { fileSystem.delete(new Path(path) ,true); result = true; } catch (IOException e) { e.printStackTrace(); }finally{ close(fileSystem); } return result; //关闭文件系统 } /** * @Author:Dapeng * @Description: 关闭文件系统 */ public static void close(FileSystem fileSystem){ try { fileSystem.close(); } catch (IOException e) { e.printStackTrace(); } } }