zoukankan      html  css  js  c++  java
  • HDFS(Hadoop Distribute File System)

    一、基本概念

         一句话概括:HDFS是hadoop分布式文件系统,作用是存储大数据文件,是hadoop领域最基础的部分。

    二、HDFS的重要特性

        一群屌丝机组成高富帅

      1、主从架构

        namenode作为master负责管理元数据,datanode作为从节点存储block块数据

        主从:通常是一主多从,主干活,从也干活,负责的分工不同

        主备:通常是一主一备,主要解决的是单节点故障问题,主干活,从为standby

      2、分块存储

        大文件的存储问题:写要能写的而下,读的时候还要读的块,将文件分成block块分布式存储,理论上多大的文件都可以存储,分块后再读取的时候可以并行读取,速度会非常的块。一般hadoop2.x版本的默认是128M

      3、名称空间

        名称空间是指namenode对外有统一的文件路径和命名,形如:hdfs://node01:9000/hadoop32/a.txt,要上传的文件名和要上传的位置,要下载的文件名和下载的文件路径,统一规范(简单理解就是我们通过50070能够看到的文件结构)。

      4、namenode元数据

        namenode负责元数据的管理,(元数据通常可以理解为修饰数据的数据),hdfs中元数据主要指目录结构和文件的块信息

        hdfs的存储规则是尽量不存储小文件,因为一条元数据理论 150byte,例如10W个1Kb的小文件存储的硬盘需要100M,而存储元数据的内存就需要15M。

      5、datanode数据存储

        datanode负责存储block块数据,负责真实数据的存储,启动后需要向namenode进行心跳,并且需要汇报本地块信息。

        datanode主要是存储,也就是需要磁盘,所以多台廉价的屌丝机就可以组成一个强大的HDFS存储集群

      6、副本机制

        副本就是备份,考虑的是容灾的问题,默认每个block有3个副本,他们为相互为对方的副本。

        机架感知(3副本机制)

        

      7、一次写入多次读取

      8、限额操作

    数量限额
    hdfs dfsadmin -setQuota 2 限定的文件的路径      # 给该文件夹下面设置最多上传两个文件,上传文件,发现只能上传一个文件
    清除权限 -clrQuota
    
    空间限额
    假如现在上传的一个文件的备份数是2 现在的空间不足256
    假如上传的是129M的文件  那么至少是需要剩余空间512才可以上传文件到hdfs
    
    
    查看hdfs文件xiane
    hdfs dfsadmin -setQuota 2 限定的文件的路径      # 给该文件夹下面设置最多上传两个文件,上传文件,发现只能上传一个文件

    三、hdfs常用shell命令介绍

    -ls
    使用方法:hadoop fs -ls [-h] [-R] <args>
    功能:显示文件、目录信息。
    示例:hadoop fs -ls /user/hadoop/file1
    
    -mkdir
    使用方法:hadoop fs -mkdir [-p] <paths>
    功能:在hdfs上创建目录,-p表示会创建路径中的各级父目录。
    示例:hadoop fs -mkdir –p /user/hadoop/dir1
    
    -put
    使用方法:hadoop fs -put [-f] [-p] [ -|<localsrc1> .. ]. <dst> 
             hadoop fs -put linux中文件的路径  hdfs上的路径
    功能:将单个src或多个srcs从本地文件系统复制到目标文件系统。
    -p:保留访问和修改时间,所有权和权限。
    -f:覆盖目的地(如果已经存在)
    示例:hadoop fs -put -f localfile1 localfile2 /user/hadoop/hadoopdir
    
    -get
    使用方法:hadoop fs -get [-ignorecrc] [-crc] [-p] [-f] <src> <localdst>
             hadoop fs -get hdfs中文件的路径  本地文件的路径
             hadoop fs -get /hadoop32/word.txt  /export
    -ignorecrc:跳过对下载文件的CRC检查。
    -crc:为下载的文件写CRC校验和。
    功能:将文件复制到本地文件系统(linux上)。
    示例:hadoop fs -get hdfs://host:port/user/hadoop/file localfile
    
    -appendToFile 
    使用方法:hadoop fs -appendToFile <localsrc> ... <dst>
    功能:追加一个文件到已经存在的文件末尾(将文件的内容加载到hdfs中已经存在的文件中)
    示例:hadoop fs -appendToFile localfile  /hadoop/hadoopfile
     
    -chown
    功能:改变文件的拥有者。使用-R将使改变在目录结构下递归进行。
    示例:hadoop  fs  -chown  someuser:somegrp   /hadoop/hadoopfile
    
    -cp              
    功能:从hdfs的一个路径拷贝hdfs的另一个路径
    示例: hadoop  fs  -cp  /aaa/jdk.tar.gz  /bbb/jdk.tar.gz.2
    
    -mv                     
    功能:在hdfs目录中移动文件
    示例: hadoop  fs  -mv  /aaa/jdk.tar.gz  /
    
    -rm                
    功能:删除指定的文件。只删除非空目录和文件。-r 递归删除。
    示例:hadoop fs -rm -r /aaa/bbb/
    
    -df               
    功能:统计文件系统的可用空间信息
    示例:hadoop  fs  -df  -h  /
    
    -du 
    功能:显示目录中所有文件大小,当只指定一个文件时,显示此文件的大小。
    示例:hadoop fs -du /user/hadoop/dir1
    

    四、HDFS的基本原理

    1、NameNode概述

    a、	NameNode是HDFS的核心。
    b、	NameNode也称为Master。
    c、	NameNode仅存储HDFS的元数据:文件系统中所有文件的目录树,并跟踪整个集群中的文件。
    d、	NameNode不存储实际数据或数据集。数据本身实际存储在DataNodes中。
    e、	NameNode知道HDFS中任何给定文件的块列表及其位置。使用此信息NameNode知道如何从块中构建文件。
    f、	NameNode并不持久化存储每个文件中各个块所在的DataNode的位置信息,这些信息会在系统启动时从数据节点重建。
    g、	NameNode对于HDFS至关重要,当NameNode关闭时,HDFS / Hadoop集群无法访问。
    h、	NameNode是Hadoop集群中的单点故障。
    i、	NameNode所在机器通常会配置有大量内存(RAM)。
    

    2、DataNode概述

    a、	DataNode负责将实际数据存储在HDFS中。
    b、	DataNode也称为Slave。
    c、	NameNode和DataNode会保持不断通信。
    d、	DataNode启动时,它将自己发布到NameNode并汇报自己负责持有的块列表。
    e、	当某个DataNode关闭时,它不会影响数据或群集的可用性。NameNode将安排由其他DataNode管理的块进行副本复制。
    f、	DataNode所在机器通常配置有大量的硬盘空间。因为实际数据存储在DataNode中。
    g、	DataNode会定期(dfs.heartbeat.interval配置项配置,默认是3秒)向NameNode发送心跳,如果NameNode长时间没有接受到DataNode发送的心跳, NameNode就会认为该DataNode失效。
    h、	block汇报时间间隔取参数dfs.blockreport.intervalMsec,参数未配置的话默认为6小时.
    

    3、HDFS的工作机制

      3.1 写过程

    1、datanode向namenode汇报自己的状态信息和块列表
    2、client找到namenode,向namenode申请上传文件
    3、namenode进行校验是否有权限上传,看是否有同名的文件 如果没加入覆盖则不允许上传
    4、如果通过,则允许上传文件
    5、假如现在的文件是129m 则会对文件进行切割,一个是128 一个是1
    6、进行上传 先上传第一个block块,这时会问namenode传送到哪个datanode块上去 ,
    7、namenode进行机架感知,选出一个datanode地址列表,并返回给客户端,比如node01、node03
    8、这是client就会知道连接哪个datanode。然后建立管道,node01和node03之间也需要建立管道
    9、会把block块再次拆分成64k的包然后上传,最后根据偏移量组装成一个新的文件
    10、备份成功后会返回一个ack确认码
    11、然后继续上传第二个block块

    3.2读过程

    1、申请下载,client向namenode询问 namenode中的namespace保存的是文件的名称和路径以及一些元数据信息
    2、这样就会知道你所要下载的文件到底在那个datanode上面 但是有时候备份很多的话,就会根据网络拓扑图翻回最近的一个block块并且是状态好的
    3、这样就会得到你需要的文件的地址列表,这样就会连接datanode,进行并行的读取,就会很快速的读取,集群做的够大,他的吞吐能力是非常的强的

    五、HDFS的应用操作

    1、HDFS的JAVAAPI操作

      1.1搭建开发环境

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-mr1-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.6.0-cdh5.14.0</version>
        </dependency>	
        <!--这里需要在添加一个junit 可以使用4.12版本-->
    </dependencies>

    1.2stream原始流对接操作--繁琐但灵活

    /**
     * @ClassName StreamAccess
     * @Description stream流对接的方式上传下载数据
     */
    public class StreamAccess {
    
        //创建客户端对象
        FileSystem fs = null;
    
        //创建fs的4种方式
        @Before
        public void createFS() throws IOException, URISyntaxException, InterruptedException {
            Configuration conf = new Configuration();
    
            //第一种方式 如果什么参数都不给,创建的是本地文件系统 file:///  hdfs  50070 9000
            // 9000是用来操作的 50070是用来看的
            //优先级  当前的配置  > 项目的classpath > 用户设置的hadoop/etc/hadoop/core-site.xml > 默认的core-default.xml
            conf.set("fs.defaultFS","hdfs://node01:9000");
            System.setProperty("HADOOP_USER_NAME","root");
            fs = FileSystem.get(conf);
    
            //第二种方式(用的最多)
            fs = FileSystem.get(new URI("hdfs://node01:9000"),conf,"root");
    
            //第三种方式
            conf.set("fs.defaultFS","hdfs://node01:9000");
            fs = FileSystem.newInstance(conf);
    
            //第四种方式
            fs = FileSystem.newInstance(new URI("hdfs://node01:9000"),conf);
        }
    
        //上传
        @Test
        public void streamToHdfs() throws Exception {
            //得到本地的输入流
            FileInputStream inputStream = new FileInputStream(new File("d:/pom.xml"));
            //得到hdfs的输出流 要写文件
            FSDataOutputStream outputStream = fs.create(new Path("/hadoop32/1.txt"));
            //流对接
            IOUtils.copy(inputStream,outputStream);
        }
    
        //下载
        @Test
        public void streamDownLoad() throws Exception {
            //hdfs得到输入流
            FSDataInputStream inputStream = fs.open(new Path("/hadoop32/1.txt"));
            //得到本地文件的输出流
            inputStream.seek(346); // 从哪个位置开始下载
            FileOutputStream outputStream = new FileOutputStream(new File("d:/3.txt"));
    
            IOUtils.copy(inputStream,outputStream);
        }
    
        //获取hdfs上的所有文件及路径  hadoop fs -ls -R /
        @Test
        public void findAllFiles() throws Exception {
            RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path("/"), true);// true为是否进行递归查询
            while (iterator.hasNext()) {
                //fileStatus是对文件或文件夹的所属数据信息
                LocatedFileStatus fileStatus = iterator.next();
                System.out.println(fileStatus.getPath());
            }
        }
    
        //遍历hdfs上所有的文件和文件夹
        @Test
        public void findAll() throws Exception { 
            findFiles("/");
        }
    
        private void findFiles(String path) throws IOException {
            FileStatus[] fileStatuses = fs.listStatus(new Path(path));
            for (FileStatus fileStatus : fileStatuses) {
                //每一个文件或文件夹的所有数据
                if(fs.isDirectory(fileStatus.getPath())){
                    //得到的就是文件夹
                    System.out.println("d----"+fileStatus.getPath());
                    findFiles(fileStatus.getPath().toString());
                }else{
                    //得到的就是文件
                    System.out.println("f----"+fileStatus.getPath());
                }
            }
        }
    
        //获取指定的block块
        @Test
        public void getBlock0() throws Exception {
            FSDataInputStream in = fs.open(new Path("/hadoop32/y.zip"));
            //如果path指定的是一个文件,那么数组返回的大小就是1
            FileStatus[] fileStatuses = fs.listStatus(new Path("/hadoop32/y.zip"));
            FileStatus fileStatus = fileStatuses[0];
            //返回当前文件所有block块的地址列表
            BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
           /* for (BlockLocation fileBlockLocation : fileBlockLocations) {
                //得到的是每个块的信息
                System.out.println( fileBlockLocation.getLength()+" offset :"+  fileBlockLocation.getOffset());
                String[] hosts = fileBlockLocation.getHosts();
                //当前块host
                for (String host : hosts) {
                    System.out.println("host"+host);
                }
            }*/
            BlockLocation fileBlockLocation = fileBlockLocations[1];
            //第二个block块的偏移量
            long offset = fileBlockLocations[1].getOffset();
            //注意长度的问题:第一个块的长度是128M 第二个块的长度是111M 所以就是从128M向后读了111M的数据,也就是从128M读取到239M的位置,这个长度是指239这个数值
            long length = offset+fileBlockLocations[1].getLength();
    
            byte[] b = new byte[4096];
    
            FileOutputStream os = new FileOutputStream(new File("d:/block1"));
            while(in.read(offset, b, 0, 4096)!=-1){
                os.write(b);
                offset += 4096;
                if(offset>length) {
                    return;
                }
            };
        }
    
        @After
        public void close() throws Exception {
            //通常如果是get方式的话不建议关闭,如果是newInstance的话 建议关闭
            fs.close();
        }
    }

    1.3客户端操作--简单易操作

    public class HDFSClient {
    
        FileSystem fs = null;
    
        //初始化hdfs文件系统
        @Before
        public void createFS() throws Exception{
            Configuration conf = new Configuration();
            fs = FileSystem.get(new URI("hdfs://node01:9000"),conf,"root");
            //获取到所有datanode节点信息
            DatanodeInfo[] dataNodeStats = ((DistributedFileSystem) fs).getDataNodeStats();
            for (DatanodeInfo dataNodeStat : dataNodeStats) {
                System.out.println(dataNodeStat.getName());
            }
        }
    
        //文件的上传
        @Test
        public void copyFromLocalFile() throws Exception{
            //文件上传  参数1:是否会删除本地原文件 参数2:是否会覆盖掉hdfs上目标的文件 参数3:本地文件路径  参数4:目标hdfs文件路径
            fs.copyFromLocalFile(true,true,new Path("d:/Maven_Repository.zip"),new Path("/hadoop32"));
        }
    
        //文件的下载
        @Test
        public void copyToLocalFile() throws Exception {
            fs.copyToLocalFile(true,new Path("/hadoop32/Maven_Repository.zip"),new Path("d:/y.zip"));
        }
    
        @Test
        public void testMkdirAndDeleteAndRename() throws IllegalArgumentException, IOException {
            // 创建目录
            fs.mkdirs(new Path("/a1/b1/c1"));
            // 删除文件夹 ,如果是非空文件夹,参数2必须给值true
            fs.delete(new Path("/aaa"), true);
            // 重命名文件或文件夹
            fs.rename(new Path("/a1"), new Path("/a2"));
        }
    }
  • 相关阅读:
    Linux下安装Flume
    [译]MIT6.824_1.1分布式系统介绍——驱动力与挑战
    MySQL-Canal-Kafka数据复制详解
    Linux下搭建Kafka集群
    我在创业公司的云原生之旅
    使用go向es进行数据操作脚本
    kubectl exec 向pod的文件中增加内容
    kubernetes资源导出小脚本
    面试题(四) -- 第一次当面试官
    gitlab备份检查小脚本
  • 原文地址:https://www.cnblogs.com/haojia/p/12386206.html
Copyright © 2011-2022 走看看