本文发表于本人博客。
这次来看看我们的客户端用url方式来连接JobTracker。我们已经搭建了伪分布环境,就知道了地址。现在我们查看HDFS上的文件,比如地址:hdfs://hadoop-master:9000/data/test.txt。看下面代码:
static final String PATH = "hdfs://hadoop-master:9000/data/test.txt"; public static void Test() throws Exception { URL url = new URL(PATH); InputStream in = url.openStream(); IOUtils.copyBytes(in, System.out, 1024, true); }
竟然hadoop支持的hdfs协议,我们先以url方式来访问,运行上面代码结果报错误:
unknown protocol: hdfs
看了这句话才知道原本的url是不支持这种hdfs://的,那现在可以更改下,问了朋友可以设置url的方式来解决,代码如下:
public static void Test() throws Exception { URL url = new URL(PATH); URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); InputStream in = url.openStream(); IOUtils.copyBytes(in, System.out, 1024, true); }
编译运行还是报错,分析了下这个url对象是不是没设置成功,因为是在new对象之后设置的,我在修改下:
public static void Test() throws Exception { URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); URL url = new URL(PATH); InputStream in = url.openStream(); IOUtils.copyBytes(in, System.out, 1024, true); }
运行结果正确读取HDFS文件,现在单纯是读取而且是使用hadoop自带的IOUtils工具来实现,那既然用到hadoop的工具了有没有简单方便的API来操作呢,当然hadoop提供这个API,hadoop提供FileSystem来管理操作HDFS,看FileSystem的源码其实现了操作HDFS的方法,比如mkdirs(Path f)、copyFromLocalFile(boolean delSrc, Path src, Path dst)等等了,看下例子新建一个类HdfsTest:
public class HdfsTest { static final String ROOT = "hdfs://hadoop-master:9000/"; static final Configuration config = new Configuration(); /** * 创建文件夹 * @param path 相对路径。如"/test/" * @return 是否创建成功 * @throws Exception */ public static boolean Mkdir(String path) throws Exception { FileSystem fileSystem = getFileSystem(); boolean isCreate = fileSystem.mkdirs(new Path(path)); return isCreate; } /** * 删除文件夹 * @param path 文件夹路径 * @return 是否成功删除 */ public static boolean Removedir(String path) { boolean result = false; Path strPath = new Path(path); try { FileSystem fileSystem = getFileSystem(); if(fileSystem.isDirectory(strPath)){ result = fileSystem.delete(strPath); } else{ throw new NullPointerException("逻辑异常:删除文件夹属性出错,未能删除文件夹,如果删除文件应使用deleteFile(String path)"); } } catch (Exception e) { System.out.println("异常:" + e.getMessage()); } return result; } /** * 删除文件 * @param path 文件路径 * @return 是否成功删除 */ public static boolean deleteFile(String path) { boolean result = false; Path strPath = new Path(path); try { FileSystem fileSystem = getFileSystem(); if(fileSystem.isFile(strPath)){ result = fileSystem.delete(strPath); } else{ throw new NullPointerException("逻辑异常:删除文件属性出错,未能删除文件,如果删除文件夹应使用Removedir(String path)"); } } catch (Exception e) { System.out.println("异常:" + e.getMessage()); } return result; } /** * 对文件或文件夹列表 * @param path 文件或文件夹路径 * @return 返回FileStatus[] */ public static FileStatus[] List(String path){ return List(path,false); } /** * 对文件或文件夹列表 * @param path * @param isFull true:递归调用文件夹路径获取;false:单获取第一层不进行递归调用 * @return */ public static FileStatus[] List(String path,boolean isFull){ List<FileStatus> arrayList = new ArrayList<FileStatus>(); Path strPath = new Path(path); try { FileSystem fileSystem = getFileSystem(); FileStatus[] result = fileSystem.listStatus(strPath); for (int i = 0; i < result.length; i++) { FileStatus item = result[i]; arrayList.add(item); if(item.isDir()){ FileStatus[] results = List(item.getPath().toString(),true); for (int j = 0; j < results.length; j++) { FileStatus fileStatus = results[j]; arrayList.add(fileStatus); } } } } catch (Exception e) { System.out.println("异常:" + e.getMessage()); } FileStatus[] results = new FileStatus[arrayList.size()]; arrayList.toArray(results); return results; } /** * 上传文件 * @param src 本地磁盘文件 * @param tar 上传文件至HDFS路径 * @return 是否成功上传 */ public static boolean PutFile(String src,String tar) { boolean result = false; Path srcPath = new Path(src); Path tarPath = new Path(tar); try { FileSystem fileSystem = getFileSystem(); fileSystem.copyFromLocalFile(srcPath, tarPath); result = true; } catch (Exception e) { System.out.println("异常:" + e.getMessage()); } return result; } /** * 下载文件至本地磁盘 * @param path HDFS路径 * @return 返回本地磁盘路径 */ public static String DownFile(String path){ int index = path.lastIndexOf('.') > 0 ? path.lastIndexOf('.') : 0; String name = path.substring(index); SimpleDateFormat format=new SimpleDateFormat("yyyyMMddHHmmss"); String target =String.format("{0}\{1}{2}", System.getProperty("user.dir"),format.format(new Date()), name); Path strPath = new Path(path); Path tarPath = new Path(target); try { FileSystem fileSystem = getFileSystem(); fileSystem.copyToLocalFile(strPath, tarPath); } catch (Exception e) { System.out.println("异常:" + e.getMessage()); } return target; } /** * 获取FileSystem实例 * @return * @throws Exception */ public static FileSystem getFileSystem() throws Exception{ return FileSystem.get(new URI(ROOT), config); } }
在main函数中:
public static void main(String[] args) throws Exception { String strDir = "/data/"; String strDirback = "/data/back"; String strFile = "F:\05笔记.txt"; String strPutFilePath = "/data/05笔记.txt"; if(HdfsTest.Mkdir(strDir)){ if(HdfsTest.PutFile(strFile, strPutFilePath)){ if(HdfsTest.Mkdir(strDirback)){ FileStatus[] list = HdfsTest.List(strDir); for (int i = 0; i < list.length; i++) { FileStatus fileStatus = list[i]; final String name = fileStatus.getPath().getName(); final String path = fileStatus.getPath().toString(); final long length = fileStatus.getLen(); final String dir = fileStatus.isDir() ? "d" : "-"; final short replication = fileStatus.getReplication(); final String permission = fileStatus.getPermission().toString(); final String group = fileStatus.getGroup(); final String owner = fileStatus.getOwner(); System.out.println(dir + permission + " " + replication + " " + group + " " + owner + " " + path); } } } } }
这次先到这里。坚持记录点点滴滴!