1. 读取HDFS文件
1.1 hdfs文件字节数组读取
public void readFileByAPI() throws Exception {
Configuration conf = new Configuration();
conf.set("fs.default.name", "hdfs://192.168.8.156:9000/");
FileSystem fileSystem = FileSystem.get(conf);
Path path = new Path("/user/compass/readme.txt");
FSDataInputStream fsDataInputStream = fileSystem.open(path);
byte[] bytes = new byte[1024];
int len = -1;
ByteArrayOutputStream stream = new ByteArrayOutputStream();
while ((len = fsDataInputStream.read(bytes)) != -1) {
stream.write(bytes, 0, len);
}
fsDataInputStream.close();
stream.close();
System.out.println(new String(stream.toByteArray()));
}
1.2 下载hdfs文件到本地
public void getFile(String dst, String local) {
try {
if (!filesystem.exists(new Path(dst))) {
System.out.println("文件不存在!");
} else {
filesystem.copyToLocalFile(new Path(dst), new Path(local));
System.out.println("下载成功!");
}
} catch (IOException e) {
System.out.println("下载失败!");
e.printStackTrace();
}
}
2. 写入HDFS文件
2.1 将字符串写入hdfs新文件中
def wirteToHdfs(hdfs: FileSystem, json: String, blockPath: String): Unit = {
val path = new Path(blockPath)
val fileOutputStream: FSDataOutputStream = hdfs.create(path)
fileOutputStream.writeBytes(json)
if (fileOutputStream != null) {
fileOutputStream.close()
}
}
2.2 上传本地到文件hdfs
@Test
public void copyFromLocalFile() throws Exception{
Path localpath = new Path("/home/pumbaa/Downloads/Anaconda3-5.0.1-Linux-x86_64.sh");
Path destpath = new Path("/hdfsapi/test");
fileSystem.copyFromLocalFile(localpath, destpath);
}
@Test
public void copyFromLocalBigFile() throws Exception{
Path localpath = new Path("/home/pumbaa/Downloads/Anaconda3-5.0.1-Linux-x86_64.sh");
Path destpath = new Path("/hdfsapi/test");
fileSystem.copyFromLocalFile(localpath, destpath);
InputStream in = new BufferedInputStream(
new FileInputStream(
new File("/home/pumbaa/Downloads/ideaIU-2017.3.4.tar.gz")));
FSDataOutputStream output = fileSystem.create(new Path("/hdfsapi/test/ideaIU-2017.3.4.tar.gz"),
new Progressable() {
public void progress() {
System.out.print("*");
}
});
IOUtils.copyBytes(in, output, 4096);
}
3.文件操作
3.1创建目录
/**
* 创建目录
* @param path 创建目录的地址(例:/hadoop/)
* @throws IOException
*/
public void mkdir(String path) throws IOException {
//创建hdfs目录
if(filesystem.exists(new Path(path)))
{
System.out.println("目录已存在");
}
else
{
boolean result=filesystem.mkdirs(new Path(path));
System.out.println(result);
}
}
3.2创建文件
/**
* 创建文件
* @param path hdfs文件地址(例:/hadoop/abc.txt)
* @throws IOException
*/
public void create(String path) throws IOException{
//创建文件
if(filesystem.exists(new Path(path)))
{
System.out.println("文件已存在");
}
else
{
FSDataOutputStream outputStream= filesystem.create(new Path(path));
System.out.println("文件创建成功");
}
}
3.3查看文件内容
/**
* 查看文件内容
* @param dst hdfs文件地址(例:/hadoop/abc.txt)
* @throws IOException
*/
public void read(String dst) throws IOException {
if(filesystem.exists(new Path(dst)))
{
FSDataInputStream inputstream=filesystem.open(new Path(dst));
InputStreamReader isr=new InputStreamReader(inputstream);
BufferedReader br=new BufferedReader(isr);
String str=br.readLine();
while(str!=null){
System.out.println(str);
str=br.readLine();
}
br.close();
isr.close();
inputstream.close();
}
else
{
System.out.println("文件不存在");
}
}
3.4文件重命名,移动文件
/**
* 将dst1重命名为dst2,也可以进行文件的移动
* @param oldpath 旧名
* @param newpath 新名
*/
public void moveFile(String oldpath, String newpath) {
Path path1 = new Path(oldpath);
Path path2 = new Path(newpath);
try {
if (!filesystem.exists(path1)) {
System.out.println(oldpath + " 文件不存在!");
return;
}
if (filesystem.exists(path2)) {
System.out.println(newpath + "已存在!");
return;
}
// 将文件进行重命名,可以起到移动文件的作用
filesystem.rename(path1, path2);
System.out.println("文件已重命名!");
} catch (IOException e) {
e.printStackTrace();
}
}
3.5显示目录下所有文件
/**
* 显示目录下所有文件
* @param dst
*/
public void listStatus(String dst) {
try {
if (!filesystem.exists(new Path(dst))) {
System.out.println("目录不存在!");
return;
}
// 得到文件的状态
FileStatus[] status = filesystem.listStatus(new Path(dst));
for (FileStatus s : status) {
System.out.println(s.getPath().getName());
}
} catch (IllegalArgumentException | IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
3.6删除hdfs文件
/**
* 删除hdfs中的文件
* @param dst
*/
public void deleteFile(String dst) {
try {
if (!filesystem.exists(new Path(dst))) {
System.out.println("文件不存在!");
} else {
filesystem.delete(new Path(dst), true);
System.out.println("删除成功!");
}
} catch (IOException e) {
System.out.println("删除失败!");
e.printStackTrace();
}
}