zoukankan      html  css  js  c++  java
  • HDFS详解

    HDFS基本概念

    1、HDFS设计思想

    分而治之:将大文件、大批量文件,分布式存放在大量服务器上,以便于采取分而治之的方式对海量数据进行运算分析

    2、概念和特性

    概念:HDFS是一个分布式文件系统

    特性:

    (1)HDFS中的文件在物理上是分块存储(block,块的大小可以通过配置参数( dfs.blocksize)来规定,默认大小在hadoop2.x版本中是128M,老版本中是64M

    (2)HDFS文件系统会给客户端提供一个统一的抽象目录树,客户端通过路径来访问文件,形如:hdfs://namenode:port/dir-a/dir-b/dir-c/file.data

    (3)目录结构及文件分块信息(元数据)的管理由namenode节点承担

    ——namenodeHDFS集群主节点,负责维护整个hdfs文件系统的目录树,以及每一个路径(文件)所对应的block块信息(blockid,及所在的datanode服务器)

    (4)文件的各个block的存储管理由datanode节点承担

    ---- datanodeHDFS集群从节点,每一个block都可以在多个datanode上存储多个副本(副本数量也可以通过参数设置dfs.replication

    (5)HDFS是设计成适应一次写入,多次读出的场景,且不支持文件的修改

    (注:适合用来做数据分析,并不适合用来做网盘应用,因为,不便修改,延迟大,网络开销大,成本太高)


    HDFS基本操作(shell操作)

    1、HDFS命令行客户端使用

    2、命令行客户端支持的命令参数

    3、常用命令参数介绍

    -help             

    功能:输出这个命令参数手册

    hadoop fs -help

    -ls                  

    功能:显示目录信息

    示例: hadoop fs -ls hdfs://hadoop1:9000/

    备注:这些参数中,所有的hdfs路径都可以简写

    -->hadoop fs -ls /   等同于上一条命令的效果hadoop fs -ls -R / 会列出所有嵌套文件 

    -mkdir              

    功能:在hdfs上创建目录

    示例:hadoop fs  -mkdir  -p  /aaa/bbb/cc/dd

    -moveFromLocal            

    功能:从本地剪切粘贴到hdfs

    示例:hadoop  fs  - moveFromLocal  /home/hadoop/a.txt  /aaa/bbb/cc/dd

    -moveToLocal              

    功能:从hdfs剪切粘贴到本地

    示例:hadoop  fs  - moveToLocal   /aaa/bbb/cc/dd  /home/hadoop/a.txt 

    --appendToFile  

    功能:追加一个文件到已经存在的文件末尾

    示例:hadoop  fs  -appendToFile  ./hello.txt  hdfs://hadoop-server01:9000/hello.txt

    可以简写为:

    hadoop  fs  -appendToFile  ./hello.txt  /hello.txt

    -cat  

    功能:显示文件内容  

    示例:hadoop fs -cat  /hello.txt

    -tail                 

    功能:显示一个文件的末尾

    示例:hadoop  fs  -tail  /weblog/access_log.1

    -text                  

    功能:以字符形式打印一个文件的内容

    示例:hadoop  fs  -text  /weblog/access_log.1

    -chgrp

    -chmod

    -chown

    功能:linux文件系统中的用法一样,对文件所属权限

    示例:

    hadoop  fs  -chmod  666  /hello.txt

    hadoop  fs  -chown  someuser:somegrp   /hello.txt

    -copyFromLocal    

    功能:从本地文件系统中拷贝文件到hdfs路径去

    示例:hadoop  fs  -copyFromLocal  ./jdk.tar.gz  /aaa/

    -copyToLocal      

    功能:从hdfs拷贝到本地

    示例:hadoop fs -copyToLocal /aaa/jdk.tar.gz   /

    -cp              

    功能:从hdfs的一个路径拷贝hdfs的另一个路径

    示例: hadoop  fs  -cp  /aaa/jdk.tar.gz  /bbb/jdk.tar.gz.2

    -mv                     

    功能:在hdfs目录中移动文件

    示例: hadoop  fs  -mv  /aaa/jdk.tar.gz  /

    -get              

    功能:等同于copyToLocal,就是从hdfs下载文件到本地

    示例:hadoop fs  -get  /aaa/jdk.tar.gz   /

    -getmerge             

    功能:合并下载多个文件

    示例:比如hdfs的目录 /aaa/下有多个文件:log.1, log.2,log.3,...

    hadoop fs -getmerge  /aaa/log.*   ./log.sum

    -put                

    功能:等同于copyFromLocal

    示例:hadoop  fs  -put  /aaa/jdk.tar.gz  /bbb/jdk.tar.gz.2

    -rm                

    功能:删除文件或文件夹

    示例:hadoop fs -rm -r /aaa/bbb/

    -rmdir                 

    功能:删除空目录

    示例:hadoop  fs  -rmdir   /aaa/bbb/ccc

    -df               

    功能:统计文件系统的可用空间信息

    示例:hadoop  fs  -df  -h  /

    -du

    功能:统计文件夹的大小信息

    示例:

    hadoop  fs  -du  -s  -h /aaa/*

    -count         

    功能:统计一个指定目录下的文件节点数量

    示例:hadoop  fs  -count  /aaa/

    -setrep                

    功能:设置hdfs中文件的副本数量

    示例:hadoop fs -setrep 3 /aaa/jdk.tar.gz

    <这里设置的副本数只是记录在namenode的元数据中,是否真的会有这么多副本,还得看datanode的数量>


    HDFS原理

    1、HDFS的工作机制

    1.1概述

    1. HDFS集群分为两大角色:NameNodeDataNode
    2. NameNode负责管理整个文件系统的元数据
    3. DataNode 负责管理用户的文件数据块
    4. 文件会按照固定的大小(blocksize)切成若干块后分布式存储在若干台datanode
    5. 每一个文件块可以有多个副本,并存放在不同的datanode
    6. Datanode会定期向Namenode汇报自身所保存的文件block信息,而namenode则会负责保持文件的副本数量
    7. HDFS的内部工作机制对客户端保持透明,客户端请求访问HDFS都是通过向namenode申请来进行

    1.2HDFS写数据流程

    1.2.1概述

    客户端要向HDFS写数据,首先要跟namenode通信以确认可以写文件并获得接收文件blockdatanode,然后,客户端按顺序将文件逐个block传递给相应datanode,并由接收到blockdatanode负责向其他datanode复制block的副本

    1.2.2详细步骤图(上传文件)

    1.2.3详细步骤解析

    1、client跟namenode通信,请求上传文件,namenode检查目录树中目标文件是否已存在?父目录是否存在?

    2namenode返回是否可以上传

    3client请求第一个block该传输到哪些datanode服务器上

    4namenode查询DataNode信息,然后返回3个可用的datanode服务器ABC给client

    5client请求3台DataNode中的一台A上传数据(本质上是一个RPC调用,建立pipeline),A收到请求会继续调用B,然后B调用C,将整个pipeline建立完成,逐级返回客户端

    6client开始往A上传第一个block(先从磁盘读取数据放到一个本地内存缓存),以packet为单位,A收到一个packet就会传给BB传给CA每传一个packet会放入一个应答队列等待应答

    7、当一个block传输完成之后,client再次请求namenode上传第二个block的服务器。

    1.3HDFS读数据流程

    1.3.1概述

    客户端将要读取的文件路径发送给namenodenamenode获取文件的元信息(主要是block的存放位置信息)返回给客户端,客户端根据返回的信息找到相应datanode逐个获取文件的block并在客户端本地进行数据追加合并从而获得整个文件

    1.3.2详细步骤图(下载文件)

    1.3.3详细步骤解析

    1、client跟namenode通信请求读取文件,namenode查询元数据,找到文件块所在的datanode服务器,然后将每个文件块所在的DataNode服务器返回给client

    2、挑选一台datanode(就近原则,然后随机)服务器,请求建立socket

    3datanode开始发送数据(从磁盘里面读取数据放入流,以packet为单位来做校验)

    4、客户端以packet为单位接收,先在本地缓存,然后写入目标文件

    2、NameNode工作机制

    2.1namenode职责

      负责客户端青请求的响应

      元数据的管理(查询、修改)

    2.2元数据管理

    namenode对数据的管理采用了三种存储形式:

      内存元数据(NameSystem)

      磁盘元数据镜像文件

      数据操作日志文件(可通过日志运算出元数据)

    2.2.1元数据存储机制

    A、内存中有一份完整的元数据(内存meta data)

    B、磁盘有一个“准完整”的元数据镜像(fsimage)文件(namenode的工作目录中)

    C、用于衔接内存metadata和持久化元数据镜像fsimage之间的操作日志(edits文件

    注:当客户端对hdfs中的文件进行新增或者修改操作,操作记录首先被记入edits日志文件中,当客户端操作成功后,相应的元数据会更新到内存meta.data

    2.2.2元数据手动查看

    可以通过hdfs的一个工具来查看edits中的信息

      bin/hdfs oev -i edits -o edits.xml

      bin/hdfs oiv -i fsimage_0000000000000000087 -p XML -o fsimage.xml

    2.2.3元数据的checkpoint

    每隔一段时间,会由secondary namenodenamenode上积累的所有edits和一个最新的fsimage下载到本地,并加载到内存进行merge(这个过程称为checkpoint

    checkpoint的详细过程

    checkpoint操作的触发条件配置参数

    dfs.namenode.checkpoint.check.period=60  #检查触发条件是否满足的频率,60

    dfs.namenode.checkpoint.dir=file://${hadoop.tmp.dir}/dfs/namesecondary

    #以上两个参数做checkpoint操作时,secondary namenode的本地工作目录

    dfs.namenode.checkpoint.edits.dir=${dfs.namenode.checkpoint.dir}

    dfs.namenode.checkpoint.max-retries=3  #最大重试次数

    dfs.namenode.checkpoint.period=3600  #两次checkpoint之间的时间间隔3600秒

    dfs.namenode.checkpoint.txns=1000000 #两次checkpoint之间最大的操作记录

    checkpoint的附带作用

    namenodesecondary namenode的工作目录存储结构完全相同,所以,当namenode故障退出需要重新恢复时,可以从secondary namenode的工作目录中将fsimage拷贝到namenode的工作目录,以恢复namenode的元数据

    3、DataNode工作机制

    3.1 概述

    1Datanode工作职责:

    存储管理用户的文件块数据

    定期向namenode汇报自身所持有的block信息(通过心跳信息上报)

    (这点很重要,因为,当集群中发生某些block副本失效时,集群如何恢复block初始副本数量的问题

    <property>

    <name>dfs.blockreport.intervalMsec</name>

    <value>3600000</value>

    <description>Determines block reporting interval in milliseconds.</description>

    </property>

    2Datanode掉线判断时限参数

    datanode进程死亡或者网络故障造成datanode无法与namenode通信,namenode不会立即把该节点判定为死亡,要经过一段时间,这段时间暂称作超时时长。HDFS默认的超时时长为10分钟+30秒。如果定义超时时间为timeout,则超时时长的计算公式为:

    timeout  = 2 * heartbeat.recheck.interval + 10 * dfs.heartbeat.interval

    而默认的heartbeat.recheck.interval 大小为5分钟,dfs.heartbeat.interval默认为3秒。

    需要注意的是hdfs-site.xml 配置文件中的heartbeat.recheck.interval的单位为毫秒,dfs.heartbeat.interval的单位为秒。所以,举个例子,如果heartbeat.recheck.interval设置为5000(毫秒),dfs.heartbeat.interval设置为3(秒,默认),则总的超时时间为40秒。

    <property>

            <name>heartbeat.recheck.interval</name>

            <value>2000</value>

    </property>

    <property>

            <name>dfs.heartbeat.interval</name>

            <value>1</value>

    </property>

    3.2 观察验证DATANODE功能

    上传一个文件,观察文件的block具体的物理存放情况:

    在每一台datanode机器上的这个目录中能找到文件的切块:

    /usr/local/hadoop/tmp/dfs/data/current/BP-193442119-192.168.2.120-1432457733977/current/finalized


    HDFS应用开发(Java操作)

    1、搭建开发环境

    1.1引入依赖

      手动引入jar包,hdfs的jar包位于hadoop安装目录的share文件夹下。

      创建一个hdfsjar用户类库,然后将common和hdfs两个文件夹中的jar包全部添加进去。

    1.2Windows下开发的说明

      建议在Linux下进行hadoop应用的开发,这样不会存在兼容性问题。

      如果在Windows上做客户端应用开发,需要设置一下环境:

          A、windows的某个目录下解压一个hadoop的安装  d:hadoop-2.6.4

          B、将安装包下的libbin目录用对应windows版本平台编译的本地库替换

          C、window系统中配置HADOOP_HOME指向你解压的安装包

                HADOOP_HOME=d:hadoop-2.6.4

          D、windows系统的path变量中加入hadoopbin目录

                PATH=d:hadoop-2.6.4in

    2、获取API中的客户端对象

    java中操作hdfs,首先要获得一个客户端实例

    Configuration conf = new Configuration()

    FileSystem fs = FileSystem.get(conf)

    而我们的操作目标是HDFS,所以获取到的fs对象应该是DistributedFileSystem的实例;

    get方法是从何处判断具体实例化那种客户端类呢?

    ——从conf中的一个参数 fs.defaultFS的配置值判断;

    如果我们的代码中没有指定fs.defaultFS,并且工程classpath下也没有给定相应的配置,conf中的默认值就来自于hadoopjar包中的core-default.xml,默认值为: file:///,则获取的将不是一个DistributedFileSystem的实例,而是一个本地文件系统的客户端对象

    3、DistributedFileSystem实例对象所具备的方法

    4、hdfs客户端操作数据代码

    4.1文件的增删改查

      1 package com.ahu.bigdata.javaclient;
      2 
      3 import java.net.URI;
      4 import org.apache.hadoop.conf.Configuration;
      5 import org.apache.hadoop.fs.BlockLocation;
      6 import org.apache.hadoop.fs.FileStatus;
      7 import org.apache.hadoop.fs.FileSystem;
      8 import org.apache.hadoop.fs.LocatedFileStatus;
      9 import org.apache.hadoop.fs.Path;
     10 import org.apache.hadoop.fs.RemoteIterator;
     11 
     12 /**
     13  * 使用hdfsJava客户端实现文件的增删改查
     14  * 
     15  * @author ahu_lichang
     16  * 
     17  */
     18 public class HdfsClientDemo {
     19     static FileSystem fs = null;
     20 
     21     public static void init() throws Exception {
     22         // 构造一个配置参数对象,设置一个参数:我们要访问的hdfs的URI
     23         // 从而FileSystem.get()方法就知道应该是去构造一个访问hdfs文件系统的客户端,以及hdfs的访问地址
     24         // new Configuration();的时候,它就会去加载jar包中的hdfs-default.xml
     25         // 然后再加载classpath下的hdfs-site.xml
     26         Configuration configuration = new Configuration();
     27         // configuration.set("fs.defaultFS", "hdfs://hadoop1:9000");
     28         /*
     29          * 参数优先级:1、客户端代码中设置的值 2、classpath下的用户自定义配置文件 3、服务器的默认设置
     30          */
     31         // configuration.set("dfs.replication", "3");
     32         // 获取一个hdfs的访问客户端,根据参数,这个实例应该是DistributedFileSystem的实例
     33         // fs=FileSystem.get(configuration);
     34         // 如果这样去获取,那configuration里面就可以不要配"fs.defaultFS"参数,而且,这个客户端的身份标识已经是root用户
     35         fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration,
     36                 "root");
     37     }
     38 
     39     /**
     40      * 向hdfs上传文件
     41      * 
     42      * @throws Exception
     43      */
     44     public static void testAddFileToHdfs() throws Exception {
     45         // 本地路径
     46         Path src = new Path("E:/access.log");
     47         // 目标路径
     48         Path dst = new Path("/access.log.copy");
     49         fs.copyFromLocalFile(src, dst);
     50         fs.close();
     51     }
     52 
     53     /**
     54      * 从hdfs中复制文件到本地文件系统
     55      * 
     56      * @throws Exception
     57      */
     58     public static void testDownloadFileToLocal() throws Exception {
     59         fs.copyToLocalFile(new Path("/access.log.copy"), new Path("E:/"));
     60         fs.close();
     61     }
     62 
     63     /**
     64      * 在hdfs中创建目录、删除文件夹、重命名文件或文件夹
     65      * 
     66      * @throws Exception
     67      */
     68     public static void testMkdirAndDeleteAndRename() throws Exception {
     69         // 创建目录
     70         fs.mkdirs(new Path("/a1/b1/c1"));
     71         // 删除文件夹,如果是非空文件夹,参数2必须给值true
     72         fs.delete(new Path("/access.log.copy"), true);
     73         // 重命名文件或文件夹
     74         fs.rename(new Path("/a1"), new Path("/a2"));
     75     }
     76 
     77     /**
     78      * 查看目录信息,只显示文件
     79      * 
     80      * @throws Exception
     81      */
     82     public static void testListFiles() throws Exception {
     83         RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(
     84                 new Path("/"), true);
     85         while (listFiles.hasNext()) {
     86             LocatedFileStatus fileStatus = listFiles.next();
     87             System.out.println(fileStatus.getPath().getName());// job
     88             System.out.println(fileStatus.getBlockSize());// 块大小(都一样大)
     89             System.out.println(fileStatus.getPermission());// 权限
     90             System.out.println(fileStatus.getLen());// 块大小
     91             BlockLocation[] blockLocations = fileStatus.getBlockLocations();
     92             for (BlockLocation blockLocation : blockLocations) {
     93                 System.out.println("block-length:" + blockLocation.getLength()
     94                         + "---" + "block-offset" + blockLocation.getOffset());// 块长度、块的起始偏移量
     95                 String[] hosts = blockLocation.getHosts();
     96                 for (String host : hosts) {// 块分布在哪些主机上
     97                     System.out.println(host);
     98                 }
     99             }
    100             System.out
    101                     .println("----------------------------------------------------------");
    102         }
    103     }
    104 
    105     /**
    106      * 查看文件及文件夹信息(跟hadoop fs -ls /查询的效果差不多)
    107      * 
    108      * @throws Exception
    109      */
    110     public static void testListAll() throws Exception {
    111         FileStatus[] listStatus = fs.listStatus(new Path("/"));
    112         String flag = "d--         ";
    113         for (FileStatus fileStatus : listStatus) {
    114             if (fileStatus.isFile())
    115                 flag = "f--         ";
    116             System.out.println(flag + fileStatus.getPath().getName());
    117         }
    118     }
    119 
    120     public static void main(String[] args) throws Exception {
    121         init();
    122         // testAddFileToHdfs();
    123         // testDownloadFileToLocal();
    124         // testMkdirAndDeleteAndRename();
    125         // testListFiles();
    126         testListAll();
    127     }
    128 }

    4.2通过流的方式访问hdfs和场景编程

      1 package com.ahu.bigdata.javaclient;
      2 
      3 import java.io.File;
      4 import java.io.FileOutputStream;
      5 import java.io.IOException;
      6 import java.net.URI;
      7 import org.apache.hadoop.conf.Configuration;
      8 import org.apache.hadoop.fs.BlockLocation;
      9 import org.apache.hadoop.fs.FSDataInputStream;
     10 import org.apache.hadoop.fs.FileStatus;
     11 import org.apache.hadoop.fs.FileSystem;
     12 import org.apache.hadoop.fs.Path;
     13 import org.apache.hadoop.io.IOUtils;
     14 
     15 /**
     16  * 通过流的方式访问hdfs
     17  * 
     18  * 相对于那些封装好的方法而言的,是更底层的一些操作方式
     19  * 
     20  * 上层的那些mapreduce、spark等运算框架,去hdfs中获取数据的时候,就是调用这种底层的api
     21  * 
     22  * @author ahu_lichang
     23  * 
     24  */
     25 public class StreamAccess {
     26     static FileSystem fs = null;
     27 
     28     public static void init() throws Exception {
     29         Configuration configuration = new Configuration();
     30         fs = FileSystem.get(new URI("hdfs://hadoop1:9000"), configuration,
     31                 "root");
     32     }
     33 
     34     /**
     35      * 将hdfs中的文件下载到本地
     36      * 
     37      * @throws Exception
     38      */
     39     public static void testDownloadFileToLocal() throws Exception {
     40         // 先获取一个文件的输入流---针对hdfs
     41         FSDataInputStream inputStream = fs.open(new Path(
     42                 "/wordcount/output/part-r-00000"));
     43         // 再构造一个文件的输出流---针对本地
     44         FileOutputStream outputStream = new FileOutputStream(new File(
     45                 "E:/part-r-00000"));
     46         // 将输入流中数据传输到输出流中
     47         IOUtils.copyBytes(inputStream, outputStream, 4096);// 4096缓冲区大小
     48     }
     49 
     50     /**
     51      * hdfs支持随机定位进行文件读取,而且可以方便地读取指定长度
     52      * 
     53      * 用于上层分布式运算框架并发处理数据
     54      * 
     55      * @throws Exception
     56      */
     57     public static void testRandomAccess() throws Exception {
     58         // 先获取一个文件的输入流---针对hdfs
     59         FSDataInputStream inputStream = fs.open(new Path(
     60                 "/wordcount/output/part-r-00000"));
     61         // 可以将输入流的起始偏移量进行自定义
     62         inputStream.seek(200);
     63         // 再构造一个文件的输出流---针对本地
     64         FileOutputStream outputStream = new FileOutputStream(new File(
     65                 "E:/part-r-custom"));
     66         IOUtils.copyBytes(inputStream, outputStream, 4096L, true);// true表示传输完毕关闭流
     67     }
     68 
     69     /**
     70      * 显示hdfs文件上的内容
     71      * 
     72      * @throws Exception
     73      */
     74     public static void testCat() throws Exception {
     75         FSDataInputStream inputStream = fs.open(new Path(
     76                 "/wordcount/output/part-r-00000"));
     77         IOUtils.copyBytes(inputStream, System.out, 1024);
     78     }
     79 
     80     /**
     81      * 场景编程:获取一个文件的所有block位置信息,然后读取指定block中的内容
     82      * 
     83      * @throws IOException
     84      * @throws IllegalArgumentException
     85      */
     86     public static void testBlockCat() throws Exception {
     87         FSDataInputStream inputStream = fs.open(new Path(
     88                 "/wordcount/output/part-r-00000"));
     89         // 拿到文件信息
     90         FileStatus[] listStatus = fs.listStatus(new Path(
     91                 "/wordcount/output/part-r-00000"));
     92         // 获取这个文件的所有block的信息
     93         BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(
     94                 listStatus[0], 0L, listStatus[0].getLen());
     95         // 第一个block的长度
     96         long length = fileBlockLocations[0].getLength();
     97         // 第一个block的起始偏移量
     98         long offset = fileBlockLocations[0].getOffset();
     99         System.out.println(length);
    100         System.out.println(offset);
    101         // 获取第一个block写入输出流
    102         byte[] b = new byte[4096];
    103         FileOutputStream fos = new FileOutputStream(new File("E:/block0"));
    104         while (inputStream.read(offset, b, 0, 4096) != -1) {
    105             fos.write(b);
    106             offset += 4096;
    107             if (offset >= length)
    108                 return;
    109         }
    110         fos.flush();
    111         fos.close();
    112         inputStream.close();
    113     }
    114 
    115     public static void main(String[] args) throws Exception {
    116         init();
    117         // testDownloadFileToLocal();
    118         // testRandomAccess();
    119         // testCat();
    120         testBlockCat();
    121     }
    122 
    123 }
  • 相关阅读:
    PHP面试:实现动态获取函数参数的方法
    PHP面试:什么是类的多态性,请写出一个例子
    php相关操作
    客户端app支付宝登录接口
    商品分类设计
    Git连接远程服务器
    iptables/mysql设置指定主机访问指定端口
    CMake安装grpc生成gRPCTargets.cmake文件
    Linux下Springboot解决`APR based Apache Tomcat Native library`提示
    java双重检测或枚举类实现线程安全单例(懒汉模式)
  • 原文地址:https://www.cnblogs.com/ahu-lichang/p/6771528.html
Copyright © 2011-2022 走看看