1 package dada; 2 3 import java.io.BufferedReader; 4 import java.io.File; 5 import java.io.FileInputStream; 6 import java.io.IOException; 7 import java.io.InputStreamReader; 8 9 import org.apache.hadoop.conf.Configuration; 10 import org.apache.hadoop.fs.FSDataInputStream; 11 import org.apache.hadoop.fs.FSDataOutputStream; 12 import org.apache.hadoop.fs.FileSystem; 13 import org.apache.hadoop.fs.Path; 14 15 public class HDFSToll { 16 17 //路径是否存在 18 public static boolean testExist(Configuration conf,String path) throws IOException 19 { 20 FileSystem fs=FileSystem.get(conf); 21 return fs.exists(new Path(path)); 22 } 23 //创建目录 24 public static boolean mkdir (Configuration conf ,String remoteDir)throws IOException 25 { 26 FileSystem fs=FileSystem.get(conf); 27 Path dirPath=new Path(remoteDir); 28 boolean result=fs.mkdirs(dirPath); 29 fs.close(); 30 return result; 31 } 32 /** 33 * 删除目录 34 */ 35 public static boolean rmDir(Configuration conf, String remoteDir) throws IOException { 36 FileSystem fs = FileSystem.get(conf); 37 Path dirPath = new Path(remoteDir); 38 /* 第二个参数表示是否递归删除所有文件 */ 39 boolean result = fs.delete(dirPath, true); 40 fs.close(); 41 return result; 42 } 43 //创建文件 44 public static void touch(Configuration conf,String remoteFilePath )throws IOException 45 { 46 FileSystem fs=FileSystem.get(conf); 47 Path remotePath=new Path(remoteFilePath); 48 FSDataOutputStream outputStream =fs.create(remotePath); 49 outputStream.close(); 50 fs.close(); 51 } 52 //删除文件 53 public static boolean rm(Configuration conf,String remoteFilePath)throws IOException 54 { 55 FileSystem fs=FileSystem.get(conf); 56 Path remotePath=new Path(remoteFilePath); 57 boolean result=fs.delete(remotePath,false); 58 fs.close(); 59 return result; 60 } 61 //追加文件内容 到末尾 62 public static void appendContentToFile(Configuration conf,String content,String remoteFilePath)throws IOException 63 { 64 FileSystem fs=FileSystem.get(conf); 65 Path remotePath=new Path(remoteFilePath); 66 FSDataOutputStream out=fs.append(remotePath); 67 out.write(content.getBytes()); 68 out.close(); 69 fs.close(); 70 } 71 72 //追加文件内容到开头 73 public static void appendContentToFile1(Configuration conf,String content,String remoteFilePath)throws IOException 74 { 75 String localTmpPath = "/usr/local/hadoop/enen.txt"; 76 77 // 移动到本地 78 moveToLocalFile(conf, remoteFilePath, localTmpPath); 79 // 创建一个新文件 80 touch(conf, remoteFilePath); 81 // 先写入新内容 82 appendContentToFile(conf, content, remoteFilePath); 83 // 再写入原来内容 84 appendContentToFile(conf, localTmpPath, remoteFilePath); 85 86 System.out.println("已追加内容到文件开头: " + remoteFilePath); 87 } 88 /** 89 90 * 复制文件到指定路径 91 92 * 若路径已存在,则进行覆盖 93 94 */ 95 96 public static void copyFromLocalFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException { 97 98 FileSystem fs = FileSystem.get(conf); 99 100 Path localPath = new Path(localFilePath); 101 102 Path remotePath = new Path(remoteFilePath); 103 104 /* fs.copyFromLocalFile 第一个参数表示是否删除源文件,第二个参数表示是否覆盖 */ 105 106 fs.copyFromLocalFile(false, true, localPath, remotePath); 107 108 fs.close(); 109 110 } 111 112 113 114 115 //将文件1写入文件2 116 public static void appendFile1ToFile2(Configuration conf,String remoteFilePath,String remoteFilePath2)throws IOException 117 { 118 FileSystem fs=FileSystem.get(conf); 119 Path file=new Path(remoteFilePath); 120 FSDataInputStream getIt=fs.open(file); 121 BufferedReader d=new BufferedReader(new InputStreamReader(getIt)); 122 String content1=d.readLine(); 123 Path remotePath=new Path(remoteFilePath2); 124 FSDataOutputStream out=fs.append(remotePath); 125 out.write(content1.getBytes()); 126 d.close(); 127 out.close(); 128 fs.close(); 129 } 130 /** 131 132 * 追加文件内容 133 134 */ 135 public static void appendToFile(Configuration conf, String localFilePath, String remoteFilePath) throws IOException { 136 137 FileSystem fs = FileSystem.get(conf); 138 139 Path remotePath = new Path(remoteFilePath); 140 141 /* 创建一个文件读入流 */ 142 143 FileInputStream in = new FileInputStream(localFilePath); 144 145 /* 创建一个文件输出流,输出的内容将追加到文件末尾 */ 146 147 FSDataOutputStream out = fs.append(remotePath); 148 149 /* 读写文件内容 */ 150 151 byte[] data = new byte[1024]; 152 153 int read = -1; 154 155 while ( (read = in.read(data)) > 0 ) { 156 157 out.write(data, 0, read); 158 159 } 160 161 out.close(); 162 163 in.close(); 164 165 fs.close(); 166 167 } 168 /** 169 170 * 下载文件到本地 171 172 * 判断本地路径是否已存在,若已存在,则自动进行重命名 173 174 */ 175 176 public static void copyToLocal(Configuration conf, String remoteFilePath, String localFilePath) throws IOException { 177 178 FileSystem fs = FileSystem.get(conf); 179 180 Path remotePath = new Path(remoteFilePath); 181 182 File f = new File(localFilePath); 183 184 /* 如果文件名存在,自动重命名(在文件名后面加上 _0, _1 ...) */ 185 186 if (f.exists()) { 187 188 System.out.println(localFilePath + " 文件已存在."); 189 190 Integer i = 0; 191 192 while (true) { 193 194 f = new File(localFilePath + "_" + i.toString()); 195 196 if (!f.exists()) { 197 198 localFilePath = localFilePath + "_" + i.toString(); 199 200 System.out.println("将重新命名为: " + localFilePath); 201 202 break;//重命名文件 203 204 } 205 206 i++; 207 208 } 209 210 // System.out.println("将重新命名为: " + localFilePath); 211 212 } 213 214 else 215 216 System.out.println(localFilePath + " 文件不存在."); 217 218 219 220 // 下载文件到本地 221 222 Path localPath = new Path(localFilePath); 223 224 fs.copyToLocalFile(remotePath, localPath); 225 226 fs.close(); 227 228 } 229 /** 230 * 移动文件到本地 231 232 * 移动后,删除源文件 233 234 */ 235 public static void moveToLocalFile(Configuration conf, String remoteFilePath, String localFilePath) throws IOException { 236 237 FileSystem fs = FileSystem.get(conf); 238 239 Path remotePath = new Path(remoteFilePath); 240 241 Path localPath = new Path(localFilePath); 242 243 fs.moveToLocalFile(remotePath, localPath); 244 245 } 246 }