1. 读取HDFS文件
1.1 字符读取HDFS上的文件
Configuration conf = new Configuration(); Path path = new Path(pathstr); FileSystem fs = FileSystem.get(conf); FSDataInputStream fsin= fs.open(path); BufferedReader br =null; String line ; try{ br = new BufferedReader(new InputStreamReader(fsin)); while ((line = br.readLine()) != null) { System.out.println(line); } }finally{ br.close(); }
1.2 字节流读取HDFS文件内容(API)
1.2.1 字节数组读取
public void readFileByAPI() throws Exception { Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.8.156:9000/"); FileSystem fileSystem = FileSystem.get(conf); Path path = new Path("/user/compass/readme.txt"); FSDataInputStream fsDataInputStream = fileSystem.open(path); byte[] bytes = new byte[1024]; int len = -1; ByteArrayOutputStream stream = new ByteArrayOutputStream(); while ((len = fsDataInputStream.read(bytes)) != -1) { stream.write(bytes, 0, len); } fsDataInputStream.close(); stream.close(); System.out.println(new String(stream.toByteArray())); }
1.2.1 hadoop工具类读取
public void readFileByAPI() throws Exception { Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://192.168.8.156:9000/"); FileSystem fileSystem = FileSystem.get(conf); Path path = new Path("/user/compass/readme.txt"); FSDataInputStream fsDataInputStream = fileSystem.open(path); ByteArrayOutputStream stream = new ByteArrayOutputStream(); IOUtils.copyBytes(fsDataInputStream, stream, 1024); System.out.println(new String(stream.toByteArray())); }
1.3 URL流读取HDFS文件(不常用)
public void readFileByURL() throws Exception { //url流处理器工程 URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); URL url = new URL("hdfs://192.168.8.156:9000/user/readme.txt"); URLConnection connection = url.openConnection(); InputStream inputStream = connection.getInputStream(); // 返回这个输入流中可以被读的剩下的bytes字节的估计值 byte[] b = new byte[inputStream.available()]; while (inputStream.read(b) != -1) inputStream.read(b); inputStream.close(); String string = new String(b); System.out.println(string); }
2. 写入HDFS文件
2.1 字节写入HDFS文件
Configuration conf = new Configuration(); Path path = new Path(path); FileSystem fs = FileSystem.get(conf); FSDataOutputStream out = fs.create(src); out.write(sb.toString().getBytes()); out.close();
2.2 HDFS 文件中追加(append)数据
public static boolean appendRTData(String src, String drc) { // src源文件 drc 追加文件 boolean flag = false; Configuration conf = new Configuration(); FileSystem fs = null; try { fs = FileSystem.get(URI.create(src), conf); InputStream in = new BufferedInputStream(new FileInputStream(drc)); OutputStream out = fs.append(new Path(src)); IOUtils.copyBytes(in, out, 4096, true); } catch (IOException e) { e.printStackTrace(); } return flag; }