package com.claw.util.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.IOUtils; import com.claw.util.string.StringUtil; import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; /** */ public class HDFSUtil { private HDFSUtil() { } /** * 判断路径是否存在 * * @param conf * @param path * @return * @throws IOException */ public static boolean exits(Configuration conf, String path) throws IOException { FileSystem fs = FileSystem.get(conf); return fs.exists(new Path(path)); } /** * 创建文件 * * @param conf * @param filePath * @param contents * @throws IOException */ public static void createFile(Configuration conf, String filePath, byte[] contents) throws IOException { FileSystem fs = FileSystem.get(conf); Path path = new Path(filePath); FSDataOutputStream outputStream = fs.create(path); outputStream.write(contents); outputStream.close(); fs.close(); } /** * 创建文件 * * @param conf * @param filePath * @param fileContent * @throws IOException */ public static void createFile(Configuration conf, String filePath, String fileContent) throws IOException { createFile(conf, filePath, fileContent.getBytes()); } /** * @param conf * @param localFilePath * @param remoteFilePath * @throws IOException */ public static void copyFromLocalFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException { FileSystem fs = FileSystem.get(conf); Path localPath = new Path(localFilePath); Path remotePath = new Path(remoteFilePath); fs.copyFromLocalFile(true, true, localPath, remotePath); fs.close(); } /** * 删除目录或文件 * * @param conf * @param remoteFilePath * @param recursive * @return * @throws IOException */ public static boolean deleteFile(Configuration conf, String remoteFilePath, boolean recursive) throws IOException { FileSystem fs = FileSystem.get(conf); boolean result = fs.delete(new Path(remoteFilePath), recursive); fs.close(); return result; } /** * 删除目录或文件(如果有子目录,则级联删除) * * @param conf * @param remoteFilePath * @return * @throws IOException */ public static boolean deleteFile(Configuration conf, String remoteFilePath) throws IOException { return deleteFile(conf, remoteFilePath, true); } /** * 文件重命名 * * @param conf * @param oldFileName * @param newFileName * @return * @throws IOException */ public static boolean renameFile(Configuration conf, String oldFileName, String newFileName) throws IOException { FileSystem fs = FileSystem.get(conf); Path oldPath = new Path(oldFileName); Path newPath = new Path(newFileName); boolean result = fs.rename(oldPath, newPath); fs.close(); return result; } /** * 创建目录 * * @param conf * @param dirName * @return * @throws IOException */ public static boolean createDirectory(Configuration conf, String dirName) throws IOException { FileSystem fs = FileSystem.get(conf); Path dir = new Path(dirName); boolean result = fs.mkdirs(dir); fs.close(); return result; } /** * 列出指定路径下的所有文件(不包含目录) * * @param conf * @param basePath * @param recursive */ public static RemoteIterator<LocatedFileStatus> listFiles(FileSystem fs, String basePath, boolean recursive) throws IOException { RemoteIterator<LocatedFileStatus> fileStatusRemoteIterator = fs.listFiles(new Path(basePath), recursive); return fileStatusRemoteIterator; } /** * 列出指定路径下的文件(非递归) * * @param conf * @param basePath * @return * @throws IOException */ public static RemoteIterator<LocatedFileStatus> listFiles(Configuration conf, String basePath) throws IOException { FileSystem fs = FileSystem.get(conf); RemoteIterator<LocatedFileStatus> remoteIterator = fs.listFiles(new Path(basePath), false); fs.close(); return remoteIterator; } /** * 列出指定目录下的文件子目录信息(非递归) * * @param conf * @param dirPath * @return * @throws IOException */ public static FileStatus[] listStatus(Configuration conf, String dirPath) throws IOException { FileSystem fs = FileSystem.get(conf); FileStatus[] fileStatuses = fs.listStatus(new Path(dirPath)); fs.close(); return fileStatuses; } /** * 读取文件内容 * * @param conf * @param filePath * @return * @throws IOException */ public static String readFile(Configuration conf, String filePath) throws IOException { String fileContent = null; FileSystem fs = FileSystem.get(conf); Path path = new Path(filePath); InputStream inputStream = null; ByteArrayOutputStream outputStream = null; try { inputStream = fs.open(path); outputStream = new ByteArrayOutputStream(inputStream.available()); IOUtils.copyBytes(inputStream, outputStream, conf); fileContent = outputStream.toString(); } finally { IOUtils.closeStream(inputStream); IOUtils.closeStream(outputStream); fs.close(); } return fileContent; } /** * 读取文件的URL * @param conf * @param filePath * @return * @throws IOException */ public static Map<String,String> readURL(Configuration conf, String filePath) throws IOException { List<String> list = new ArrayList<String>(); Map<String,String> map = new HashMap<String, String>(); FileSystem fs = FileSystem.get(conf); Path path = new Path(filePath); InputStream inputStream = null; inputStream = fs.open(path); InputStreamReader read = new InputStreamReader(inputStream,"UTF-8");//考虑到编码格式 BufferedReader bufferedReader = new BufferedReader(read); String lineTxt = null; while((lineTxt = bufferedReader.readLine()) != null){ List<String> lineList = StringUtil.getLine(lineTxt+" "); if(lineList!=null && lineList.size()==2){ String id = lineList.get(0); String url = lineList.get(1); if(!"".equals(url)){ map.put(id, url); // list.add(url); }else{ map.put(id, ""); } } } read.close(); return map; } }