Listing all files under an HDFS directory with getAllFiles, iterating over the results, and reading them with Spark (a Spark read sketch follows the listing)
package com.spark.demo

import java.io.IOException
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

object HdfsUtil {

  val conf: Configuration = new Configuration()
  var fs: FileSystem = null
  var files: RemoteIterator[LocatedFileStatus] = null

  // Connect to the file system addressed by HDFSPath and list the files directly under that path
  def getFiles(HDFSPath: String): RemoteIterator[LocatedFileStatus] = {
    try {
      fs = FileSystem.get(new URI(HDFSPath), conf)
      files = fs.listFiles(new Path(HDFSPath), false)
    } catch {
      case e: IOException => e.printStackTrace()
    }
    files
  }

  // Connect to the file system addressed by HDFSPath and return all files directly under targetPath (non-recursive)
  def getFiles(HDFSPath: String, targetPath: String): RemoteIterator[LocatedFileStatus] = {
    try {
      fs = FileSystem.get(new URI(HDFSPath), conf)
      files = fs.listFiles(new Path(targetPath), false)
    } catch {
      case e: IOException => e.printStackTrace()
    }
    files
  }

  // Create a directory; fs.create would create an empty file, mkdirs creates the directory itself
  def mkdir(finalPath: String): Boolean = {
    fs.mkdirs(new Path(finalPath))
  }

  def rename(oldPath: String, finalPath: String): Boolean = {
    fs.rename(new Path(oldPath), new Path(finalPath))
  }

  def exist(existPath: String): Boolean = {
    fs.exists(new Path(existPath))
  }

  // Delete the path; the second argument enables recursive deletion
  def delete(deletePath: String): Boolean = {
    fs.delete(new Path(deletePath), true)
  }

  // Open an FSDataInputStream for reading the file
  def read(readPath: String): FSDataInputStream = {
    fs.open(new Path(readPath))
  }

  // Return the status of every entry directly under path, printing each entry's name and full path
  def getAllFiles(path: String): Array[FileStatus] = {
    val fs = FileSystem.get(URI.create(path), conf)
    val files = fs.listStatus(new Path(path))
    for (file <- files) {
      println(file.getPath.getName)
      println(file.getPath.toString)
    }
    files
  }

  def main(args: Array[String]): Unit = {
    getAllFiles("hdfs://10.10.4.1:8020/ibc/datalogs/apachelogs/archive/2018")
  }

  // Close the shared FileSystem handle when finished
  def close(): Unit = {
    try {
      if (fs != null) {
        fs.close()
      }
    } catch {
      case e: IOException => e.printStackTrace()
    }
  }
}
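
The listing only enumerates the files; the "read with Spark" step from the title is not shown. Below is a minimal sketch of that step, assuming a Spark 2.x SparkSession and plain-text log files (the object name ReadWithSpark, the local[*] master, and the text input format are illustrative assumptions, not part of the original code):

import org.apache.spark.sql.SparkSession

object ReadWithSpark {
  def main(args: Array[String]): Unit = {
    // Assumed session setup; adjust appName/master for your cluster
    val spark = SparkSession.builder()
      .appName("read-hdfs-dir")
      .master("local[*]")
      .getOrCreate()

    // Reuse HdfsUtil.getAllFiles to enumerate the directory, then read each file as text
    val files = HdfsUtil.getAllFiles("hdfs://10.10.4.1:8020/ibc/datalogs/apachelogs/archive/2018")
    for (file <- files if file.isFile) {
      val df = spark.read.text(file.getPath.toString)
      println(s"${file.getPath.getName}: ${df.count()} lines")
    }

    spark.stop()
  }
}

Since spark.read.text also accepts a directory path, passing the directory itself is an alternative when per-file handling is not needed.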