package com.hy.hdfs; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.net.URI; import java.net.URISyntaxException; public class HDFSCommand { public static final Logger log = LoggerFactory.getLogger(HDFSCommand.class); public static void main(String[] args) throws Exception { String hdfsURI = "hdfs://10.1.23.240:9000"; String srcPath = "D:" + File.separator + "readme.txt"; String descPath = "/xhy"; String data = "haohaohaohaohao 善字 善生 善行 守善 愿善"; Configuration conf = new Configuration(); copyFromLocalFile(hdfsURI, srcPath, descPath, conf); uploadFile(hdfsURI, data, descPath, conf); RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = listFile(hdfsURI, descPath, conf, true); while (locatedFileStatusRemoteIterator.hasNext()) { LocatedFileStatus next = locatedFileStatusRemoteIterator.next(); System.out.println("listFile:" + next.toString()); } FileStatus[] fileStatuses = listFileAndFolder(hdfsURI, descPath, conf); for (FileStatus f : fileStatuses) { System.out.println("listFileAndFolder:" + f.toString()); } } /** * 本地指定路径文件上传到hdfs * * @param hdfsURI * @param srcPath * @param descPath * @param conf */ public static void copyFromLocalFile(String hdfsURI, String srcPath, String descPath, Configuration conf) throws URISyntaxException, IOException { log.info(">> copyFromLocalFile, srcPath is {}, descPath is {}", srcPath, descPath); FileSystem fs = FileSystem.get(new URI(hdfsURI), conf); fs.copyFromLocalFile(new Path(srcPath), new Path(descPath)); log.info("<< copyFromLocalFile success"); fs.close(); /* * 底层是通过 * fs.open(new Path(srcPath), 4096); * fs.create(new Path(descPath)); * IOUtils.copyBytes(in, out, conf, true); */ } /** * 将数据写入到hdfs * * @param hdfsURI * @param data * @param descPath * @param conf */ public static void uploadFile(String hdfsURI, String data, String descPath, Configuration conf) throws Exception { log.info(">> uploadFile, descPath is {}, data is {}", descPath, data); FileSystem fs = FileSystem.get(new URI(hdfsURI), conf); /*FSDataOutputStream fsOutputStream = fs.create(new Path(descPath), new Progressable() { @Override public void progress() { log.info("<< 写入hdfs成功,文件路径为:{}", descPath); } });*/ FSDataOutputStream fsOutputStream = fs.create(new Path(descPath), () -> log.info("<< 写入hdfs成功,文件路径为:{}", descPath)); fsOutputStream.write(data.getBytes(), 0, data.getBytes().length); /* * 以下几种方式会出现中文乱码 * fsOutputStream.writeBytes(data); * fsOutputStream.writeUTF(data); * fsOutputStream.writeChars(data); */ fsOutputStream.close(); fs.close(); } /** * 查找hdfs指定路径下的文件 * * @param hdfsURI * @param path * @param conf * @param recursive 是否递归查找 * @throws Exception */ public static RemoteIterator<LocatedFileStatus> listFile(String hdfsURI, String path, Configuration conf, boolean recursive) throws Exception { log.info(">> listFile, path is {}, recursive is {}", path, recursive); FileSystem fs = FileSystem.get(new URI(hdfsURI), conf); RemoteIterator<LocatedFileStatus> result = fs.listFiles(new Path(path), recursive); log.info("<< listFile, result is {}", result); return result; } /** * 查找hdfs指定路径下的文件和文件夹 * * @param hdfsURI * @param path * @param conf */ public static FileStatus[] listFileAndFolder(String hdfsURI, String path, Configuration conf) throws Exception { log.info(">> listFileAndFolder, path is {}", path); FileSystem fs = FileSystem.get(new URI(hdfsURI), conf); FileStatus[] result = fs.listStatus(new Path(path)); log.info("<< listFileAndFolder, result is {}", result.toString()); return result; // 方法二 } /** * 创建文件夹 * * @param hdfsURI * @param path * @param conf * @throws Exception */ public static void mkDir(String hdfsURI, String path, Configuration conf) throws Exception { log.info(">> mkDir, path is {}", path); FileSystem fs = FileSystem.get(new URI(hdfsURI), conf); boolean result = fs.mkdirs(new Path(path)); if (result) { log.info("<< mkDir {} success", path); } else { log.error("<< mkDir {} error", path); } } /** * 删除指定路径 * * @param hdfsURI * @param path * @param conf * @throws IOException */ public static void delete(String hdfsURI, String path, Configuration conf) throws IOException { log.info(">> delete, path is {}", path); conf.set("fs.defaultFS", hdfsURI); FileSystem fs = FileSystem.get(conf); if (!fs.exists(new Path(path))) { log.info("<< delete {} error, path no exists", path); return; } boolean result = fs.delete(new Path(path), true); if (result) { log.info("<< delete {} success", path); } else { log.error("<< delete {} error", path); } } /** * 从hdfs上面下载 * * @param hdfsURI * @param srcPath * @param descPath * @param conf * @throws Exception */ public static void downloadFile(String hdfsURI, String srcPath, String descPath, Configuration conf) throws Exception { log.info(">> downloadFile, srcPath is {}, descPath is {}", srcPath, descPath); FileSystem fs = FileSystem.get(new URI(hdfsURI), conf); FSDataInputStream in = fs.open(new Path(srcPath)); OutputStream out = new FileOutputStream(new File(descPath)); IOUtils.copyBytes(in, out, conf); } public static void catFile(String hdfsURI, String path, Configuration conf) throws Exception { log.info(">> catFile, path is {}", path); FileSystem fs = FileSystem.get(URI.create(hdfsURI), conf); FSDataInputStream in = fs.open(new Path(path)); try { IOUtils.copyBytes(in, System.out, 4096, false); } finally { IOUtils.closeStream(in); fs.close(); } } }