zoukankan      html  css  js  c++  java
  • HDFS的认识和理解6

    HDFS:Hadoop分布式文件系统(HDFS)被设计成适合运行在通用硬件(commodity hardware)上的分布式文件系统。它和现有的分布式文件系统有很多共同点。但同时,它和其他的分布式文件系统的区别也是很明显的。HDFS是一个高度容错性的系统,适合部署在廉价的机器上。HDFS能提供高吞吐量的数据访问,非常适合大规模数据集上的应用。HDFS放宽了一部分POSIX约束,来实现流式读取文件系统数据的目的。HDFS在最开始是作为Apache Nutch搜索引擎项目的基础架构而开发的。HDFS是Apache Hadoop Core项目的一部分。

    HDFS是一个的主从结构,一个HDFS集群是由一个名字节点,它是一个管理文件命名空间和调节客户端访问文件的主服务器,当然还有一些数据节点,通常是一个节点一个机器,它来管理对应节点的存储。HDFS对外开放文件命名空间并允许用户数据以文件形式存储。 内部机制是将一个文件分割成一个或多个块,这些块被存储在一组数据节点中。名字节点用来操作文件命名空间的文件或目录操作,如打开,关闭,重命名等等。它同时确定块与数据节点的映射。数据节点来负责来自文件系统客户的读写请求。数据节点同时还要执行块的创建,删除,和来自名字节点的块复制指令。

    1、获取文件系统 2、通过文件系统打开文件 3、将文件内容输出

    public static void read(Path path) throws IOException{
        FileSystem hdfs = HdfsUtils.getFilesystem();  //步骤 1
        FSDataInputStream fsDataInputStream =  hdfs.open(path); //步骤 2
        IOUtils.copyBytes(fsDataInputStream, System.out, 4096,false);  //步骤 3
    }

    获取文件系统对象

    要从HDFS上读取文件,必须先得到一个FileSystem。HDFS本身就是一个文件系统,所以,我们得到一个文件系统后就可以对HDFS进行相关操作。获取文件系统的步骤可以分为以下2步。 1、读取配置文件。 2、获取文件系统。 读取配置文件:Configuration类有三个构造器,无参数的构造器表示直接加载默认资源,也可以指定一个boolean参数来关闭加载默认值,或直接使用另外一个Configuration对象来初始化。

    package com.yq.common;
     
    import java.net.URI;
     
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
     
    public class HdfsUtils {
        public static FileSystem getFilesystem(){
            FileSystem hdfs=null;
            Configuration conf=new Configuration(); 
            try{
                URI uri = new URI("hdfs://localhost:9000");
                hdfs = FileSystem.get(uri,conf);
            }
            catch(Exception ex){
            //
            }
        return hdfs;
        }
    }
    
    

    打开文件

    FSDataInputStream fsDataInputStream =  hdfs.open(path);
    

    打开文件其实就是创建一个文件输入流,跟踪文件系统的open方法,可以找到源码

      public FSDataInputStream open(Path f) throws IOException {
        return open(f, getConf().getInt("io.file.buffer.size", 4096));
      }
    
    

    再跟踪open方法,找到以下抽象方法。

      public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException;
          //这个方法在DistributedFileSystem类有实现,如下
      @Override
      public FSDataInputStream open(Path f, final int bufferSize) throws IOException {
        statistics.incrementReadOps(1);
        Path absF = fixRelativePart(f);
        return new FileSystemLinkResolver<FSDataInputStream>() {
          @Override
          public FSDataInputStream doCall(final Path p)
              throws IOException, UnresolvedLinkException {
            return new HdfsDataInputStream(
                dfs.open(getPathName(p), bufferSize, verifyChecksum));
          }
          @Override
          public FSDataInputStream next(final FileSystem fs, final Path p)
              throws IOException {
            return fs.open(p, bufferSize);
          }
        }.resolve(this, absF);
      }
    
    

    在返回结果的时候,创建了一个FileSystemLinkResolver对象,并实现了此类的两个抽象方法。doCall方法和next方法都在resolve方法里用到了,而next方法只是在resolve方法异常捕获时才调用。 跟踪doCall方法,doCall方法里的open()方法有3个参数,src表示要打开的文件路径,buffersize表示缓冲大小,verifyChecksum表示是否校验和,的源代码如下。

      public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
          throws IOException, UnresolvedLinkException {
        checkOpen();
        //    Get block info from namenode
        return new DFSInputStream(this, src, buffersize, verifyChecksum);
      }
    
    

    checkOpen方法表示检查文件系统是否已经打开,如果没有打开,则抛出异常(FileSystemclosed)。 然后返回一个分布式文件系统输入流(DFSInputStream),此处调用的构造方法源代码如下。

      DFSInputStream(DFSClient dfsClient, String src, int buffersize, boolean verifyChecksum
                     ) throws IOException, UnresolvedLinkException {
        this.dfsClient = dfsClient;
        this.verifyChecksum = verifyChecksum;
        this.buffersize = buffersize;
        this.src = src;
        this.cachingStrategy =
            dfsClient.getDefaultReadCachingStrategy();
        openInfo();
      }
    
    

    这个方法先是做了一些准备工作,然后调用openInfo()方法,openInfo()方法是一个线程安全的方法,作用是从namenode获取已打开的文件信息。其源代码如下。

     /**
       * Grab the open-file info from namenode
       */
      synchronized void openInfo() throws IOException, UnresolvedLinkException {
        lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
        int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
        while (retriesForLastBlockLength > 0) {
          // Getting last block length as -1 is a special case. When cluster
          // restarts, DNs may not report immediately. At this time partial block
          // locations will not be available with NN for getting the length. Lets
          // retry for 3 times to get the length.
          if (lastBlockBeingWrittenLength == -1) {
            DFSClient.LOG.warn("Last block locations not available. "
                + "Datanodes might not have reported blocks completely."
                + " Will retry for " + retriesForLastBlockLength + " times");
            waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
            lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
          } else {
            break;
          }
          retriesForLastBlockLength--;
        }
        if (retriesForLastBlockLength == 0) {
          throw new IOException("Could not obtain the last block locations.");
        }
      }
    
    

    此方法有调用fetchLocatedBlocksAndGetLastBlockLength()方法获取块的位置信息。

      private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
        final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("newInfo = " + newInfo);
        }
        if (newInfo == null) {
          throw new IOException("Cannot open filename " + src);
        }
     
        if (locatedBlocks != null) {
          Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
          Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
          while (oldIter.hasNext() && newIter.hasNext()) {
            if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
              throw new IOException("Blocklist for " + src + " has changed!");
            }
          }
        }
        locatedBlocks = newInfo;
        long lastBlockBeingWrittenLength = 0;
        if (!locatedBlocks.isLastBlockComplete()) {
          final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
          if (last != null) {
            if (last.getLocations().length == 0) {
              if (last.getBlockSize() == 0) {
                // if the length is zero, then no data has been written to
                // datanode. So no need to wait for the locations.
                return 0;
              }
              return -1;
            }
            final long len = readBlockLength(last);
            last.getBlock().setNumBytes(len);
            lastBlockBeingWrittenLength = len; 
          }
        }
     
        currentNode = null;
        return lastBlockBeingWrittenLength;
      }
    
    

    getLocatedBlocks方法可以获取块的位置信息。LocatedBlocks类是许多块的位置信息的集合。因为从此类的源码可以发现有这个一个私有属性:

     private final List<LocatedBlock> blocks; // array of blocks with prioritized locations
    

    通过文件名,FSDataInputStream类可以获取相文件内容,也可以充当namenode与datanode桥梁。

    将文件内容在标准输出显示

    因为之前已经获得了一个FSDataInputStream,所以,我们可以调用方法copyBytes将FSDataInputStream拷贝到标准输出流System.out显示。

     public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) 
        throws IOException {
        try {
          copyBytes(in, out, buffSize);
          if(close) {
            out.close();
            out = null;
            in.close();
            in = null;
          }
        } finally {
          if(close) {
            closeStream(out);
            closeStream(in);
          }
        }
      }
    
    

    此方法里又调用了另外一个copyBytes方法,作用同样是从一个流拷贝到另外一个流。

    public static void copyBytes(InputStream in, OutputStream out, int buffSize) 
        throws IOException {
        PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
        byte buf[] = new byte[buffSize];
        int bytesRead = in.read(buf);
        while (bytesRead >= 0) {
          out.write(buf, 0, bytesRead);
          if ((ps != null) && ps.checkError()) {
            throw new IOException("Unable to write to output stream.");
          }
          bytesRead = in.read(buf);
        }
      }
    
    

    先从输入流中读取buffSize大小的数据到缓冲里面,然后将缓冲里的数据写入到输出流out里。一直循环,直到从输入流中读到缓冲里的字节长度为0,表示输入流里的数据已经读取完毕

  • 相关阅读:
    随便写写
    mysql 快速插入100完毕 40秒
    存储过程 插入表数据 循环
    打开地图拖动位置获取经纬度 给父窗口传值
    Go源码共读计划
    源码读起来,Go源码共读计划
    清除centos所有命令记录
    删除django后台最近一个动作提示。
    自动延期pycharm插件,非常好用.
    pycharm中使用solidity插件 ,编写solidity以及在pycharm内进行编译。
  • 原文地址:https://www.cnblogs.com/shan13936/p/13747379.html
Copyright © 2011-2022 走看看