zoukankan      html  css  js  c++  java
  • HDFS常用API操作 和 HDFS的I/O流操作

    前置操作

    创建maven工程,修改pom.xml文件:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.mcq</groupId>
      <artifactId>HDFS-001</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <dependencies>
    		<dependency>
    			<groupId>junit</groupId>
    			<artifactId>junit</artifactId>
    			<version>RELEASE</version>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.logging.log4j</groupId>
    			<artifactId>log4j-core</artifactId>
    			<version>2.8.2</version>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.hadoop</groupId>
    			<artifactId>hadoop-common</artifactId>
    			<version>2.7.2</version>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.hadoop</groupId>
    			<artifactId>hadoop-client</artifactId>
    			<version>2.7.2</version>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.hadoop</groupId>
    			<artifactId>hadoop-hdfs</artifactId>
    			<version>2.7.2</version>
    		</dependency>
    		<dependency>
    			<groupId>jdk.tools</groupId>
    			<artifactId>jdk.tools</artifactId>
    			<version>1.8</version>
    			<scope>system</scope>
    			<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    		</dependency>
    </dependencies>
    
    </project>
    

    在resources添加一个file:log4j.properties:

    log4j.rootLogger=INFO, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
    log4j.appender.logfile=org.apache.log4j.FileAppender
    log4j.appender.logfile.File=target/spring.log
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
    log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
    

    API操作

    HDFS的命令和linux极其相似,可以类比记忆,在这里列出一些java api操作:

    package com.mcq;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;
    import org.junit.Test;
    
    public class HDFSClient {
    	// Connection settings shared by every method in this class.
    	private static final String HDFS_URI = "hdfs://hadoop103:9000";
    	private static final String HDFS_USER = "mcq";

    	/**
    	 * Connects to the HDFS NameNode at {@code HDFS_URI} acting as user
    	 * {@code HDFS_USER}. Every method below previously duplicated this
    	 * three-line setup; it is centralized here. Callers must close the
    	 * returned handle (try-with-resources below).
    	 */
    	private static FileSystem getFs() throws IOException, InterruptedException, URISyntaxException {
    		Configuration conf = new Configuration();
    		// Setting fs.defaultFS on the Configuration and calling
    		// FileSystem.get(conf) would also work, but would run as the
    		// local OS user instead of HDFS_USER.
    		return FileSystem.get(new URI(HDFS_URI), conf, HDFS_USER);
    	}

    	/** Creates directory /ppqq on HDFS. */
    	public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
    		// try-with-resources: fs is closed even if mkdirs throws.
    		try (FileSystem fs = getFs()) {
    			fs.mkdirs(new Path("/ppqq"));
    		}
    		System.out.println("over");
    	}

    	@Test // File upload: copies a local file into HDFS.
    	public void testCopyFromLocalFile()
    			throws IllegalArgumentException, IOException, InterruptedException, URISyntaxException {
    		try (FileSystem fs = getFs()) {
    			fs.copyFromLocalFile(new Path("d:/banzhang.txt"), new Path("/banzhang.txt"));
    		}
    		System.out.println("over");
    	}

    	@Test // File download: copies an HDFS file to the local disk.
    	public void testCopyToLocalFile() throws IOException, InterruptedException, URISyntaxException {
    		try (FileSystem fs = getFs()) {
    			// 1st arg false: keep the source (no move); last arg true: write
    			// via the raw local file system, so no .crc checksum file is made.
    			fs.copyToLocalFile(false, new Path("/banzhang.txt"), new Path("d:/hadoop test/banhua.txt"), true);
    		}
    		System.out.println("over");
    	}

    	@Test // File deletion.
    	public void testDelete() throws IOException, InterruptedException, URISyntaxException {
    		try (FileSystem fs = getFs()) {
    			fs.delete(new Path("/0811"), true); // true: delete recursively
    		}
    		System.out.println("over");
    	}

    	@Test // File rename.
    	public void testRename() throws IOException, InterruptedException, URISyntaxException {
    		try (FileSystem fs = getFs()) {
    			fs.rename(new Path("/banzhang.txt"), new Path("/lala.txt"));
    		}
    		System.out.println("over");
    	}

    	@Test // Lists every file under / recursively, with details and block locations.
    	public void testListFiles() throws IOException, InterruptedException, URISyntaxException {
    		try (FileSystem fs = getFs()) {
    			// 2nd arg true: recurse into subdirectories.
    			RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
    			while (listFiles.hasNext()) {
    				LocatedFileStatus status = listFiles.next();
    				System.out.println(status.getPath().getName()); // file name
    				System.out.println(status.getLen());            // length in bytes
    				System.out.println(status.getPermission());     // permission bits
    				System.out.println(status.getGroup());          // owning group
    				// For each block of the file, print the hosts holding a replica.
    				for (BlockLocation blockLocation : status.getBlockLocations()) {
    					for (String host : blockLocation.getHosts()) {
    						System.out.println(host);
    					}
    				}
    				System.out.println("-----------分割线----------");
    			}
    		}
    	}

    	@Test // Distinguishes files from directories directly under /.
    	public void testListStatus() throws IOException, InterruptedException, URISyntaxException {
    		try (FileSystem fs = getFs()) {
    			FileStatus[] listStatus = fs.listStatus(new Path("/"));
    			for (FileStatus fileStatus : listStatus) {
    				if (fileStatus.isFile()) {
    					System.out.println("f:" + fileStatus.getPath().getName());
    				} else {
    					System.out.println("d:" + fileStatus.getPath().getName());
    				}
    			}
    		}
    	}
    }
    

     I/O流操作

    上面对 HDFS 的 API 操作都是框架封装好的;如果我们想自己实现上述 API 的功能,可以采用 I/O 流的方式实现数据的上传和下载。

    package com.mcq;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.yarn.api.records.URL;
    import org.junit.Test;
    
    public class HDFSIO {
    	// Connection settings shared by every method in this class.
    	private static final String HDFS_URI = "hdfs://hadoop103:9000";
    	private static final String HDFS_USER = "mcq";

    	/**
    	 * Connects to HDFS as user {@code HDFS_USER} using the given
    	 * Configuration (also reused by IOUtils.copyBytes for its buffer size).
    	 * Callers must close the returned handle.
    	 */
    	private static FileSystem getFs(Configuration configuration)
    			throws IOException, InterruptedException, URISyntaxException {
    		return FileSystem.get(new URI(HDFS_URI), configuration, HDFS_USER);
    	}

    	// File upload implemented with raw streams instead of copyFromLocalFile.
    	@Test
    	public void putFileToHDFS() throws IOException, InterruptedException, URISyntaxException {
    		Configuration configuration = new Configuration();
    		// try-with-resources closes fs and both streams even on failure
    		// (the original leaked them if copyBytes threw).
    		try (FileSystem fs = getFs(configuration);
    				FileInputStream fis = new FileInputStream(new File("d:/banzhang.txt"));
    				FSDataOutputStream fos = fs.create(new Path("/xiaocao.txt"))) {
    			// Stream-to-stream copy; buffer size taken from the Configuration.
    			IOUtils.copyBytes(fis, fos, configuration);
    		}
    	}

    	// File download implemented with raw streams instead of copyToLocalFile.
    	@Test
    	public void getFileFromHDFS() throws IOException, InterruptedException, URISyntaxException {
    		Configuration configuration = new Configuration();
    		try (FileSystem fs = getFs(configuration);
    				FSDataInputStream fis = fs.open(new Path("/banhua.txt"));
    				FileOutputStream fos = new FileOutputStream(new File("d:/banhua.txt"))) {
    			IOUtils.copyBytes(fis, fos, configuration);
    		}
    	}

    	// Positioned read, part 1: download only the first 128 MB block.
    	@Test
    	public void readFileSeek1() throws IOException, InterruptedException, URISyntaxException {
    		Configuration configuration = new Configuration();
    		try (FileSystem fs = getFs(configuration);
    				FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));
    				FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part1"))) {
    			byte[] buf = new byte[1024];
    			// Copy exactly 128 MB. The original loop ignored read()'s return
    			// value and always wrote the full 1024-byte buffer, which corrupts
    			// the output whenever a read returns fewer bytes than requested;
    			// honor the returned byte count instead.
    			long remaining = 1024L * 1024 * 128;
    			while (remaining > 0) {
    				int len = fis.read(buf, 0, (int) Math.min(buf.length, remaining));
    				if (len < 0) {
    					break; // EOF before a full block — nothing more to copy
    				}
    				fos.write(buf, 0, len);
    				remaining -= len;
    			}
    		}
    	}

    	// Positioned read, part 2: download everything after the first 128 MB.
    	@Test
    	public void readFileSeek2() throws IOException, InterruptedException, URISyntaxException {
    		Configuration configuration = new Configuration();
    		// try-with-resources also fixes the original's missing fs.close().
    		try (FileSystem fs = getFs(configuration);
    				FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"))) {
    			// Skip the first HDFS block (128 MB) before copying the rest.
    			fis.seek(1024 * 1024 * 128);
    			try (FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part2"))) {
    				IOUtils.copyBytes(fis, fos, configuration);
    			}
    		}
    	}
    }
    
  • 相关阅读:
    eclipse中文乱码问题解决方案
    修改Tomcat的JDK目录
    Tomcat 5.5 修改服务器的侦听端口
    HTML DOM教程 27HTML DOM Button 对象
    HTML DOM教程 24HTML DOM Frameset 对象
    Navicat for MySQL v8.0.27 的注册码
    HTML DOM教程 25HTML DOM IFrame 对象
    Tomcat 5.5 的下载和安装
    android manifest相关属性
    ubuntu10.04 下 eclipse 小结
  • 原文地址:https://www.cnblogs.com/mcq1999/p/11769328.html
Copyright © 2011-2022 走看看