zoukankan      html  css  js  c++  java
  • 掌握HDFS的Java API接口访问

         HDFS设计的主要目的是对海量数据进行存储,也就是说在其上能够存储很大量文件(可以存储TB级的文件)。HDFS将这些文件分割之后,存储在不同的DataNode上, HDFS 提供了两种访问接口:Shell接口和Java API 接口,对HDFS里面的文件进行操作,具体每个Block放在哪台DataNode上面,对于开发者来说是透明的。

    1、获取文件系统

    复制代码
     1 /**
     2  * 获取文件系统
     3  * 
     4  * @return FileSystem
     5  */
     6 public static FileSystem getFileSystem() {
     7     //读取配置文件
     8     Configuration conf = new Configuration();
     9     // 文件系统
    10     FileSystem fs = null;
    11     
    12     String hdfsUri = HDFSUri;
    13     if(StringUtils.isBlank(hdfsUri)){
    14         // 返回默认文件系统  如果在 Hadoop集群下运行,使用此种方法可直接获取默认文件系统
    15         try {
    16             fs = FileSystem.get(conf);
    17         } catch (IOException e) {
    18             logger.error("", e);
    19         }
    20     }else{
    21         // 返回指定的文件系统,如果在本地测试,需要使用此种方法获取文件系统
    22         try {
    23             URI uri = new URI(hdfsUri.trim());
    24             fs = FileSystem.get(uri,conf);
    25         } catch (URISyntaxException | IOException e) {
    26             logger.error("", e);
    27         }
    28     }
    29         
    30     return fs;
    31 }
    复制代码

     2、创建文件目录

    复制代码
     1 /**
     2  * 创建文件目录
     3  * 
     4  * @param path
     5  */
     6 public static void mkdir(String path) {
     7     try {
     8         // 获取文件系统
     9         FileSystem fs = getFileSystem();
    10         
    11         String hdfsUri = HDFSUri;
    12         if(StringUtils.isNotBlank(hdfsUri)){
    13             path = hdfsUri + path;
    14         }
    15         
    16         // 创建目录
    17         fs.mkdirs(new Path(path));
    18         
    19         //释放资源
    20         fs.close();
    21     } catch (IllegalArgumentException | IOException e) {
    22         logger.error("", e);
    23     }
    24 }
    复制代码

    3、删除文件或者文件目录

    复制代码
     1 /**
     2  * 删除文件或者文件目录
     3  * 
     4  * @param path
     5  */
     6 public static void rmdir(String path) {
     7     try {
     8         // 返回FileSystem对象
     9         FileSystem fs = getFileSystem();
    10         
    11         String hdfsUri = HDFSUri;
    12         if(StringUtils.isNotBlank(hdfsUri)){
    13             path = hdfsUri + path;
    14         }
    15         
    16         // 删除文件或者文件目录  delete(Path f) 此方法已经弃用
    17         fs.delete(new Path(path),true);
    18         
    19         // 释放资源
    20         fs.close();
    21     } catch (IllegalArgumentException | IOException e) {
    22         logger.error("", e);
    23     }
    24 }
    复制代码

    3、根据filter获取目录下的文件

    复制代码
     1 /**
     2  * 根据filter获取目录下的文件
     3  * 
     4  * @param path
     5  * @param pathFilter
     6  * @return String[]
     7  */
     8 public static String[] ListFile(String path,PathFilter pathFilter) {
     9     String[] files = new String[0];
    10     
    11     try {
    12         // 返回FileSystem对象
    13         FileSystem fs = getFileSystem();
    14         
    15         String hdfsUri = HDFSUri;
    16         if(StringUtils.isNotBlank(hdfsUri)){
    17             path = hdfsUri + path;
    18         }
    19         
    20         FileStatus[] status;
    21         if(pathFilter != null){
    22             // 根据filter列出目录内容
    23             status = fs.listStatus(new Path(path),pathFilter);
    24         }else{
    25             // 列出目录内容
    26             status = fs.listStatus(new Path(path));
    27         }
    28         
    29         // 获取目录下的所有文件路径
    30         Path[] listedPaths = FileUtil.stat2Paths(status);
    31         // 转换String[]
    32         if (listedPaths != null && listedPaths.length > 0){
    33             files = new String[listedPaths.length];
    34             for (int i = 0; i < files.length; i++){
    35                 files[i] = listedPaths[i].toString();
    36             }
    37         }
    38         // 释放资源
    39         fs.close();
    40     } catch (IllegalArgumentException | IOException e) {
    41         logger.error("", e);
    42     }
    43     
    44     return files;
    45 }
    复制代码

    4、文件上传至 HDFS

    复制代码
     1 /**
     2  * 文件上传至 HDFS
     3  * 
     4  * @param delSrc
     5  * @param overwrite
     6  * @param srcFile
     7  * @param destPath
     8  */
     9 public static void copyFileToHDFS(boolean delSrc, boolean overwrite,String srcFile,String destPath) {
    10     // 源文件路径是Linux下的路径,如果在 windows 下测试,需要改写为Windows下的路径,比如D://hadoop/djt/weibo.txt
    11     Path srcPath = new Path(srcFile);
    12     
    13     // 目的路径
    14     String hdfsUri = HDFSUri;
    15     if(StringUtils.isNotBlank(hdfsUri)){
    16         destPath = hdfsUri + destPath;
    17     }
    18     Path dstPath = new Path(destPath);
    19     
    20     // 实现文件上传
    21     try {
    22         // 获取FileSystem对象
    23         FileSystem fs = getFileSystem();
    24         fs.copyFromLocalFile(srcPath, dstPath);
    25         fs.copyFromLocalFile(delSrc,overwrite,srcPath, dstPath);
    26         //释放资源
    27         fs.close();
    28     } catch (IOException e) {
    29         logger.error("", e);
    30     }
    31 }
    复制代码

    5、从 HDFS 下载文件

    复制代码
     1 /**
     2  * 从 HDFS 下载文件
     3  * 
     4  * @param srcFile
     5  * @param destPath
     6  */
     7 public static void getFile(String srcFile,String destPath) {
     8     // 源文件路径
     9     String hdfsUri = HDFSUri;
    10     if(StringUtils.isNotBlank(hdfsUri)){
    11         srcFile = hdfsUri + srcFile;
    12     }
    13     Path srcPath = new Path(srcFile);
    14     
    15     // 目的路径是Linux下的路径,如果在 windows 下测试,需要改写为Windows下的路径,比如D://hadoop/djt/
    16     Path dstPath = new Path(destPath);
    17     
    18     try {
    19         // 获取FileSystem对象
    20         FileSystem fs = getFileSystem();
    21         // 下载hdfs上的文件
    22         fs.copyToLocalFile(srcPath, dstPath);
    23         // 释放资源
    24         fs.close();
    25     } catch (IOException e) {
    26         logger.error("", e);
    27     }
    28 }
    复制代码

    6、获取 HDFS 集群节点信息

    复制代码
     1 /**
     2  * 获取 HDFS 集群节点信息
     3  * 
     4  * @return DatanodeInfo[]
     5  */
     6 public static DatanodeInfo[] getHDFSNodes() {
     7     // 获取所有节点
     8     DatanodeInfo[] dataNodeStats = new DatanodeInfo[0];
     9     
    10     try {
    11         // 返回FileSystem对象
    12         FileSystem fs = getFileSystem();
    13         
    14         // 获取分布式文件系统
    15         DistributedFileSystem hdfs = (DistributedFileSystem)fs;
    16         
    17         dataNodeStats = hdfs.getDataNodeStats();
    18     } catch (IOException e) {
    19         logger.error("", e);
    20     }
    21     return dataNodeStats;
    22 }
    复制代码

    7、查找某个文件在 HDFS集群的位置

    复制代码
     1 /**
     2  * 查找某个文件在 HDFS集群的位置
     3  * 
     4  * @param filePath
     5  * @return BlockLocation[]
     6  */
     7 public static BlockLocation[] getFileBlockLocations(String filePath) {
     8     // 文件路径
     9     String hdfsUri = HDFSUri;
    10     if(StringUtils.isNotBlank(hdfsUri)){
    11         filePath = hdfsUri + filePath;
    12     }
    13     Path path = new Path(filePath);
    14     
    15     // 文件块位置列表
    16     BlockLocation[] blkLocations = new BlockLocation[0];
    17     try {
    18         // 返回FileSystem对象
    19         FileSystem fs = getFileSystem();
    20         // 获取文件目录 
    21         FileStatus filestatus = fs.getFileStatus(path);
    22         //获取文件块位置列表
    23         blkLocations = fs.getFileBlockLocations(filestatus, 0, filestatus.getLen());
    24     } catch (IOException e) {
    25         logger.error("", e);
    26     }
    27     return blkLocations;
    28 }
    复制代码

     8、文件重命名

    复制代码
     1 /**
     2  * 文件重命名
     3  * 
     4  * @param srcPath
     5  * @param dstPath
     6  */
     7 public boolean rename(String srcPath, String dstPath){
     8     boolean flag = false;
     9     try    {
    10         // 返回FileSystem对象
    11         FileSystem fs = getFileSystem();
    12         
    13         String hdfsUri = HDFSUri;
    14         if(StringUtils.isNotBlank(hdfsUri)){
    15             srcPath = hdfsUri + srcPath;
    16             dstPath = hdfsUri + dstPath;
    17         }
    18         
    19         flag = fs.rename(new Path(srcPath), new Path(dstPath));
    20     } catch (IOException e) {
    21         logger.error("{} rename to {} error.", srcPath, dstPath);
    22     }
    23     
    24     return flag;
    25 }
    复制代码

    9、判断目录是否存在

    复制代码
     1 /**
     2  * 判断目录是否存在
     3  * 
     4  * @param srcPath
     5  * @param dstPath
     6  */
     7 public boolean existDir(String filePath, boolean create){
     8     boolean flag = false;
     9     
    10     if (StringUtils.isEmpty(filePath)){
    11         return flag;
    12     }
    13     
    14     try{
    15         Path path = new Path(filePath);
    16         // FileSystem对象
    17         FileSystem fs = getFileSystem();
    18         
    19         if (create){
    20             if (!fs.exists(path)){
    21                 fs.mkdirs(path);
    22             }
    23         }
    24         
    25         if (fs.isDirectory(path)){
    26             flag = true;
    27         }
    28     }catch (Exception e){
    29         logger.error("", e);
    30     }
    31     
    32     return flag;
    33 }
    复制代码

         10  查看HDFS文件的最后修改时间  

    1. public void testgetModifyTime() throws Exception {  
    2.         Configuration conf = new Configuration();  
    3.         FileSystem hdfs = FileSystem.get(conf);  
    4.         Path dst = new Path(hdfsPath);  
    5.         FileStatus files[] = hdfs.listStatus(dst);  
    6. for (FileStatus file : files) {  
    7.             System.out.println(file.getPath() + " "  
    8.                     + file.getModificationTime());  
    9.             System.out.println(file.getPath() + " "  
    10.                     + new Date(file.getModificationTime()));  
    11.         } 

       

    1.    // 查看HDFS文件是否存在  
    2.   
    3.     public void testExists() throws Exception {  
    4.   
    5.         Configuration conf = new Configuration();  
    6.           
    7.         FileSystem hdfs = FileSystem.get(conf);  
    8.         Path dst = new Path(hdfsPath + "file01.txt");  
    9.   
    10.         boolean ok = hdfs.exists(dst);  
    11.         System.out.println(ok ? "文件存在" : "文件不存在");  
    12.     }  

        

    1.     // 获取HDFS集群上所有节点名称  
    2.     public void testGetHostName() throws Exception {  
    3.   
    4.         Configuration conf = new Configuration();  
    5.           
    6.         DistributedFileSystem hdfs = (DistributedFileSystem) FileSystem  
    7.                 .get(conf);  
    8.         DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();  
    9.   
    10.         for (DatanodeInfo dataNode : dataNodeStats) {  
    11.             System.out.println(dataNode.getHostName() + " "  
    12.                     + dataNode.getName());  
    13.         }  
    14.     }

      

  • 相关阅读:
    python里面的xlrd模块详解以及样例
    关于DOM的事件操作
    python正则表达式去除文本中间的换行符
    文本分类问题汇总
    pip安装问题
    3NF的无损连接和保持函数依赖的分解、BCNF的无损连接的分解
    Pyhton基本图形绘制
    软件过程模型
    常见算法的时间与空间复杂度
    随笔
  • 原文地址:https://www.cnblogs.com/tyzmzlf/p/7304954.html
Copyright © 2011-2022 走看看