zoukankan      html  css  js  c++  java
  • hadoop(三)HDFS基础使用

    一、HDFS前言 

         1、 设计思想
              分而治之:将大文件,大批量文件,分布式的存放于大量服务器上。以便于采取分而治之的方式对海量数据进行运算分析
         2、 在大数据系统架构中的应用
             为各类分布式运算框架( MapReduce, Spark, Tez, Flink, …)提供数据存储服务
         3、 重点概念: 数据块, 负载均衡, 心跳机制, 副本存放策略, 元数据/元数据管理, 安全 模式,机架感知…
    二、HDFS相关概念和特性    

         首先,它是一个文件系统,用于存储文件,通过统一的命名空间——目录树来定位文件   其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务器有各自的角色;
    重要特性如下:
         1、 HDFS 中的文件在物理上是分块存储( block),块的大小可以通过配置参数( dfs.blocksize)   来规定,默认大小在 hadoop2.x 版本中是 128M,老版本中是 64M
         2、 HDFS 文件系统会给客户端提供一个统一的抽象目录树,客户端通过路径来访问文件,形 如: hdfs://namenode:port/dir-a/dir-b/dir-c/file.data
         3、 目录结构及文件分块位置信息(元数据)的管理由 namenode 节点承担
              ——namenode 是 HDFS 集群主节点,负责维护整个 hdfs 文件系统的目录树,以及每一个路 径(文件)所对应的 block 块信息( block 的 id,及所在的 datanode 服务器)
        4、 文件的各个 block 的存储管理由 datanode 节点承担
              ---- datanode HDFS 集群从节点,每一个 block 都可以在多个 datanode 上存储多个副本(副 本数量也可以通过参数设置 dfs.replication,默认是 3) 

        5、 HDFS 是设计成适应一次写入,多次读出的场景,且不支持文件的修改,支持追加 (PS:适合用来做数据分析,并不适合用来做网盘应用,因为,不便修改,延迟大,网络开 销大,成本太高)
    三、HDFS优缺点:

         HDFS优点:   

              高容错性
                     数据自动保存多个副本
                     副本丢失后,自动恢复
              适合批处理
                    移动计算而非数据
                    数据位置暴露给计算框架
              适合大数据处理
                    GB、 TB、甚至 PB 级数据
                    百万规模以上的文件数量
                    10K+节点规模
              流式文件访问
                   一次性写入,多次读取
                  保证数据一致性
                  可构建在廉价机器上
                  通过多副本提高可靠性
                  提供了容错和恢复机制

          HDFS缺点:             

                不适于以下操作:
                  低延迟数据访问  :   比如毫秒级     低延迟与高吞吐率
                  小文件存取    : 占用 NameNode 大量内存  寻道时间超过读取时间
                   并发写入、文件随机修改:     一个文件只能有一个写者  仅支持 append

    四、HDFS的shell操作

           常用命令参数介绍:


     
    五、HDFS的Java API操作
         1、利用eclipse 查看HDFS集群的文件信息         

           (1)下载一个 eclipse 开发工具 eclipse-jee-luna-SR1-win32-x86_64.zip
           (2) 解压到一个文件夹 C:myProgrameclipse
           (3) 把 hadoop-eclipse-plugin-2.6.4.jar 放入到 eclipse/plugins 文件夹下
           (4)双击启动 eclipse
           (5) 将 无 jar 版 windows 平台 hadoop-2.6.1.zip 解压到 windows 系统下一个文件夹下,文件 夹的路径最好不要带中文。我的目录是: C:myProgramhadoop-2.6.1
           (6) 打开了 eclipse 之后, 点击 windows –> prefrences -> 会出现一个对话框。找到如图所示 Hadoop MapReduce 选项: 然后把你安装的 hadoop 路径配置上,就是上一步你解压的那 个文件夹: C:myProgramhadoop-2.6.1,然后保存

           (注意的是如果 C:myProgramhadoop-2.6.1下还有一层文件就是hadoop-2.6.1,要写到C:myProgramhadoop-2.6.1hadoop-2.6.1) 

          (7) 然后点击 windows show view other 在列表中找到图中这个东西: 然后双击

     

         (8)然后会出现这么一个显示框

     

        (9)咱们点击红框中这个东西,会出现相应的这么一个对话框,修改相应的信息,

         (10)填完以上信息之后,点击 finish 会出现:

        (11)最重要的时候在左上角的这个地方会出现:

       至此,我们便完成了,利用 hadoop eclipse 插件链接 hdfs 集群实现查看 hdfs 集群文件的 功能,大功告成。

          2、搭建开发环境

            创建Java project ,导入操作HDFS的jar包,导入的jar包有:

          3、FileSystem实例获取讲解          

            在 java 中操作 hdfs,首先要获得一个客户端实例:
               Configuration conf = new Configuration()
               FileSystem fs = FileSystem.get(conf)
           而我们的操作目标是 HDFS,所以获取到的 fs 对象应该是 DistributedFileSystem 的实例; get 方法是从何处判断具体实例化那种客户端类呢?
           ——从 conf 中的一个参数 fs.defaultFS 的配置值判断;
           如果我们的代码中没有指定 fs.defaultFS,并且工程 classpath 下也没有给定相应的配置, conf 中的默认值就来自于 hadoop 的 jar 包中的 core-default.xml,默认值为: file:///,则获取的 将不是一个 DistributedFileSystem 的实例,而是一个本地文件系统的客户端对象

         4、HDFS常用Java API演示

    package com.ghgj.hdfs;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocatedFileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RemoteIterator;
    
    public class HDFSDemo {
    	static FileSystem fs = null;
    	static Configuration conf = null;	
    	public static void main(String[] args) throws Exception {
    		init();		
    //		testMkdirs();
    //		testPut();
    //		testGet();
    //		testConf();
    //		testDelete();
    //		testRename();
    //		testList();		
    		close();
    	}
    	
    	public static void testConf(){
    		
    		/**
    		 * 给我们要操作的hdfs集群设置配置信息的方式有三种:
    		 * 第一种:通过代码,用conf.set()
    		 * 第二种:直接接在jar里面所携带的默认的core-default.xml
    		 * 第三种:加载我们项目自带的关于集群配置相关的xml
    		 */
    		
    		/**
    		 * 1 > 3 > 2  加载级别高低,也就是谁后生效
    		 * 加载顺序:默认配置 , 集群安装配置=项目配置文件, 代码里面设置的配置信息
    		 */
    		
    		String fileSystem = conf.get("fs.defaultFS");
    		String string = conf.get("dfs.replication");
    		System.out.println(fileSystem);
    		System.out.println(string);
    		
    //		conf.addResource(new Path("./myxmltext.xml"));
    //		conf.addResource("myxmltext.xml");
    //		System.out.println(conf.get("myname"));
    	}
    	
    	/**
    	 * 初始化fs链接
    	 * @throws IOException
    	 */
    	public static void init()  throws IOException{
    		conf = new Configuration();
    		conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
    		System.setProperty("HADOOP_USER_NAME", "hadoop");
    		fs = FileSystem.get(conf);		
    	}
    	
    	/**
    	 * 关闭fs链接
    	 * @throws IOException
    	 */
    	public static void close()  throws IOException{
    		fs.close();
    	}	
    	/**
    	 * 删除文件夹或者文件
    	 * @throws IOException
    	 */
    	public static void testDelete()   throws IOException{
    		fs.delete(new Path("/abcd"), true);
    	}
    	
    	/**
    	 * 重命名
    	 * @throws IOException
    	 */
    	public static void testRename() throws IOException{
    		fs.rename(new Path("/txt1.txt"), new Path("/huangbo.txt"));
    	}
    	
    	/**
    	 * 遍历文件
    	 * @throws IOException
    	 */
    	public static void testList() throws IOException{
    		
    		// 遍历某个路径下所有的文件节点。包括文件和文件夹
    		/*FileStatus[] listStatus = fs.listStatus(new Path("/"));
    		for(FileStatus fss: listStatus){
    			boolean directory = fss.isDirectory();
    			if(directory){
    				System.out.println(fss.getPath()+" --  文件夹");
    			}else{
    				System.out.println(fss.getPath()+" --  文件");
    			}
    		}*/
    		
    		// 只遍历文件
    		RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
    		while(listFiles.hasNext()){
    			LocatedFileStatus next = listFiles.next();
    			boolean directory = next.isDirectory();
    //			next.getBlockSize()
    			BlockLocation[] blockLocations = next.getBlockLocations();
    			
    			for(BlockLocation bl:  blockLocations){
    				String[] hosts = bl.getHosts();
    				System.out.println("长度:"+bl.getLength());
    				for(String s: hosts){
    					System.out.print(s + "	");
    				}
    				System.out.println(next.getPath());
    			}
    			
    			/*if(directory){
    				System.out.println(next.getPath()+" --  文件夹"+ blockLocations.length);
    			}else{
    				System.out.println(next.getPath()+" --  文件"+ blockLocations.length);
    			}*/
    		}
    	}
    	
    	/**
    	 * s上传文件
    	 * @throws IOException
    	 */
    	public static void testMkdirs() throws IOException{
    		boolean mkdirs = fs.mkdirs(new Path("/abcd/nnn/mmm/uuu/jjj"));
    		System.out.println(mkdirs);
    	}
    	
    	/**
    	 * 上传文件
    	 * @throws IOException
    	 */
    	public static void testPut()  throws IOException{
    		fs.copyFromLocalFile(new Path("C:/wdata/student.txt"), new Path("/abcd"));
    	}
    	
    	/**
    	 * 下载文件
    	 * @throws IOException
    	 */
    	public static void testGet()   throws IOException{
    		fs.copyToLocalFile(new Path("/abcd/student.txt"), new Path("C:/ss.txt"));
    		System.out.println("is done");
    	}
    	
    }
    

      5、HDFS流式数据访问

    相对那些封装好的方法而言的更底层一些的操作方式 上层那些 mapreduce spark 等运算  框架,去 hdfs 中获取数据的时候,就是调的这种底层的 api

    package com.ghgj.hdfs;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    
    public class TestHDFSStream {
    
    	static Configuration conf = null;
    	static FileSystem fs = null;
    	
    	public static void main(String[] args) throws Exception {
    		init();
    //		getHDFSFileByStream();
    		putFileToHDFS();
    		close();
    	}
    	
    	/**
    	 * 通过流的方式,去HDFS上下载一个文件下来
    	 * 实现思路:
    	 * 1、利用一个输入流读取该文件的数据
    	 * FSI = fs.open(path);
    	 * 2、......
    	 * 3、获取文件输出流,把输入流的数据给存到本地
    	 */
    	public static void getHDFSFileByStream() throws Exception{
    		
    		// 获取数据输入流
    		Path path = new Path("hdfs://hadoop02:9000/hadoop-2.6.4-centos-6.5.tar.gz");
    		FSDataInputStream open = fs.open(path);
    		
    		// 获取文件输出流
    		FileOutputStream fos = new FileOutputStream(new File("c:/myhadoop.txt"));
    		
    		// 作用:让输入流和输出流对接,进行数据的传输(复制)
    //		IOUtils.copyBytes(open, fos, conf);
    		IOUtils.copyBytes(open, fos, conf.getInt("io.file.buffer.size", 4096), true);
    		
    	}
    	
    	/**
    	 * 实现思路:
    	 * 1、获取一个本地文件的输入流
    	 * 2、、、 
    	 * 3、在hdfs的客户端创建一个输出流,把输入流上的数据复制到hdfs文件系统里面去
    	 */
    	public static void putFileToHDFS() throws Exception{
    		
    		// 获取一个本地文件的输入流
    		FileInputStream fin = new FileInputStream(new File("c:/myhadoop.txt"));
    		
    		// 准备一个hdfs的输出流
    		Path path = new Path("/myhadoop.tar.gz");
    		
    		FSDataOutputStream create = fs.create(path);
    		
    		IOUtils.copyBytes(fin, create, conf);
    	}
    	
    	public static void init() throws Exception{
    		conf = new Configuration();
    		conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
    		System.setProperty("HADOOP_USER_NAME","hadoop");
    
    		fs = FileSystem.get(conf);
    	}
    	
    	public static void close()  throws Exception{
    		fs.close();
    	}
    }
    

        经典案例:

           在 mapreduce 、 spark 等运算框架中,有一个核心思想就是将运算移往数据,或者说,就是 要在并发计算中尽可能让运算本地化,这就需要获取数据所在位置的信息并进行相应范围读 取。 以下模拟实现:获取一个文件的所有 block 位置信息,然后读取指定 block 中的内容

    @Test
    public void testCat() throws IllegalArgumentException, IOException {
    FSDataInputStream in = fs.open(new Path("/weblog/input/access.log.10"));
    // 拿到文件信息
    FileStatus[] listStatus = fs.listStatus(new Path(
    "/weblog/input/access.log.10"));
    // 获取这个文件的所有 block 的信息
    BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(
    listStatus[0], 0L, listStatus[0].getLen());
    // 第一个 block 的长度
    long length = fileBlockLocations[0].getLength();
    // 第一个 block 的起始偏移量
    long offset = fileBlockLocations[0].getOffset();
    System.out.println(length);
    System.out.println(offset);
    // 获取第一个 block 写入输出流
    // IOUtils.copyBytes(in, System.out, (int)length);
    byte[] b = new byte[4096];
    FileOutputStream os = new FileOutputStream(new File("d:/block0"));
    while (in.read(offset, b, 0, 4096) != -1) {
    os.write(b);
    offset += 4096;
    if (offset > length)
    return;
    }
    os.flush();
    os.close();
    in.close();
    }
    

      案例:

    package com.ghgj.hdfs;
    
    import java.io.File;
    import java.io.FileOutputStream;
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    
    public class TestRandomStream {
    
    	public static void main(String[] args) throws Exception {
    		
    //		testSeek();
    		getSecondBlock();
    	}
    	
    	/**
    	 * 通过HDFS api 取到对应数据对应数据块的数据
    	 * 作用:
    	 * @throws Exception
    	 */
    	public static void getSecondBlock()  throws Exception{
    		Configuration conf = new Configuration();
    		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop02:9000"), conf, "hadoop");
    		
    		FSDataInputStream open = fs.open(new Path("/hadoop-2.6.4-centos-6.5.tar.gz"));
    		
    		// 134217728
    		// 46595337
    		
    		long offset = 0;
    		long length = 0;
    		
    		FileStatus[] listStatus = fs.listStatus(new Path("/hadoop-2.6.4-centos-6.5.tar.gz"));
    		length = listStatus[0].getLen();
    		
    		BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(listStatus[0], 0, length);
    		// 想要获取到hadoop安装包第二块数据的数据块的起始偏移量
    		offset = fileBlockLocations[1].getOffset();
    		// 设置起始偏移量
    		open.seek(offset);
    		
    		long seekLength = fileBlockLocations[1].getLength();
    		System.out.println(offset+"	"+seekLength);
    		
    		// 构建文件输出流
    		FileOutputStream fout = new FileOutputStream(new File("c:/secondHadoop.tar.gz"));
    		
    		// 把输入流数据,复制到输出流
    		IOUtils.copyBytes(open, fout, seekLength, true);
    		fs.close();
    	}
    	
    	/**
    	 * 从随机位置offset读取任意长度length的数据
    	 * @throws Exception
    	 */
    	public static void testSeek()  throws Exception{
    		Configuration conf = new Configuration();
    //		System.setProperty("HADOOP_USER_NAME", "hadoop");
    		FileSystem fs = FileSystem.get(new URI("hdfs://hadoop02:9000"), conf, "hadoop");
    		
    		FSDataInputStream open = fs.open(new Path("hdfs://hadoop02:9000/shuzi.txt"));
    		
    		long offset = 12;
    		long length = 23L;
    		
    		// 设置起始偏移量
    		open.seek(offset);
    		
    		FileOutputStream fout = new FileOutputStream(new File("c:/myshuzi.txt"));
    		
    		// 注意第三个参数:它是long类型。表示取数据的长度
    		IOUtils.copyBytes(open, fout, length, true);
    		
    		fs.close();
    	}
    	
    	
    }
    

      



         

     

  • 相关阅读:
    linux下使用g++编译cpp工程
    c++字符串互相转换
    MFC双缓冲绘图实例
    Python进阶之迭代器和生成器
    <大话设计模式>笔记
    配置程序成为Linux服务
    Django的设计模式
    Django ModelForm修改默认的控件属性
    Android活动生命周期
    MySQL必知必会笔记
  • 原文地址:https://www.cnblogs.com/liuwei6/p/6675154.html
Copyright © 2011-2022 走看看