一、HDFS dao接口
package cn.mk.dao;

import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;

/**
 * Data-access contract for basic file and directory operations on a Hadoop
 * file system. All locations are passed as plain path strings.
 */
public interface HDFSDao {

    /**
     * Creates the directory {@code path}.
     *
     * @param path directory to create
     * @return {@code true} if the directory was created
     * @throws IOException if the file system reports an error
     */
    boolean mkDirs(String path) throws IOException;

    /**
     * Removes {@code path} together with everything beneath it.
     *
     * @param path file or directory to remove
     * @return {@code true} if the file system reports the removal as successful
     * @throws IOException if the file system reports an error
     */
    boolean rmr(String path) throws IOException;

    /**
     * Removes the directory {@code path}.
     *
     * @param path directory to remove
     * @return {@code true} if the file system reports the removal as successful
     * @throws IOException if the file system reports an error
     */
    boolean rmdir(String path) throws IOException;

    /**
     * Renames (moves) {@code src} to {@code dst}.
     *
     * @param src current path
     * @param dst new path
     * @return {@code true} if the file system reports the rename as successful
     * @throws IOException if the file system reports an error
     */
    boolean rename(String src, String dst) throws IOException;

    /**
     * Lists the status entries found at {@code path}.
     *
     * @param path file or directory to list
     * @return the status entries reported by the file system
     * @throws FileNotFoundException if {@code path} does not exist
     * @throws IOException if the file system reports an error
     */
    FileStatus[] ls(String path) throws FileNotFoundException, IOException;

    /**
     * Creates the file {@code file}.
     *
     * @param file path of the file to create
     * @param content text supplied for the new file
     * @return {@code true} if the file was created
     * @throws IOException if the file system reports an error
     */
    boolean createFile(String file, String content) throws IOException;

    /**
     * Copies a file from the local file system to the remote one.
     *
     * @param local source path on the local file system
     * @param remote destination path on the remote file system
     * @throws IOException if the copy fails
     */
    void copyFile(String local, String remote) throws IOException;

    /**
     * Copies a file from the remote file system to the local one.
     *
     * @param remote source path on the remote file system
     * @param local destination path on the local file system
     * @throws IOException if the copy fails
     */
    void download(String remote, String local) throws IOException;

    /**
     * Reads the remote file {@code remoteFile} as text.
     *
     * @param remoteFile path of the file to read
     * @return the content of the file as a string
     * @throws IOException if the file cannot be read
     */
    String cat(String remoteFile) throws IOException;

    /**
     * Looks up the block locations of the file at {@code path}.
     *
     * @param path file to inspect
     * @return the block locations reported by the file system
     * @throws IOException if the file system reports an error
     */
    BlockLocation[] location(String path) throws IOException;

    /**
     * Creates the file {@code file} only if it does not exist yet and stores
     * {@code content} in it.
     *
     * @param file path of the file to create
     * @param content text to store in the new file
     * @return {@code true} if the file was created, {@code false} if it was not
     * @throws IOException if the file system reports an error
     */
    boolean createNewFile(String file, String content) throws IOException;
}
二、HDFS dao实现类
package cn.mk.dao.impl; import java.io.ByteArrayOutputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import cn.mk.dao.HDFSDao; public class HDFSDaoImpl implements HDFSDao { private static final String HDFS_PATH = "hdfs://master:9000/"; private final FileSystem fileSystem; public HDFSDaoImpl(Configuration conf) throws IOException { this(HDFS_PATH, conf); } public HDFSDaoImpl(String hdfs, Configuration conf) throws IOException { fileSystem = FileSystem.get(URI.create(hdfs), conf); } @Override public boolean mkDirs(String path) throws IOException { Path p = new Path(path); if (!fileSystem.exists(p)) { return fileSystem.mkdirs(p); } return false; } @Override public boolean rmr(String path) throws IOException { Path p = new Path(path); return fileSystem.delete(p, true); } @Override public boolean rmdir(String path) throws IOException { Path p = new Path(path); return fileSystem.delete(p, true); } @Override public boolean rename(String src, String dst) throws IOException { Path p1 = new Path(src); Path p2 = new Path(dst); return fileSystem.rename(p1, p2); } @Override public FileStatus[] ls(String path) throws FileNotFoundException, IOException { Path p = new Path(path); return fileSystem.listStatus(p); } @Override public boolean createFile(String file, String content) throws IOException { Path p = new Path(file); return fileSystem.createNewFile(p); } @Override public boolean createNewFile(String file, String content) throws IOException { Path p = new Path(file); boolean b= fileSystem.createNewFile(p); if(!b) return false; FSDataOutputStream os = null; try { byte[] bytes=content.getBytes(); os = 
fileSystem.create(p); os.write(bytes, 0,bytes.length); }finally{ if(os!=null) os.close(); } return true; } @Override public void copyFile(String local, String remote) throws IOException { Path p1 =new Path(local); Path p2 =new Path(remote); fileSystem.copyFromLocalFile(p1, p2); } @Override public void download(String remote, String local) throws IOException { Path p1 =new Path(local); Path p2 =new Path(remote); fileSystem.copyToLocalFile(p2, p1); } @Override public String cat(String remoteFile) throws IOException { Path p =new Path(remoteFile); FSDataInputStream in = null; String content=null; try { in=fileSystem.open(p); ByteArrayOutputStream bos =new ByteArrayOutputStream(); IOUtils.copyBytes(in, bos, 4096,false); } catch (Exception e) { IOUtils.closeStream(in); } return content; } @Override public BlockLocation[] location(String path) throws IOException { Path p =new Path(path); FileStatus fStatus=fileSystem.getFileStatus(p); return fileSystem.getFileBlockLocations(fStatus, 0, fStatus.getLen()); } @Override protected void finalize() throws Throwable { fileSystem.close(); } }