  • HDFS Java API (reposted)

    Reposted from: http://www.cuiweiyou.com/1405.html

    import java.util.Date;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.io.IOUtils;
    import org.junit.Test;

    public class TestHdfs {
            private FileSystem hdfs;
            private Configuration conf;
            
          @Test
          // 1 Create an empty file
          public void testCreateNewFile() throws Exception {
            conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");    // must match the setting in core-site.xml
            conf.set("mapred.job.tracker", "localhost:9001");

            hdfs = FileSystem.get(conf);
            // create an empty file
            hdfs.createNewFile(new Path("hdfs:/newfile.txt"));
            hdfs.close();
          }
          
          @Test
          // 2 List the files under a directory, recursing into its subdirectories
          public void testListFiles() throws Exception {
            // 1. configuration
            conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            conf.set("mapred.job.tracker", "localhost:9001");
            // 2. file system
            hdfs = FileSystem.get(conf);
            // 3. iterate over the files on HDFS
            RemoteIterator<LocatedFileStatus> fs = hdfs.listFiles(new Path("hdfs:/"), true);
            while (fs.hasNext()) {
              System.out.println(fs.next());
            }
            hdfs.close();
          }
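
          // Editor's note (hedged): the second argument of listFiles() controls
          // recursion; with "false" only the immediate children are listed. Each
          // LocatedFileStatus printed above also carries the file's block
          // locations; fs.next().getPath() would print just the path.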
         
          @Test
          // 3 Inspect the status of directories/files
          public void listStatus() throws Exception {
            // 1. create the configuration
            conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            conf.set("mapred.job.tracker", "localhost:9001");

            // 2. create the file system
            hdfs = FileSystem.get(conf);
            // 3. walk the files and directories on HDFS
            FileStatus[] fs = hdfs.listStatus(new Path("hdfs:/"));
            if (fs.length > 0) {
              for (FileStatus f : fs) {
                showDir(f);
              }
            } else {
              System.out.println("nothing...");
            }
            hdfs.close();
          }
          
          // 4 Distinguish directories from files (recursive helper)
          private void showDir(FileStatus fs) throws Exception {
            Path path = fs.getPath();
            System.out.println(path);
            // if it is a directory, recurse into it
            if (fs.isDirectory()) {
              FileStatus[] f = hdfs.listStatus(path);
              if (f.length > 0) {
                for (FileStatus file : f) {
                  showDir(file);
                }
              }
            }
            // if it is a file, print its metadata
            if (fs.isFile()) {
              long time = fs.getModificationTime();
              System.out.println("last modification time of the HDFS file: " + new Date(time));
            }
          }
          
          @Test
          // 5 Check whether a path exists
          public void testExists() throws Exception {
            // 1. create the configuration
            conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            conf.set("mapred.job.tracker", "localhost:9001");
            // 2. create the file system
            hdfs = FileSystem.get(conf);
            // 3. build a path Hadoop can use
            Path file = new Path("hdfs:/test.txt");
            // 4. check whether the file exists
            System.out.println("file exists: " + hdfs.exists(file));
            hdfs.close();
          }
          
          @Test
          // 6 Append data to a file
          public void testAppend() throws Exception {
            conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            conf.set("mapred.job.tracker", "localhost:9001");
            /* Setting this in hdfs-site.xml alone did not help; the append failed with:
        org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException): 
            Failed to APPEND_FILE /newfile.txt for DFSClient_NONMAPREDUCE_-610333487_1 on 127.0.0.1 
            because this file lease is currently owned by DFSClient_NONMAPREDUCE_1541357537_1 on 127.0.0.1
            */
            conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
            conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
            /* Without the two settings above, appending on a single-datanode cluster failed with:
        java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. 
        (Nodes: current=[DatanodeInfoWithStorage[127.0.0.1:50010,DS-b1c29ca4-24f7-4447-a12b-5ae261663431,DISK]], 
        original=[DatanodeInfoWithStorage[127.0.0.1:50010,DS-b1c29ca4-24f7-4447-a12b-5ae261663431,DISK]]). 
        The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
            */
            conf.set("dfs.support.append", "true");

            hdfs = FileSystem.get(conf);
            Path path = new Path("hdfs:/newfile.txt");
            if (!hdfs.exists(path)) {
              // create the file and close the stream right away, so the file
              // lease is released before we append
              hdfs.create(path).close();
            }

            FSDataOutputStream out = hdfs.append(path);
            out.write("4 does each run of this method really allow only one write statement?!!!\n".getBytes("UTF-8"));
            out.write("5 does each run of this method really allow only one write statement?!!!\n".getBytes("UTF-8"));
            out.close();

            hdfs.close();
          }
          
          @Test
          // 7 Read a file
          public void testOpen() throws Exception {
            conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            conf.set("mapred.job.tracker", "localhost:9001");

            hdfs = FileSystem.get(conf);
            Path path = new Path("hdfs:/newfile.txt");
            FSDataInputStream is = hdfs.open(path);
            FileStatus stat = hdfs.getFileStatus(path);
            // size the buffer from the file length reported by the namenode
            byte[] buffer = new byte[(int) stat.getLen()];
            is.readFully(0, buffer);
            is.close();
            hdfs.close();
            System.out.println(new String(buffer, "UTF-8"));
          }
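
          @Test
          // 7b Editor's hedged sketch (not in the original post): the same read
          // using Hadoop's IOUtils.copyBytes, which streams the file to stdout
          // without sizing a whole-file buffer first.
          public void testOpenWithIOUtils() throws Exception {
            conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            hdfs = FileSystem.get(conf);
            FSDataInputStream is = hdfs.open(new Path("hdfs:/newfile.txt"));
            // copy in 4 KB chunks; "false" keeps System.out open afterwards
            IOUtils.copyBytes(is, System.out, 4096, false);
            is.close();
            hdfs.close();
          }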
          
          @Test
          // 8 Rename a file
          public void testRename() throws Exception {
            conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            conf.set("mapred.job.tracker", "localhost:9001");
            hdfs = FileSystem.get(conf);

            // rename: fs.rename(source, target)
            boolean rename = hdfs.rename(new Path("/newfile.txt"), new Path("/test.txt"));
            System.out.println(rename);

            hdfs.close();
          }
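
          // Editor's note (hedged): FileSystem.rename() usually reports failure
          // by returning false (e.g. when the source is missing or the target
          // already exists) rather than by throwing, so the boolean above is
          // worth checking.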
          
          @Test
          // 9 Delete a directory/file
          public void testDelete() throws Exception {
            conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            conf.set("mapred.job.tracker", "localhost:9001");
            hdfs = FileSystem.get(conf);

            // delete(path, recursive): true deletes a non-empty directory too;
            // false throws an IOException if the directory is not empty
            boolean delete = hdfs.delete(new Path("hdfs:/test.txt"), true);
            System.out.println("deleted: " + delete);
            // deleteOnExit only registers the path; the delete runs when the
            // FileSystem is closed
            boolean exit = hdfs.deleteOnExit(new Path("/out.txt"));
            System.out.println("delete scheduled: " + exit);

            hdfs.close();
          }
          
          @Test
          // 10 Create a directory/file; missing parent directories are created automatically
          public void testCreate() throws Exception {
            conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            conf.set("mapred.job.tracker", "localhost:9001");
            hdfs = FileSystem.get(conf);

            // use an HDFS output stream to create a directory under the HDFS
            // root and a file inside it
            FSDataOutputStream out = hdfs.create(new Path("hdfs:/vigiles/eminem.txt"));
            // write one line of data; use UTF-8
            out.write("痞子阿姆,Hello !".getBytes("UTF-8"));
            out.flush();
            // close the first stream before reusing the variable
            out.close();

            out = hdfs.create(new Path("/vigiles/alizee.txt"));
            out.write("艾莉婕,Hello !".getBytes("UTF-8"));

            out.close();

            FSDataInputStream is = hdfs.open(new Path("hdfs:/vigiles/alizee.txt"));
            FileStatus stat = hdfs.getFileStatus(new Path("hdfs:/vigiles/alizee.txt"));
            byte[] buffer = new byte[(int) stat.getLen()];
            is.readFully(0, buffer);
            is.close();
            hdfs.close();
            System.out.println(new String(buffer, "UTF-8"));
          }
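
          // Editor's note (hedged): create(Path) overwrites an existing file by
          // default; the overload create(Path, boolean overwrite) makes this
          // explicit and can be used to fail instead of overwriting.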
            
          @Test
          // 11 Create directories
          public void testMkdir() throws Exception {
            conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            conf.set("mapred.job.tracker", "localhost:9001");
            hdfs = FileSystem.get(conf);
            // create the directories
            hdfs.mkdirs(new Path("hdfs:/eminem1"));
            hdfs.mkdirs(new Path("hdfs:/eminem2"));
            hdfs.mkdirs(new Path("hdfs:/eminem3"));
            hdfs.close();
          }
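
          @Test
          // 11b Editor's hedged sketch (not in the original post): mkdirs can
          // also take an FsPermission; the 0755 mode here is only illustrative.
          public void testMkdirWithPermission() throws Exception {
            conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            hdfs = FileSystem.get(conf);
            // create a directory with explicit permissions (the effective mode
            // is still subject to the cluster's umask)
            hdfs.mkdirs(new Path("hdfs:/eminem4"), new FsPermission((short) 0755));
            hdfs.close();
          }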
          
          @Test
          // 12 File block/replica information
          public void testGetFileBlockLocations() throws Exception {
            // 1. configuration
            conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            conf.set("mapred.job.tracker", "localhost:9001");
            // 2. file system
            hdfs = FileSystem.get(conf);
            // 3. the path must already exist and must be a file
            Path path = new Path("hdfs:/vigiles/alizee.txt");
            // 4. file status
            FileStatus status = hdfs.getFileStatus(path);
            // 5. file blocks
            //BlockLocation[] blockLocations = hdfs.getFileBlockLocations(status, 0, status.getLen());  // variant 1: pass the file's FileStatus
            BlockLocation[] blockLocations = hdfs.getFileBlockLocations(path, 0, status.getLen());  // variant 2: pass the file's Path
            int blockLen = blockLocations.length;
            System.err.println("number of blocks: " + blockLen);  // a small file is not split, so this prints 1
            // iterate over the block information
            for (int i = 0; i < blockLen; i++) {
              // size of this block
              long sizes = blockLocations[i].getLength();
              System.err.println("block size: " + sizes);

              // hostnames of all datanodes holding a replica
              String[] hosts = blockLocations[i].getHosts();
              for (String host : hosts) {
                System.err.println("hostname: " + host);
              }

              // addresses (ip:port) of all datanodes holding a replica
              String[] names = blockLocations[i].getNames();
              for (String name : names) {
                System.err.println("IP: " + name);
              }
            }

            hdfs.close();
          }
         
          @Test
          // 13 Upload a file
          public void testCopyFromLocalFile() throws Exception {
            // 1. create the configuration
            conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            conf.set("mapred.job.tracker", "localhost:9001");
            // 2. create the file system
            hdfs = FileSystem.get(conf);
            // 3. build paths Hadoop can use
            Path src = new Path("file:/root/xx.txt"); // local directory/file
            Path dst = new Path("hdfs:/");  // target directory/file
            // 4. copy the local file up (source, target)
            hdfs.copyFromLocalFile(src, dst);
            System.out.println("file uploaded to: " + conf.get("fs.defaultFS"));
            // 5. list the files on HDFS
            FileStatus[] fs = hdfs.listStatus(dst);
            for (FileStatus f : fs) {
              System.out.println(f.getPath());
            }

            hdfs.close();
          }
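
          // Editor's note (hedged): the overload
          // copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, Path dst)
          // additionally controls whether the local source is removed and whether
          // an existing target is overwritten.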
          
          @Test
          // 14 Download a file to the local file system
          public void testCopyToLocalFile() throws Exception {
            conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://localhost:9000");
            conf.set("mapred.job.tracker", "localhost:9001");

            hdfs = FileSystem.get(conf);
            // build the HDFS source path and the local target path
            Path src = new Path("hdfs:/xx.txt");  // HDFS directory/file
            Path dst = new Path("file:/root/桌面/new.txt"); // local directory/file
            // copy the HDFS file down (source, target)
            hdfs.copyToLocalFile(src, dst);

            hdfs.close();
          }
        }
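
    Every test above builds its own Configuration and closes the FileSystem by
    hand. As a closing aside (an editor's hedged sketch, not part of the
    original post, assuming the same imports as above): since FileSystem
    implements Closeable, the boilerplate can be factored into one helper and
    used with try-with-resources (Java 7+), so the handle is released even when
    an operation throws.

        // hypothetical helper class, for illustration only
        public class HdfsClientSketch {
            private static FileSystem openFs() throws Exception {
                Configuration conf = new Configuration();
                conf.set("fs.defaultFS", "hdfs://localhost:9000"); // as in core-site.xml
                return FileSystem.get(conf);
            }

            public static void main(String[] args) throws Exception {
                // the FileSystem is closed automatically at the end of the block
                try (FileSystem fs = openFs()) {
                    System.out.println("root exists: " + fs.exists(new Path("hdfs:/")));
                }
            }
        }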
  • Original post: https://www.cnblogs.com/chenyansong/p/5514639.html