package com.picc.test;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URL;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.fs.Path;

public class UrlCat {

    static {
        // Register Hadoop's URL stream handler so java.net.URL understands hdfs:// URLs.
        // This factory can only be set once per JVM.
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    public static void main(String[] args) {
        // Read the file through a plain java.net.URL
        // (compare with new FileInputStream("/home/liucheng/file/student.txt") for a local file).
        InputStream in = null;
        BufferedReader read = null;
        try {
            in = new URL("hdfs://localhost:9000/user/liucheng/input/student.txt").openStream();
            read = new BufferedReader(new InputStreamReader(in));
            String line;
            while ((line = read.readLine()) != null) {
                System.out.println("result:" + line.trim());
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (read != null) {
                try {
                    read.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        // Read the same file again through the FileSystem API.
        run("hdfs://localhost:9000/user/liucheng/input/student.txt");
    }

    public static void run(String hdfs) {
        Configuration conf = new Configuration();
        InputStream in = null;
        try {
            FileSystem fs = FileSystem.get(URI.create(hdfs), conf);
            in = fs.open(new Path(hdfs));
            BufferedReader read = new BufferedReader(new InputStreamReader(in));
            String line;
            while ((line = read.readLine()) != null) {
                System.out.println("result:" + line.trim());
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
This example reads the student.txt file from the input directory on HDFS.
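For comparison, the same URL-based read can be written more compactly with Hadoop's IOUtils helper. This is only a sketch under the same assumptions as above (NameNode at localhost:9000, file at /user/liucheng/input/student.txt); the class name UrlCatShort is made up for illustration.

package com.picc.test;

import java.io.InputStream;
import java.net.URL;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;

public class UrlCatShort {

    static {
        // Same prerequisite as above: register the hdfs:// URL handler once per JVM.
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    public static void main(String[] args) throws Exception {
        InputStream in = null;
        try {
            in = new URL("hdfs://localhost:9000/user/liucheng/input/student.txt").openStream();
            // Copy the stream to stdout in 4 KB chunks; 'false' means we close the stream ourselves.
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {
            IOUtils.closeStream(in);
        }
    }
}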
Operating HDFS from Java
package com.picc.test;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URL;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.fs.Path;

public class UrlCat {

    static {
        // Register Hadoop's URL stream handler so java.net.URL understands hdfs:// URLs.
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    public static void main(String[] args) {
        run("hdfs://localhost:9000/user/liucheng/input/file01.txt");
        // doubleCat("hdfs://localhost:9000/user/liucheng/input/student.txt");
        // statusTest("hdfs://localhost:9000/user/liucheng/input");
    }

    public static void run(String hdfs) {
        Configuration conf = new Configuration();
        InputStream in = null;
        try {
            FileSystem fs = FileSystem.get(URI.create(hdfs), conf);
            in = fs.open(new Path(hdfs));
            BufferedReader read = new BufferedReader(new InputStreamReader(in));
            String line;
            while ((line = read.readLine()) != null) {
                System.out.println("result:" + line.trim());
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // FSDataInputStream extends DataInputStream, which extends InputStream.
    /**
     * doubleCat and getInputStreamtoText really belong together:
     * when an exception occurs, the current position could be saved to a
     * properties file so the read can be resumed later.
     * @param hdfs path of the file to read
     */
    public static void doubleCat(String hdfs) {
        Configuration conf = new Configuration();
        FSDataInputStream fsdis = null;
        try {
            FileSystem fs = FileSystem.get(URI.create(hdfs), conf);
            fsdis = fs.open(new Path(hdfs));
            long rs = fsdis.skip(1000);   // skip the first 1000 bytes; returns the number actually skipped
            fsdis.seek(rs);               // position the stream at offset rs
            String result = getInputStreamtoText(fsdis);
            System.out.println("result:" + result);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (fsdis != null) {
                try {
                    fsdis.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Read an InputStream into text.
     * @param in the stream to read
     * @return the stream contents as a String
     */
    public static String getInputStreamtoText(InputStream in) {
        BufferedReader read = new BufferedReader(new InputStreamReader(in));
        String line;
        StringBuffer result = new StringBuffer();
        try {
            while ((line = read.readLine()) != null) {
                String lineSeparator = System.getProperty("line.separator");
                result.append(line).append(lineSeparator);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                in.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return result.toString();
    }

    public static void statusTest(String url) {
        Configuration conf = new Configuration();
        try {
            FileSystem fs = FileSystem.get(URI.create(url), conf);
            // List the entries under the given path and print each one.
            FileStatus[] filestatus = fs.listStatus(new Path(url));
            Path[] listedPaths = FileUtil.stat2Paths(filestatus);
            for (Path p : listedPaths) {
                System.out.println(p);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
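The comment on doubleCat mentions saving the read position to a properties file when an exception occurs, so the read can be resumed later. A minimal sketch of that idea is below; the class name OffsetStore and the file name offset.properties are made up for illustration, not part of the code above.

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Properties;

// Hypothetical helper: persist and restore a read offset per HDFS path.
public class OffsetStore {

    private static final String FILE = "offset.properties"; // assumed location, adjust as needed

    // Save the last successfully read offset for a given HDFS path.
    public static void save(String hdfsPath, long offset) throws IOException {
        Properties props = new Properties();
        props.setProperty(hdfsPath, Long.toString(offset));
        try (FileOutputStream out = new FileOutputStream(FILE)) {
            props.store(out, "last read offsets");
        }
    }

    // Load the saved offset, or 0 if none was recorded.
    public static long load(String hdfsPath) {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(FILE)) {
            props.load(in);
        } catch (IOException e) {
            return 0L; // no saved state yet
        }
        return Long.parseLong(props.getProperty(hdfsPath, "0"));
    }
}

With something like this in place, doubleCat could seek to OffsetStore.load(hdfs) instead of a hard-coded offset, and call OffsetStore.save(hdfs, fsdis.getPos()) in its catch block; FSDataInputStream.getPos() reports the current position in the file.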