  • Common HDFS API Operations and HDFS I/O Stream Operations

    Prerequisites

    Create a Maven project and edit its pom.xml as follows:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.mcq</groupId>
      <artifactId>HDFS-001</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <dependencies>
    		<dependency>
    			<groupId>junit</groupId>
    			<artifactId>junit</artifactId>
    			<version>RELEASE</version>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.logging.log4j</groupId>
    			<artifactId>log4j-core</artifactId>
    			<version>2.8.2</version>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.hadoop</groupId>
    			<artifactId>hadoop-common</artifactId>
    			<version>2.7.2</version>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.hadoop</groupId>
    			<artifactId>hadoop-client</artifactId>
    			<version>2.7.2</version>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.hadoop</groupId>
    			<artifactId>hadoop-hdfs</artifactId>
    			<version>2.7.2</version>
    		</dependency>
    		<dependency>
    			<groupId>jdk.tools</groupId>
    			<artifactId>jdk.tools</artifactId>
    			<version>1.8</version>
    			<scope>system</scope>
    			<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    		</dependency>
    </dependencies>
    
    </project>
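
    Note that the jdk.tools entry is resolved from the local JDK via ${JAVA_HOME}/lib/tools.jar, so JAVA_HOME must point at a JDK 8 (or earlier) installation; tools.jar was removed in JDK 9, and this system-scoped dependency will fail to resolve on newer JDKs.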
    

    Add a file named log4j.properties under src/main/resources:

    log4j.rootLogger=INFO, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
    log4j.appender.logfile=org.apache.log4j.FileAppender
    log4j.appender.logfile.File=target/spring.log
    log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
    log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
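
    With this configuration, INFO-level messages go to the console. The logfile appender is declared but not attached to the root logger; to also write target/spring.log, change the first line to log4j.rootLogger=INFO, stdout, logfile.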
    

    API Operations

    HDFS shell commands closely mirror their Linux counterparts and can be memorized by analogy. Here are the corresponding Java API operations:

    package com.mcq;
    
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;
    import org.junit.Test;
    
    public class HDFSClient {
    	public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
    		Configuration conf = new Configuration();
    		// conf.set("fs.defaultFS", "hdfs://hadoop103:9000");
    		// FileSystem fs = FileSystem.get(conf);
    		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "mcq");
    		fs.mkdirs(new Path("/ppqq"));
    		fs.close();
    		System.out.println("over");
    	}
    
    	@Test // upload a file
    	public void testCopyFromLocalFile()
    			throws IllegalArgumentException, IOException, InterruptedException, URISyntaxException {
    		Configuration conf = new Configuration();
    		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "mcq");
    		fs.copyFromLocalFile(new Path("d:/banzhang.txt"), new Path("/banzhang.txt"));
    		fs.close();
    		System.out.println("over");
    	}
    
    	@Test // download a file
    	public void testCopyToLocalFile() throws IOException, InterruptedException, URISyntaxException {
    		Configuration conf = new Configuration();
    		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "mcq");
    		fs.copyToLocalFile(false, new Path("/banzhang.txt"), new Path("d:/hadoop test/banhua.txt"), true);
    		// first arg false: keep (do not delete) the source file; last arg true:
    		// use the raw local file system, so no .crc checksum file is written locally
    
    		fs.close();
    		System.out.println("over");
    	}
    
    	@Test // delete a file or directory
    	public void testDelete() throws IOException, InterruptedException, URISyntaxException {
    		Configuration conf = new Configuration();
    		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "mcq");
    		fs.delete(new Path("/0811"), true); // true = delete recursively
    		fs.close();
    		System.out.println("over");
    	}
    
    	@Test // rename a file
    	public void testRename() throws IOException, InterruptedException, URISyntaxException {
    		Configuration conf = new Configuration();
    		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), conf, "mcq");
    		fs.rename(new Path("/banzhang.txt"), new Path("/lala.txt"));
    		fs.close();
    		System.out.println("over");
    	}
    
    	@Test
    	public void testListFiles() throws IOException, InterruptedException, URISyntaxException {
    
    		// 1. Get the file system
    		Configuration configuration = new Configuration();
    		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");
    
    		// 2. Get file details (recursive listing from the root)
    		RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
    
    		while (listFiles.hasNext()) {
    			LocatedFileStatus status = listFiles.next();
    
    			// print the details
    			// file name
    			System.out.println(status.getPath().getName());
    			// length
    			System.out.println(status.getLen());
    			// permissions
    			System.out.println(status.getPermission());
    			// group
    			System.out.println(status.getGroup());
    
    			// get the stored block locations
    			BlockLocation[] blockLocations = status.getBlockLocations();
    
    			for (BlockLocation blockLocation : blockLocations) {
    
    				// get the host nodes storing this block
    				String[] hosts = blockLocation.getHosts();
    
    				for (String host : hosts) {
    					System.out.println(host);
    				}
    			}
    
    			System.out.println("-----------分割线----------");
    		}
    
    		// 3. Close resources
    		fs.close();
    	}
    	
    	@Test
    	public void testListStatus() throws IOException, InterruptedException, URISyntaxException{
    			
    		// 1. Get the configuration and file system
    		Configuration configuration = new Configuration();
    		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");
    			
    		// 2. Determine whether each entry is a file or a directory
    		FileStatus[] listStatus = fs.listStatus(new Path("/"));
    			
    		for (FileStatus fileStatus : listStatus) {
    			if (fileStatus.isFile()) {
    				// file
    				System.out.println("f:" + fileStatus.getPath().getName());
    			} else {
    				// directory
    				System.out.println("d:" + fileStatus.getPath().getName());
    			}
    		}

    		// 3. Close resources
    		fs.close();
    	}
    }
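
    Each test above rebuilds the same FileSystem by hand and has to remember to close it. Since FileSystem implements Closeable, a minimal refactoring sketch (same hadoop103:9000 cluster and mcq user as above; the connect helper name is our own) can use try-with-resources instead:

    package com.mcq;

    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.junit.Test;

    public class HDFSClientCompact {
    	// hypothetical helper: one place for the connection details used by every test
    	private static FileSystem connect() throws IOException, InterruptedException, URISyntaxException {
    		return FileSystem.get(new URI("hdfs://hadoop103:9000"), new Configuration(), "mcq");
    	}

    	@Test // same mkdir as in main() above, but the FileSystem is closed automatically
    	public void testMkdirs() throws IOException, InterruptedException, URISyntaxException {
    		try (FileSystem fs = connect()) {
    			fs.mkdirs(new Path("/ppqq"));
    		}
    	}
    }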
    

    I/O Stream Operations

    The API operations above all rely on functionality the framework has already wrapped for us. To implement the upload and download ourselves, we can work with raw I/O streams instead:

    package com.mcq;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.junit.Test;
    
    public class HDFSIO {
    	// upload a file
    	@Test
    	public void putFileToHDFS() throws IOException, InterruptedException, URISyntaxException {
    
    		// 1. Get the file system
    		Configuration configuration = new Configuration();
    		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");
    
    		// 2. Create an input stream for the local file
    		FileInputStream fis = new FileInputStream(new File("d:/banzhang.txt"));
    
    		// 3. Get an output stream to the HDFS path
    		FSDataOutputStream fos = fs.create(new Path("/xiaocao.txt"));
    
    		// 4. Copy the streams
    		IOUtils.copyBytes(fis, fos, configuration);
    
    		// 5. Close resources
    		IOUtils.closeStream(fos);
    		IOUtils.closeStream(fis);
    		fs.close();
    	}
    	// download a file
    	@Test
    	public void getFileFromHDFS() throws IOException, InterruptedException, URISyntaxException{
    
    		// 1. Get the file system
    		Configuration configuration = new Configuration();
    		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");
    			
    		// 2. Open an input stream from HDFS
    		FSDataInputStream fis = fs.open(new Path("/banhua.txt"));
    			
    		// 3. Create an output stream to the local file
    		FileOutputStream fos = new FileOutputStream(new File("d:/banhua.txt"));
    			
    		// 4. Copy the streams
    		IOUtils.copyBytes(fis, fos, configuration);
    			
    		// 5. Close resources
    		IOUtils.closeStream(fos);
    		IOUtils.closeStream(fis);
    		fs.close();
    	}
    	// seek-based read: download a large file block by block
    	// (1) download the first block
    	@Test
    	public void readFileSeek1() throws IOException, InterruptedException, URISyntaxException{
    
    		// 1. Get the file system
    		Configuration configuration = new Configuration();
    		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");
    			
    		// 2. Open an input stream from HDFS
    		FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));
    			
    		// 3. Create an output stream for the first part file
    		FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part1"));
    			
    		// 4. Copy the first block by hand: 1024 * 128 reads of a 1 KB buffer = 128 MB
    		byte[] buf = new byte[1024];

    		for (int i = 0; i < 1024 * 128; i++) {
    			int len = fis.read(buf);
    			if (len == -1) {
    				break; // reached end of file early
    			}
    			// write only the bytes actually read, not the whole buffer
    			fos.write(buf, 0, len);
    		}

    		// 5. Close resources
    		IOUtils.closeStream(fis);
    		IOUtils.closeStream(fos);
    		fs.close();
    	}
    	// (2) download the second block
    	@Test
    	public void readFileSeek2() throws IOException, InterruptedException, URISyntaxException{
    
    		// 1. Get the file system
    		Configuration configuration = new Configuration();
    		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop103:9000"), configuration, "mcq");
    			
    		// 2. Open an input stream from HDFS
    		FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));
    			
    		// 3. Seek to the start of the second block (128 MB offset)
    		fis.seek(1024 * 1024 * 128);
    			
    		// 4. Create an output stream for the second part file
    		FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part2"));
    			
    		// 5. Copy the remainder of the streams
    		IOUtils.copyBytes(fis, fos, configuration);
    			
    		// 6. Close resources
    		IOUtils.closeStream(fis);
    		IOUtils.closeStream(fos);
    		fs.close();
    	}
    }
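
    After readFileSeek1 and readFileSeek2 finish, the two part files together should be byte-identical to the original archive. A small verification sketch (plain Java; the e:/ paths are assumed from the tests above) that appends part2 onto part1:

    package com.mcq;

    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;

    public class MergeParts {
    	public static void main(String[] args) throws IOException {
    		// open part1 in append mode and stream part2 onto its end
    		try (FileInputStream in = new FileInputStream("e:/hadoop-2.7.2.tar.gz.part2");
    				FileOutputStream out = new FileOutputStream("e:/hadoop-2.7.2.tar.gz.part1", true)) {
    			byte[] buf = new byte[8192];
    			int len;
    			while ((len = in.read(buf)) != -1) {
    				out.write(buf, 0, len);
    			}
    		}
    		// rename part1 back to hadoop-2.7.2.tar.gz and extract it to verify
    	}
    }

    On Windows the same merge can be done from cmd with type hadoop-2.7.2.tar.gz.part2 >> hadoop-2.7.2.tar.gz.part1.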
    