zoukankan      html  css  js  c++  java
  • Hadoop 之 HDFS API操作

    1. 文件上传

    @Slf4j
    public class HDFSClient {
    
          @Test
          public void testCopyFromLocalFile() throws Exception{
            
            // 1. 获取fs对象
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://服务器IP地址:9000"), conf, "root");
            
            // 2. 执行上传API
            fs.copyFromLocalFile(new Path("D:\yase.txt"), new Path("/0526/noodles/testUpload.txt"));
            
            // 3.关闭资源
            fs.close();
            
            // 4. 程序结束
            log.info("文件上传成功!");
          }
    }
    

    2. 操作过程中遇到的BUG

    • "File /0526/noodles/testUpload.txt could only be replicated to 0 nodes instead of minReplication (=1). There are 1 datanode(s) running and 1 node(s) are excluded in this operation."
    ### 将log4j.properties的级别更改为DEBUG
    
    java.net.ConnectException: Connection timed out: no further information
    	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
    	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
    	at org.apache.hadoop.hdfs.DataStreamer.createSocketForPipeline(DataStreamer.java:259)
    	at org.apache.hadoop.hdfs.DataStreamer.createBlockOutputStream(DataStreamer.java:1692)
    	at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1648)
    	at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:704)
    
    • 原因:放置Hadoop的服务器,无法访问DataNode服务
      • 可以使用以下语句测试:telnet 192.168.1.102 8020
    • 修改方法:
    ## hdfs-site.xml中配置 或者更换服务器
    <property>
        <name>dfs.datanode.address</name>
        <value>0.0.0.0:可用端口</value>
    </property>
    

    3. 文件下载/文件夹删除/文件重命名

    @Slf4j
    public class HDFSClient {
    
          @Test
          public void testCopyToLocalFile() throws Exception{
            
            // 1. 获取fs对象
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://10.110.147.193:9000"), conf, "root");
            
            // 2. 执行下载操作
            // 参数说明:
            //        1. boolean delSrc 是否将原文件删除
            //        2. Path  src 要下载的文件路径
            //        3. Path  dst 将文件下载到的路径
            //        4. boolean useRawLocalFileSystem  是否开启文件校验
            fs.copyToLocalFile(false, new Path("/0526/noodles/testUpload.txt"), new Path("D:\\download.txt"), true);
    
            // 2.1 文件夹删除
            // fs.delete(new Path("/0526/"), true);
    
            // 2.2 文件重命名
            // fs.rename(new Path("/0526/noodles/testUpload.txt"), new Path("/0526/noodles/abc.txt"));
            
            // 3.关闭资源
            fs.close();
            
            // 4. 程序结束
            log.info("文件下载成功!");
          }
    }
    

    4. 查看文件详情

    • 查看文件名称,权限,长度,块信息
    @Slf4j
    public class HDFSClient {
    
          @Test
          public void testListFiles() throws Exception{
            
            // 1. 获取fs对象
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(new URI("hdfs://10.110.147.193:9000"), conf, "root");
            
            // 2. 查看文件详情
            RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
            
            while(listFiles.hasNext()) {
              LocatedFileStatus status = listFiles.next();
              
              // 查看文件名称,权限,长度,块信息
              log.info(status.toString());
            }
            
            // 3.关闭资源
            fs.close();
          }
    }
    

    5. 判断是文件还是文件夹

      @Test
    	public void testListStatus() throws Exception{
    		
    		// 1. 获取fs对象
    		Configuration conf = new Configuration();
    		FileSystem fs = FileSystem.get(new URI("hdfs://10.110.147.193:9000"), conf, "root");
    		
    		// 2. 判断是文件还是文件夹
    		FileStatus[] listStatus = fs.listStatus(new Path("/"));
    		
    		for (FileStatus fileStatus : listStatus) {
    			// 如果是文件
    			if (fileStatus.isFile()) {
    				System.out.println("f:" + fileStatus.getPath().getName());
    			} else {
    				System.out.println("d:" + fileStatus.getPath().getName());
    			}
    		}
    		
    		// 3.关闭资源
    		fs.close();
    	}
    

    6.文件IO流操作

      // 文件上传
      @Test
    	public void testPutFileToHDFS() throws Exception{
    		
    		// 1. 获取fs对象
    		Configuration conf = new Configuration();
    		FileSystem fs = FileSystem.get(new URI("hdfs://10.110.147.193:9000"), conf, "root");
    		
    		// 2. 创建输入流
    		FileInputStream fis = new FileInputStream(new File("D:\ddd.txt"));
    		
    		// 3. 获取输出流
    		FSDataOutputStream fos = fs.create(new Path("/IOPut.txt"));
    		
    		// 4.流拷贝
    		IOUtils.copyBytes(fis, fos, conf);
    		
    		
    		// 3.关闭资源
    		IOUtils.closeStream(fos);
    		IOUtils.closeStream(fis);
    		fs.close();
    	}
    
    
      // 文件下载
      @Test
    	public void testGetFileToHDFS() throws Exception{
    		
    		// 1. 获取fs对象
    		Configuration conf = new Configuration();
    		FileSystem fs = FileSystem.get(new URI("hdfs://10.110.147.193:9000"), conf, "root");
    		
    		// 2. 创建输入流
    		FSDataInputStream fis = fs.open(new Path("/IOPut.txt"));
    		
    		// 3. 获取输出流
    		FileOutputStream fos = new FileOutputStream(new File("D:\eee.txt"));
    		
    		// 4.流拷贝
    		IOUtils.copyBytes(fis, fos, conf);
    		
    		// 3.关闭资源
    		IOUtils.closeStream(fos);
    		IOUtils.closeStream(fis);
    		fs.close();
    	}
    

    7. 定位文件读取

    • 需求: 分块读取HDFS上的大文件,比如根目录下的/hadoop-2.8.5.tar.gz;
    // 下载第一块
      @Test
    	public void testReadFirstBlock() throws Exception{
    		
    		// 1. 获取fs对象
    		Configuration conf = new Configuration();
    		FileSystem fs = FileSystem.get(new URI("hdfs://10.110.147.193:9000"), conf, "root");
    		
    		// 2. 创建输入流
    		FSDataInputStream fis = fs.open(new Path("/hadoop-2.8.5.tar.gz"));
    		
    		// 3. 获取输出流
    		FileOutputStream fos = new FileOutputStream(new File("D:\hadoop-2.8.5.tar.gz.part1"));
    		
    		// 4.流拷贝(只拷贝128M)
    		byte[] buf = new byte[1024];
    		for (int i = 0; i < 1024 * 128; i++) {
    			fis.read(buf);
    			fos.write(buf);
    		}
    		
    		// 3.关闭资源
    		IOUtils.closeStream(fos);
    		IOUtils.closeStream(fis);
    		fs.close();
    	}
    
    
    // 下载第二块
      @Test
    	public void testReadSecondBlock() throws Exception{
    		
    		// 1. 获取fs对象
    		Configuration conf = new Configuration();
    		FileSystem fs = FileSystem.get(new URI("hdfs://10.110.147.193:9000"), conf, "root");
    		
    		// 2. 创建输入流
    		FSDataInputStream fis = fs.open(new Path("/hadoop-2.8.5.tar.gz"));
    		
    		// 3. 设置指定读取的起点(128M以后)
    		fis.seek(1024*1024*128);
    		
    		// 4. 获取输出流
    		FileOutputStream fos = new FileOutputStream(new File("D:\hadoop-2.8.5.tar.gz.part2"));
    		
    		// 5.流拷贝
    		IOUtils.copyBytes(fis, fos, conf);
    		
    		// 6.关闭资源
    		IOUtils.closeStream(fos);
    		IOUtils.closeStream(fis);
    		fs.close();
    	}
    

    参考资料

  • 相关阅读:
    [kafka] 005_kafka_Java_API
    [kafka] 004_kafka_安装运行
    [kafka] 003_kafka_主要配置
    [kafka] 002_kafka_相关术语详细解析
    [kafka] 001_kafka起步
    [随想感悟] 《归去来兮辞·并序》 赏析
    [hadoop] 一些基础概念
    [kylin] 部署kylin服务
    CSAPP 读书笔记
    ubuntu下安装vmTools, 和共享文件
  • 原文地址:https://www.cnblogs.com/linkworld/p/10940007.html
Copyright © 2011-2022 走看看