  • An Analysis of the HDFS Read and Write Process

    1. Opening a File

    1.1 Client

    To open a file in HDFS, the client calls DistributedFileSystem.open(Path f, int bufferSize), which is implemented as follows:

    public FSDataInputStream open(Path f, int bufferSize) throws IOException {

      return new DFSClient.DFSDataInputStream(

            dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));

    }

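    Here dfs is the DFSClient member variable of DistributedFileSystem. Its open function creates and returns a DFSInputStream(src, buffersize, verifyChecksum).

    The DFSInputStream constructor calls openInfo, whose main job is to fetch from the NameNode the information about the blocks that make up the file being opened. It is implemented as follows: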

    synchronized void openInfo() throws IOException {

      LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);

      this.locatedBlocks = newInfo;

      this.currentNode = null;

    }

    private static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,

        String src, long start, long length) throws IOException {

        return namenode.getBlockLocations(src, start, length);

    }

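    LocatedBlocks mainly holds a list, List<LocatedBlock> blocks, in which each LocatedBlock contains the following information:

    • Block b: the information about this block
    • long offset: the offset of this block within the file
    • DatanodeInfo[] locs: the DataNodes on which this block is stored

    The namenode.getBlockLocations call above is an RPC that ultimately invokes the getBlockLocations function of the NameNode class.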

    1.2 NameNode

    NameNode.getBlockLocations is implemented as follows:

    public LocatedBlocks   getBlockLocations(String src,

                                            long offset,

                                            long length) throws IOException {

      return namesystem.getBlockLocations(getClientMachine(),

                                          src, offset, length);

    }

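    namesystem is a member variable of NameNode of type FSNamesystem. It holds the NameNode's namespace tree, and one of its important member variables is FSDirectory dir.

    This FSDirectory has nothing to do with the FSDirectory in Lucene. It mainly contains FSImage fsImage, which reads and writes the fsimage file on disk. The FSImage class in turn has a member variable FSEditLog editLog, which reads and writes the edits file on disk. The relationship between these two files was explained in the previous article.

    FSDirectory has another important member variable, INodeDirectoryWithQuota rootDir. The parent class of INodeDirectoryWithQuota is INodeDirectory, which is implemented as follows: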

    public class INodeDirectory extends INode {

      ……

      private List<INode> children;

      ……

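    As this shows, INodeDirectory is itself an INode and holds a list of child INodes. A child that is a directory has type INodeDirectory, and a child that is a file has type INodeFile. INodeFile has a member variable BlockInfo blocks[] that describes the blocks belonging to the file. Together these clearly form a tree.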
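
    The tree described above can be sketched in a few lines. This is only an illustration under simplifying assumptions: Node, Dir, FileNode and getFile are hypothetical stand-ins for INode, INodeDirectory, INodeFile and the path lookup, not the Hadoop classes themselves.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for INode / INodeDirectory / INodeFile.
class NamespaceTreeSketch {
    static class Node {
        final String name;
        Node(String name) { this.name = name; }
    }

    static class Dir extends Node {
        final List<Node> children = new ArrayList<>();
        Dir(String name) { super(name); }

        Dir add(Node child) { children.add(child); return this; }

        Node child(String childName) {
            for (Node c : children) {
                if (c.name.equals(childName)) return c;
            }
            return null;
        }
    }

    static class FileNode extends Node {
        final long[] blockIds; // stands in for BlockInfo blocks[]
        FileNode(String name, long... blockIds) { super(name); this.blockIds = blockIds; }
    }

    // Walk the tree one path component at a time; return null if the path
    // does not exist or does not name a file.
    static FileNode getFile(Dir root, String path) {
        Node cur = root;
        for (String part : path.split("/")) {
            if (part.isEmpty()) continue;          // skip the leading "/"
            if (!(cur instanceof Dir)) return null;
            cur = ((Dir) cur).child(part);
            if (cur == null) return null;
        }
        return (cur instanceof FileNode) ? (FileNode) cur : null;
    }
}
```

    A linear scan over the children keeps the sketch short; a real namespace with large directories would want a sorted or indexed child list.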

    The call above eventually reaches the following FSNamesystem.getBlockLocations overload:

    public LocatedBlocks getBlockLocations(String src, long offset, long length,

        boolean doAccessTime) throws IOException {

      final LocatedBlocks ret = getBlockLocationsInternal(src, dir.getFileINode(src),

          offset, length, Integer.MAX_VALUE, doAccessTime); 

      return ret;

    }

    dir.getFileINode(src) looks up the INodeFile in the file system tree by path name; it holds the INode information of the file being opened.

    getBlockLocationsInternal is implemented as follows:

    private synchronized LocatedBlocks getBlockLocationsInternal(String src,

                                                         INodeFile inode,

                                                         long offset,

                                                         long length,

                                                         int nrBlocksToReturn,

                                                         boolean doAccessTime)

                                                         throws IOException {

      //Get the block information of this file

      Block[] blocks = inode.getBlocks();

      List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);

      //Work out which blocks are covered by the range of length bytes starting at offset

      int curBlk = 0;

      long curPos = 0, blkSize = 0;

      int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;

      for (curBlk = 0; curBlk < nrBlocks; curBlk++) {

        blkSize = blocks[curBlk].getNumBytes();

        if (curPos + blkSize > offset) {

          //When offset falls between curPos and curPos + blkSize, curBlk is the block that contains offset

          break;

        }

        curPos += blkSize;

      }

      long endOff = offset + length;

      //Loop over each block starting from curBlk until the current position curPos passes endOff

      do {

        int numNodes = blocksMap.numNodes(blocks[curBlk]);

        int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas();

        int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blocks[curBlk]);

        boolean blockCorrupt = (numCorruptNodes == numNodes);

        int numMachineSet = blockCorrupt ? numNodes :

                              (numNodes - numCorruptNodes);

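        //Find the DataNodes that hold this block and put those with uncorrupted replicas into machineSet (if every replica is corrupt, all of them are kept)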

        DatanodeDescriptor[] machineSet = new DatanodeDescriptor[numMachineSet];

        if (numMachineSet > 0) {

          numNodes = 0;

          for(Iterator<DatanodeDescriptor> it =

              blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {

            DatanodeDescriptor dn = it.next();

            boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);

            if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))

              machineSet[numNodes++] = dn;

          }

        }

        //Build a LocatedBlock from this machineSet and the current block

        results.add(new LocatedBlock(blocks[curBlk], machineSet, curPos,

                    blockCorrupt));

        curPos += blocks[curBlk].getNumBytes();

        curBlk++;

      } while (curPos < endOff

            && curBlk < blocks.length

            && results.size() < nrBlocksToReturn);

      //Build a LocatedBlocks object from this list of LocatedBlock and return it

      return inode.createLocatedBlocks(results);

    }
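
    The block selection at the heart of the function above can be isolated into a small sketch. It assumes only an array of block sizes; blocksInRange is a hypothetical name, and the sketch uses two while loops instead of the original do/while so that an offset past the end of the file simply yields an empty list.

```java
import java.util.ArrayList;
import java.util.List;

class BlockRangeSketch {
    // Return the indices of the blocks that overlap [offset, offset + length).
    // blockSizes[i] is the size in bytes of the i-th block of the file.
    static List<Integer> blocksInRange(long[] blockSizes, long offset, long length) {
        List<Integer> result = new ArrayList<>();
        int curBlk = 0;
        long curPos = 0;
        // Skip the blocks that end at or before offset.
        while (curBlk < blockSizes.length && curPos + blockSizes[curBlk] <= offset) {
            curPos += blockSizes[curBlk];
            curBlk++;
        }
        long endOff = offset + length;
        // Collect blocks until the current position passes the end of the range.
        while (curBlk < blockSizes.length && curPos < endOff) {
            result.add(curBlk);
            curPos += blockSizes[curBlk];
            curBlk++;
        }
        return result;
    }
}
```

    For four 64 MB blocks, a request for 128 MB starting at 100 MB returns the zero-based indices 1, 2 and 3.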

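    1.3 Client

    The LocatedBlocks object obtained from the NameNode through the RPC call is stored as a member variable of the DFSInputStream, which is finally wrapped in an FSDataInputStream and returned to the user.

    2. Reading a File

    2.1 Client

    To read the file, the client calls FSDataInputStream.read(long position, byte[] buffer, int offset, int length) on the stream obtained when the file was opened.

    FSDataInputStream calls read(long position, byte[] buffer, int offset, int length) on the DFSInputStream it wraps, implemented as follows: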

    public int read(long position, byte[] buffer, int offset, int length)

      throws IOException {

      long filelen = getFileLength();

      int realLen = length;

      if ((position + length) > filelen) {

        realLen = (int)(filelen - position);

      }

      //First get the list of blocks covering the range from position to position + realLen

      //For example, in a file system with 64 MB blocks, reading 128 MB starting at 100 MB touches the 2nd, 3rd and 4th blocks

      List<LocatedBlock> blockRange = getBlockRange(position, realLen);

      int remaining = realLen;

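      //Read the content from each block in turn

      //In the example above, read 28 MB starting at 36 MB within the 2nd block, the whole 64 MB of the 3rd block, and 36 MB starting at 0 within the 4th block, 128 MB in total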

      for (LocatedBlock blk : blockRange) {

        long targetStart = position - blk.getStartOffset();

        long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);

        fetchBlockByteRange(blk, targetStart,

                            targetStart + bytesToRead - 1, buffer, offset);

        remaining -= bytesToRead;

        position += bytesToRead;

        offset += bytesToRead;

      }

      assert remaining == 0 : "Wrong number of bytes read.";

      if (stats != null) {

        stats.incrementBytesRead(realLen);

      }

      return realLen;

    }
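
    The arithmetic in the comments above can be checked with a short sketch. It assumes every block has the same size, which real files need not satisfy, and plan is a hypothetical helper rather than anything in DFSInputStream.

```java
import java.util.ArrayList;
import java.util.List;

class ReadPlanSketch {
    // Split a read of len bytes at file position pos into per-block pieces.
    // Each long[] is {blockIndex, startWithinBlock, bytesToRead}.
    // Assumes a uniform block size (a simplification for this sketch).
    static List<long[]> plan(long pos, long len, long blockSize) {
        List<long[]> pieces = new ArrayList<>();
        long remaining = len;
        while (remaining > 0) {
            long blockIndex = pos / blockSize;
            long targetStart = pos % blockSize;
            long bytesToRead = Math.min(remaining, blockSize - targetStart);
            pieces.add(new long[] {blockIndex, targetStart, bytesToRead});
            remaining -= bytesToRead;
            pos += bytesToRead;
        }
        return pieces;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        for (long[] p : plan(100 * mb, 128 * mb, 64 * mb)) {
            System.out.println("block index " + p[0] + ": start " + p[1] / mb
                    + " MB, read " + p[2] / mb + " MB");
        }
    }
}
```

    With these inputs the pieces are 28 MB from offset 36 MB, a full 64 MB, and 36 MB from offset 0, for zero-based block indices 1, 2 and 3 (the 2nd, 3rd and 4th blocks).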

    The getBlockRange function is as follows:

    private synchronized List<LocatedBlock> getBlockRange(long offset,

                                                          long length)

                                                        throws IOException {

      List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>();

      //First look up, in the cached locatedBlocks, the position in the cached list of the block that contains offset

      int blockIdx = locatedBlocks.findBlock(offset);

      if (blockIdx < 0) { // block is not cached

        blockIdx = LocatedBlocks.getInsertIndex(blockIdx);

      }

      long remaining = length;

      long curOff = offset;

      while(remaining > 0) {

        LocatedBlock blk = null;

        //Find the block at index blockIdx

        if(blockIdx < locatedBlocks.locatedBlockCount())

          blk = locatedBlocks.get(blockIdx);

        //If the block is null, it is not in the cache, so fetch the blocks from the NameNode and add them to the cache

        if (blk == null || curOff < blk.getStartOffset()) {

          LocatedBlocks newBlocks;

          newBlocks = callGetBlockLocations(namenode, src, curOff, remaining);

          locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks());

          continue;

        }

        //If the block was found, add it to the result

        blockRange.add(blk);

        long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff;

        remaining -= bytesRead;

        curOff += bytesRead;

        //Move on to the next block

        blockIdx++;

      }

      return blockRange;

    }
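
    The cache lookup above relies on a search that returns a negative value when the block is not cached, which is then turned into an insertion index. A minimal sketch of that convention follows. It assumes the cached blocks are described by two parallel arrays of start offsets and sizes, and it borrows the -(insertion point) - 1 encoding of java.util.Collections.binarySearch; the method bodies are written for illustration, not copied from LocatedBlocks.

```java
class FindBlockSketch {
    // starts[i] and sizes[i] describe the i-th cached block, sorted by start.
    // Returns the index of the block containing offset, or a negative value
    // encoding where such a block would have to be inserted.
    static int findBlock(long[] starts, long[] sizes, long offset) {
        int lo = 0;
        int hi = starts.length - 1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (offset < starts[mid]) {
                hi = mid - 1;
            } else if (offset >= starts[mid] + sizes[mid]) {
                lo = mid + 1;
            } else {
                return mid;            // offset falls inside block mid
            }
        }
        return -(lo + 1);              // not cached: encode the insertion point
    }

    // Decode a search result into a list index.
    static int getInsertIndex(int searchResult) {
        return searchResult >= 0 ? searchResult : -(searchResult + 1);
    }
}
```

    With cached blocks at offsets 0, 64 and 192 (each of size 64), looking up offset 130 gives -3, which decodes to insertion index 2, the slot where the missing block belongs.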

    fetchBlockByteRange is implemented as follows:

    private void fetchBlockByteRange(LocatedBlock block, long start,

                                     long end, byte[] buf, int offset) throws IOException {

      Socket dn = null;

      int numAttempts = block.getLocations().length;

      //This while loop retries after a failed read, at most once per replica location

      while (dn == null && numAttempts-- > 0 ) {

        //Choose a DataNode to read from

        DNAddrPair retval = chooseDataNode(block);

        DatanodeInfo chosenNode = retval.info;

        InetSocketAddress targetAddr = retval.addr;

        BlockReader reader = null;

        try {

          //Create a socket and connect to the DataNode

          dn = socketFactory.createSocket();

          dn.connect(targetAddr, socketTimeout);

          dn.setSoTimeout(socketTimeout);

          int len = (int) (end - start + 1);

          //Use the established socket connection to create a reader that reads data from the DataNode

          reader = BlockReader.newBlockReader(dn, src,

                                              block.getBlock().getBlockId(),

                                              block.getBlock().getGenerationStamp(),

                                              start, len, buffersize,

                                              verifyChecksum, clientName);

          //Read the data

          int nread = reader.readAll(buf, offset, len);

          return;

        } finally {

          IOUtils.closeStream(reader);

          IOUtils.closeSocket(dn);

          dn = null;

        }

        //If the read failed (the catch clauses are omitted from this excerpt), mark this DataNode as dead

        addToDeadNodes(chosenNode);

      }

    }
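
    The retry behaviour described above, trying another replica and remembering the nodes that failed, can be sketched independently of sockets. NodeReader and readFromAnyReplica are hypothetical names introduced for this illustration; the real client chooses nodes through chooseDataNode, which this sketch replaces with a simple in-order scan.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ReplicaRetrySketch {
    // Hypothetical reader interface: reads a block from one node or throws.
    interface NodeReader {
        byte[] read(String node) throws IOException;
    }

    final Set<String> deadNodes = new HashSet<>();

    // Try each replica location at most once, skipping nodes already known
    // to be dead, and remember every node that fails.
    byte[] readFromAnyReplica(List<String> locations, NodeReader reader) {
        for (String node : locations) {
            if (deadNodes.contains(node)) {
                continue;
            }
            try {
                return reader.read(node);
            } catch (IOException e) {
                deadNodes.add(node);   // do not pick this node again
            }
        }
        throw new IllegalStateException("no live replica among " + locations);
    }
}
```

    Keeping the dead-node set on the stream object means that a later read through the same stream skips a node that has already failed once.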

    The BlockReader.newBlockReader function is implemented as follows:

    public static BlockReader newBlockReader( Socket sock, String file,

                                       long blockId,

                                       long genStamp,

                                       long startOffset, long len,

                                       int bufferSize, boolean verifyChecksum,

                                       String clientName)

                                       throws IOException {

      //Open an output stream on the socket and send the read request to the DataNode

      DataOutputStream out = new DataOutputStream(

        new BufferedOutputStream(NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT)));

      out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );

      out.write( DataTransferProtocol.OP_READ_BLOCK );

      out.writeLong( blockId );

      out.writeLong( genStamp );

      out.writeLong( startOffset );

      out.writeLong( len );

      Text.writeString(out, clientName);

      out.flush();

      //Open an input stream on the socket for reading data from the DataNode

      DataInputStream in = new DataInputStream(

          new BufferedInputStream(NetUtils.getInputStream(sock),

                                  bufferSize));

      DataChecksum checksum = DataChecksum.newDataChecksum( in );

      long firstChunkOffset = in.readLong();

      //Create a reader, which mainly wraps the input stream, for reading the data

      return new BlockReader( file, blockId, in, checksum, verifyChecksum,

                              startOffset, firstChunkOffset, sock );

    }

    BlockReader's readAll function simply reads the data through the DataInputStream created above.
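
    The read request written by newBlockReader is a fixed sequence of fields, and the receiver has to read them back in the same order. The following sketch round-trips such a header through a byte array. The VERSION and OP_READ_BLOCK values are placeholders chosen for this sketch, and writeUTF stands in for Hadoop's Text.writeString, which uses a different string encoding.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class ReadRequestSketch {
    // Placeholder values for this sketch only; the real constants are
    // defined in DataTransferProtocol and depend on the Hadoop version.
    static final short VERSION = 1;
    static final byte OP_READ_BLOCK = 81;

    static byte[] encode(long blockId, long genStamp, long startOffset,
                         long len, String clientName) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeShort(VERSION);
            out.write(OP_READ_BLOCK);
            out.writeLong(blockId);
            out.writeLong(genStamp);
            out.writeLong(startOffset);
            out.writeLong(len);
            out.writeUTF(clientName);   // stand-in for Text.writeString
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Decode in the same order the fields were written; returns
    // {blockId, genStamp, startOffset, len}.
    static long[] decode(byte[] request) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(request));
            short version = in.readShort();
            byte op = in.readByte();
            if (version != VERSION || op != OP_READ_BLOCK) {
                throw new IOException("unexpected version or opcode");
            }
            return new long[] {in.readLong(), in.readLong(), in.readLong(), in.readLong()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

    Because the format carries no field tags, a mismatch in field order between writer and reader corrupts every later field, which is why the version number is sent first.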

    2.2 DataNode

    When a DataNode starts, it calls startDataNode. The logic related to reading data is as follows:

    void startDataNode(Configuration conf,

                       AbstractList<File> dataDirs

                       ) throws IOException {

      ……

      // Create a ServerSocket and a DataXceiverServer that listens for client connections

      ServerSocket ss = (socketWriteTimeout > 0) ?

            ServerSocketChannel.open().socket() : new ServerSocket();

      Server.bind(ss, socAddr, 0);

      ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);

      // adjust machine name with the actual port

      tmpPort = ss.getLocalPort();

      selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),

                                       tmpPort);

      this.dnRegistration.setName(machineName + ":" + tmpPort);

      this.threadGroup = new ThreadGroup("dataXceiverServer");

      this.dataXceiverServer = new Daemon(threadGroup,

          new DataXceiverServer(ss, conf, this));

      this.threadGroup.setDaemon(true); // auto destroy when empty

      ……

    }

    The DataXceiverServer.run() function is as follows:

    public void run() {

      while (datanode.shouldRun) {

          //Accept a client connection

          Socket s = ss.accept();

          s.setTcpNoDelay(true);

          //Start a DataXceiver thread to serve the new connection

          new Daemon(datanode.threadGroup,

              new DataXceiver(s, datanode, this)).start();

      }

      try {

        ss.close();

      } catch (IOException ie) {

        LOG.warn(datanode.dnRegistration + ":DataXceiveServer: "

                                + StringUtils.stringifyException(ie));

      }

    }

    The DataXceiver.run() function is as follows:

    public void run() {

      DataInputStream in=null;

      try {

        //Open an input stream and read the request sent by the client

        in = new DataInputStream(

            new BufferedInputStream(NetUtils.getInputStream(s),

                                    SMALL_BUFFER_SIZE));

        short version = in.readShort();

        boolean local = s.getInetAddress().equals(s.getLocalAddress());

        byte op = in.readByte();

        // Make sure the xciver count is not exceeded

        int curXceiverCount = datanode.getXceiverCount();

        long startTime = DataNode.now();

        switch ( op ) {

        //Read

        case DataTransferProtocol.OP_READ_BLOCK:

          //Do the actual read

          readBlock( in );

          datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);

          if (local)

            datanode.myMetrics.readsFromLocalClient.inc();

          else

            datanode.myMetrics.readsFromRemoteClient.inc();

          break;

        //Write

        case DataTransferProtocol.OP_WRITE_BLOCK:

          //Do the actual write

          writeBlock( in );

          datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);

          if (local)

            datanode.myMetrics.writesFromLocalClient.inc();

          else

            datanode.myMetrics.writesFromRemoteClient.inc();

          break;

        //Other operations

        ……

        }

      } catch (Throwable t) {

        LOG.error(datanode.dnRegistration + ":DataXceiver",t);

      } finally {

        IOUtils.closeStream(in);

        IOUtils.closeSocket(s);

        dataXceiverServer.childSockets.remove(s);

      }

    }

    The readBlock function called above is as follows:

    private void readBlock(DataInputStream in) throws IOException {

      //Read the request

      long blockId = in.readLong();         

      Block block = new Block( blockId, 0 , in.readLong());

      long startOffset = in.readLong();

      long length = in.readLong();

      String clientName = Text.readString(in);

      //Create an output stream for writing data to the client

      OutputStream baseStream = NetUtils.getOutputStream(s,

          datanode.socketWriteTimeout);

      DataOutputStream out = new DataOutputStream(

                   new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));

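      //Create a BlockSender to read the data of the local block and send it to the client

      //BlockSender has a member variable InputStream blockIn for reading the data of the local block

      BlockSender blockSender = null;

      try {

        blockSender = new BlockSender(block, startOffset, length,

                true, true, false, datanode, clientTraceFmt);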

       out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // send op status

       //Send the data to the client

       long read = blockSender.sendBlock(out, baseStream, null);

       ……

      } finally {

        IOUtils.closeStream(out);

        IOUtils.closeStream(blockSender);

      }

    }

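    3. Writing a File

    The following walks through the process of uploading a file to HDFS.

    3.1 Client

    Uploading a file to HDFS usually starts with a call to DistributedFileSystem.create, implemented as follows: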

      public FSDataOutputStream create(Path f, FsPermission permission,

        boolean overwrite,

        int bufferSize, short replication, long blockSize,

        Progressable progress) throws IOException {

        return new FSDataOutputStream

           (dfs.create(getPathName(f), permission,

                       overwrite, replication, blockSize, progress, bufferSize),

            statistics);

      }

    It ultimately returns an FSDataOutputStream used to write data to the newly created file. The member variable dfs is of type DFSClient, and DFSClient's create function is as follows:

      public OutputStream create(String src,

                                 FsPermission permission,

                                 boolean overwrite,

                                 short replication,

                                 long blockSize,

                                 Progressable progress,

                                 int buffersize

                                 ) throws IOException {

        checkOpen();

        if (permission == null) {

          permission = FsPermission.getDefault();

        }

        FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));

        OutputStream result = new DFSOutputStream(src, masked,

            overwrite, replication, blockSize, progress, buffersize,

            conf.getInt("io.bytes.per.checksum", 512));

        leasechecker.put(src, result);

        return result;

      }

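    This constructs a DFSOutputStream, whose constructor calls the NameNode's create through RPC to create the file.
    The constructor also does one more important thing: it calls streamer.start(), which starts the DataStreamer thread that drives the write pipeline. This is analyzed in detail in the discussion of writing data below.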

      DFSOutputStream(String src, FsPermission masked, boolean overwrite,

          short replication, long blockSize, Progressable progress,

          int buffersize, int bytesPerChecksum) throws IOException {

        this(src, blockSize, progress, bytesPerChecksum);

        computePacketChunkSize(writePacketSize, bytesPerChecksum);

        try {

          namenode.create(

              src, masked, clientName, overwrite, replication, blockSize);

        } catch(RemoteException re) {

          throw re.unwrapRemoteException(AccessControlException.class,

                                         QuotaExceededException.class);

        }

        streamer.start();

      }

    3.2 NameNode

    The NameNode's create function calls namesystem.startFile, which in turn calls startFileInternal, implemented as follows:

      private synchronized void startFileInternal(String src,

                                                  PermissionStatus permissions,

                                                  String holder,

                                                  String clientMachine,

                                                  boolean overwrite,

                                                  boolean append,

                                                  short replication,

                                                  long blockSize

                                                  ) throws IOException {

        ......

       //Create a new file in the under construction state, with no data blocks associated with it yet

       long genstamp = nextGenerationStamp();

       INodeFileUnderConstruction newNode = dir.addFile(src, permissions,

          replication, blockSize, holder, clientMachine, clientNode, genstamp);

       ......

      }

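    3.3 Client

    Now it is the client's turn to write data into the newly created file. This is usually done through FSDataOutputStream's write function, which eventually calls DFSOutputStream's writeChunk function.

    By design, HDFS writes the data of a block through a pipeline: the data is split into packets, and if three replicas are required, to be written to DataNode 1, 2 and 3, the process is as follows:

    • First, packet 1 is written to DataNode 1
    • DataNode 1 then forwards packet 1 to DataNode 2, while the client can write packet 2 to DataNode 1
    • DataNode 2 then forwards packet 1 to DataNode 3, while the client can write packet 3 to DataNode 1 and DataNode 1 forwards packet 2 to DataNode 2
    • The packets move down the line in this way, one after another, until all the data has been written and replicated

    The writeChunk function is as follows: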

      protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum)

                                                            throws IOException {

          //Create a packet if there is no current one, then write the data into it

          if (currentPacket == null) {

            currentPacket = new Packet(packetSize, chunksPerPacket,

                                       bytesCurBlock);

          }

          currentPacket.writeChecksum(checksum, 0, cklen);

          currentPacket.writeData(b, offset, len);

          currentPacket.numChunks++;

          bytesCurBlock += len;

          //If this packet is full (or the block is full), put it on the queue to be sent

          if (currentPacket.numChunks == currentPacket.maxChunks ||

              bytesCurBlock == blockSize) {

              ......

              dataQueue.addLast(currentPacket);

              //Wake up the transfer thread waiting on dataQueue, i.e. the DataStreamer

              dataQueue.notifyAll();

              currentPacket = null;

              ......

          }

      }
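
    The way chunks accumulate into packets can be sketched without any I/O. This is a simplified model: Packet here only counts chunks and bytes, the class and field names are hypothetical, and resetting bytesCurBlock at a block boundary is an assumption made to keep the sketch self-contained.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class PacketizerSketch {
    // Hypothetical, simplified packet: only counts chunks and data bytes.
    static class Packet {
        int numChunks;
        int dataLen;
    }

    final int chunksPerPacket;
    final long blockSize;
    final Deque<Packet> dataQueue = new ArrayDeque<>();
    Packet currentPacket;
    long bytesCurBlock;

    PacketizerSketch(int chunksPerPacket, long blockSize) {
        this.chunksPerPacket = chunksPerPacket;
        this.blockSize = blockSize;
    }

    // Add one chunk of len bytes; queue the packet once it is full or the
    // block boundary has been reached.
    void writeChunk(int len) {
        if (currentPacket == null) {
            currentPacket = new Packet();
        }
        currentPacket.numChunks++;
        currentPacket.dataLen += len;
        bytesCurBlock += len;
        boolean blockFull = bytesCurBlock == blockSize;
        if (currentPacket.numChunks == chunksPerPacket || blockFull) {
            dataQueue.addLast(currentPacket);
            currentPacket = null;
            if (blockFull) {
                bytesCurBlock = 0;   // assumption: the next chunk starts a new block
            }
        }
    }
}
```

    With three chunks per packet, seven chunks leave two full packets on the queue and a third packet holding one chunk, which is only queued when it fills up or the block ends.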


    The DataStreamer's run function is as follows:

      public void run() {

        while (!closed && clientRunning) {

          Packet one = null;

          synchronized (dataQueue) {

            //Wait if there are no packets in the queue

            while ((!closed && !hasError && clientRunning

                   && dataQueue.size() == 0) || doSleep) {

              try {

                dataQueue.wait(1000);

              } catch (InterruptedException  e) {

              }

              doSleep = false;

            }

            try {

              //Get the first packet in the queue

              one = dataQueue.getFirst();

              long offsetInBlock = one.offsetInBlock;

              //Have the NameNode allocate a block, and create an output stream to that block

              if (blockStream == null) {

                nodes = nextBlockOutputStream(src);

                response = new ResponseProcessor(nodes);

                response.start();

              }

              ByteBuffer buf = one.getBuffer();

              //Move the packet from dataQueue to ackQueue to wait for acknowledgement

              dataQueue.removeFirst();

              dataQueue.notifyAll();

              synchronized (ackQueue) {

                ackQueue.addLast(one);

                ackQueue.notifyAll();

              }

              //Use the output stream to write the data to the block on the DataNode

              blockStream.write(buf.array(), buf.position(), buf.remaining());

              if (one.lastPacketInBlock) {

                blockStream.writeInt(0); //Marks the end of this block

              }

              blockStream.flush();

            } catch (Throwable e) {

            }

          }

          ......

      }
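
    The two queues used by the streamer can be modelled in a single thread to show how a packet moves between them. This sketch identifies packets by sequence number only. The ack and requeueUnacked methods are assumptions about what the acknowledgement side does with ackQueue; that code is not shown in this article.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class AckQueueSketch {
    final Deque<Long> dataQueue = new ArrayDeque<>(); // packets waiting to be sent
    final Deque<Long> ackQueue = new ArrayDeque<>();  // packets sent, not yet acknowledged

    void enqueue(long seqno) {
        dataQueue.addLast(seqno);
    }

    // What the streamer does for one packet: move it from dataQueue to
    // ackQueue and hand it to the caller for sending.
    Long sendNext() {
        Long seqno = dataQueue.pollFirst();
        if (seqno != null) {
            ackQueue.addLast(seqno);
        }
        return seqno;
    }

    // Assumed acknowledgement handling: acks arrive in send order.
    void ack(long seqno) {
        Long head = ackQueue.peekFirst();
        if (head == null || head.longValue() != seqno) {
            throw new IllegalStateException("unexpected ack " + seqno);
        }
        ackQueue.removeFirst();
    }

    // Assumed recovery step: unacknowledged packets go back to the front of
    // dataQueue, in their original order, so that they can be resent.
    void requeueUnacked() {
        while (!ackQueue.isEmpty()) {
            dataQueue.addFirst(ackQueue.removeLast());
        }
    }
}
```

    Holding sent packets until they are acknowledged is what would make a resend possible after a pipeline failure, since the data is still in memory on the client.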

    One important function here is nextBlockOutputStream, implemented as follows:

      private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {

        LocatedBlock lb = null;

        boolean retry = false;

        DatanodeInfo[] nodes;

        int count = conf.getInt("dfs.client.block.write.retries", 3);

        boolean success;

        do {

          ......

          //Have the NameNode allocate a block and DataNodes for the file

          lb = locateFollowingBlock(startTime);

          block = lb.getBlock();

          nodes = lb.getLocations();

          //Create the output stream to the DataNode

          success = createBlockOutputStream(nodes, clientName, false);

          ......

        } while (retry && --count >= 0);

        return nodes;

      }

    locateFollowingBlock calls namenode.addBlock(src, clientName) through RPC.

    3.4 NameNode

    The NameNode's addBlock function is implemented as follows:

      public LocatedBlock addBlock(String src,

                                   String clientName) throws IOException {

        LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, clientName);

        return locatedBlock;

      }

    FSNamesystem's getAdditionalBlock is implemented as follows:

      public LocatedBlock getAdditionalBlock(String src,

                                             String clientName

                                             ) throws IOException {

        long fileLength, blockSize;

        int replication;

        DatanodeDescriptor clientNode = null;

        Block newBlock = null;

        ......

        //Choose DataNodes for the new block

        DatanodeDescriptor targets[] = replicator.chooseTarget(replication,

                                                               clientNode,

                                                               null,

                                                               blockSize);

        ......

        //Get the INodes of every component of the file path; the last one is the INode of the newly added file, in the under construction state

        INode[] pathINodes = dir.getExistingPathINodes(src);

        int inodesLen = pathINodes.length;

        INodeFileUnderConstruction pendingFile  = (INodeFileUnderConstruction)

                                                    pathINodes[inodesLen - 1];

        //Allocate a block for the file and record which DataNodes it will be stored on

        newBlock = allocateBlock(src, pathINodes);

        pendingFile.setTargets(targets);

        ......

        return new LocatedBlock(newBlock, targets, fileLength);

      }

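    3.5 Client

    Once the block and its DataNodes have been allocated, createBlockOutputStream connects to the first DataNode and sends the header of the write request.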

      private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,

                      boolean recoveryFlag) {

          //Create a socket and connect to the DataNode

          InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());

          s = socketFactory.createSocket();

          int timeoutValue = 3000 * nodes.length + socketTimeout;

          s.connect(target, timeoutValue);

          s.setSoTimeout(timeoutValue);

          s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);

          long writeTimeout = HdfsConstants.WRITE_TIMEOUT_EXTENSION * nodes.length +

                              datanodeWriteTimeout;

          DataOutputStream out = new DataOutputStream(

              new BufferedOutputStream(NetUtils.getOutputStream(s, writeTimeout),

                                       DataNode.SMALL_BUFFER_SIZE));

          blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));

          //Write the request

          out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );

          out.write( DataTransferProtocol.OP_WRITE_BLOCK );

          out.writeLong( block.getBlockId() );

          out.writeLong( block.getGenerationStamp() );

          out.writeInt( nodes.length );

          out.writeBoolean( recoveryFlag );

          Text.writeString( out, client );

          out.writeBoolean(false);

          out.writeInt( nodes.length - 1 );

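          //Note that this loop starts from 1, not 0. It sends the information about the DataNodes other than the first one (the other two, with three replicas) to the first DataNode, which uses it to forward the data to them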

          for (int i = 1; i < nodes.length; i++) {

            nodes[i].write(out);

          }

          checksum.writeHeader( out );

          out.flush();

          firstBadLink = Text.readString(blockReplyStream);

          if (firstBadLink.length() != 0) {

            throw new IOException("Bad connect ack with firstBadLink " + firstBadLink);

          }

          blockStream = out;

      }

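    After the DataStreamer's run function has created the output stream, the client calls blockStream.write to send the data to the DataNode.

    3.6 DataNode

    In the DataNode's DataXceiver, receiving the DataTransferProtocol.OP_WRITE_BLOCK operation leads to a call to writeBlock: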

      private void writeBlock(DataInputStream in) throws IOException {

        DatanodeInfo srcDataNode = null;

        //Read the header

        Block block = new Block(in.readLong(),

            dataXceiverServer.estimateBlockSize, in.readLong());

        int pipelineSize = in.readInt(); // num of datanodes in entire pipeline

        boolean isRecovery = in.readBoolean(); // is this part of recovery?

        String client = Text.readString(in); // working on behalf of this client

        boolean hasSrcDataNode = in.readBoolean(); // is src node info present

        if (hasSrcDataNode) {

          srcDataNode = new DatanodeInfo();

          srcDataNode.readFields(in);

        }

        int numTargets = in.readInt();

        if (numTargets < 0) {

          throw new IOException("Mislabelled incoming datastream.");

        }

        //Read the list of remaining DataNodes. If this is the first DataNode, the list holds the second and third DataNodes; if this is the second DataNode, it holds only the third

        DatanodeInfo targets[] = new DatanodeInfo[numTargets];

        for (int i = 0; i < targets.length; i++) {

          DatanodeInfo tmp = new DatanodeInfo();

          tmp.readFields(in);

          targets[i] = tmp;

        }

        DataOutputStream mirrorOut = null;  // stream to next target

        DataInputStream mirrorIn = null;    // reply from next target

        DataOutputStream replyOut = null;   // stream to prev target

        Socket mirrorSock = null;           // socket to next target

        BlockReceiver blockReceiver = null; // responsible for data handling

        String mirrorNode = null;           // the name:port of next target

        String firstBadLink = "";           // first datanode that failed in connection setup

        try {

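          //Create a BlockReceiver. Its member variable DataInputStream in reads data from the client or the previous DataNode, its member variable DataOutputStream mirrorOut writes data to the next DataNode, and its member variable OutputStream out writes data to local storage.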

          blockReceiver = new BlockReceiver(block, in,

              s.getRemoteSocketAddress().toString(),

              s.getLocalSocketAddress().toString(),

              isRecovery, client, srcDataNode, datanode);

          // get a connection back to the previous target

          replyOut = new DataOutputStream(

                         NetUtils.getOutputStream(s, datanode.socketWriteTimeout));

          //If this is not the last DataNode, open a socket connection to the next DataNode

          if (targets.length > 0) {

            InetSocketAddress mirrorTarget = null;

            // Connect to backup machine

            mirrorNode = targets[0].getName();

            mirrorTarget = NetUtils.createSocketAddr(mirrorNode);

            mirrorSock = datanode.newSocket();

            int timeoutValue = numTargets * datanode.socketTimeout;

            int writeTimeout = datanode.socketWriteTimeout +

                                 (HdfsConstants.WRITE_TIMEOUT_EXTENSION * numTargets);

            mirrorSock.connect(mirrorTarget, timeoutValue);

            mirrorSock.setSoTimeout(timeoutValue);

            mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);

            //Create the stream for writing data to the next DataNode

            mirrorOut = new DataOutputStream(

                 new BufferedOutputStream(

                             NetUtils.getOutputStream(mirrorSock, writeTimeout),

                             SMALL_BUFFER_SIZE));

            mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));

            mirrorOut.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );

            mirrorOut.write( DataTransferProtocol.OP_WRITE_BLOCK );

            mirrorOut.writeLong( block.getBlockId() );

            mirrorOut.writeLong( block.getGenerationStamp() );

            mirrorOut.writeInt( pipelineSize );

            mirrorOut.writeBoolean( isRecovery );

            Text.writeString( mirrorOut, client );

            mirrorOut.writeBoolean(hasSrcDataNode);

            if (hasSrcDataNode) { // pass src node information

              srcDataNode.write(mirrorOut);

            }

            mirrorOut.writeInt( targets.length - 1 );

            //This loop also starts from 1: it sends the next DataNode the information about the DataNodes that come after it

            for ( int i = 1; i < targets.length; i++ ) {

              targets[i].write( mirrorOut );

            }

            blockReceiver.writeChecksumHeader(mirrorOut);

            mirrorOut.flush();

          }

          //Use the BlockReceiver to receive the block

          String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;

          blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,

                                     mirrorAddr, null, targets.length);

          ......

        } finally {

          // close all opened streams

          IOUtils.closeStream(mirrorOut);

          IOUtils.closeStream(mirrorIn);

          IOUtils.closeStream(replyOut);

          IOUtils.closeSocket(mirrorSock);

          IOUtils.closeStream(blockReceiver);

        }

      }
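
    Both the client and each DataNode drop the first entry of the target list before forwarding it, so the list shrinks by one at every hop. A small sketch of that rule, using plain strings as hypothetical node names in place of DatanodeInfo:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PipelineTargetsSketch {
    // For each hop in the pipeline, list the downstream targets that the
    // sender passes along in the write header. Entry 0 is what the client
    // sends to nodes[0], entry 1 is what nodes[0] sends to nodes[1], etc.
    static List<List<String>> forwardedTargets(List<String> nodes) {
        List<List<String>> perHop = new ArrayList<>();
        List<String> remaining = nodes;
        while (!remaining.isEmpty()) {
            // The receiver is remaining.get(0); it is told about every node after it.
            List<String> targets = new ArrayList<>(remaining.subList(1, remaining.size()));
            perHop.add(targets);
            remaining = targets;
        }
        return perHop;
    }

    public static void main(String[] args) {
        // prints [[dn2, dn3], [dn3], []]
        System.out.println(forwardedTargets(Arrays.asList("dn1", "dn2", "dn3")));
    }
}
```

    The last DataNode receives an empty list, which corresponds to the targets.length > 0 check above failing, so it opens no mirror connection.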

    An important piece of logic in BlockReceiver's receiveBlock function is as follows:

      void receiveBlock(

          DataOutputStream mirrOut, // output to next datanode

          DataInputStream mirrIn,   // input from next datanode

          DataOutputStream replyOut,  // output to previous datanode

          String mirrAddr, BlockTransferThrottler throttlerArg,

          int numTargets) throws IOException {

          ......

          //Keep receiving packets until the block ends

          while (receivePacket() > 0) {}

          if (mirrorOut != null) {

            try {

              mirrorOut.writeInt(0); // mark the end of the block

              mirrorOut.flush();

            } catch (IOException e) {

              handleMirrorOutError(e);

            }

          }

          ......

      }

    BlockReceiver's receivePacket function is as follows:

      private int receivePacket() throws IOException {

        //Receive a packet from the client or the previous node

        int payloadLen = readNextPacket();

        buf.mark();

        //read the header

        buf.getInt(); // packet length

        offsetInBlock = buf.getLong(); // get offset of packet in block

        long seqno = buf.getLong();    // get seqno

        boolean lastPacketInBlock = (buf.get() != 0);

        int endOfHeader = buf.position();

        buf.reset();

        setBlockPosition(offsetInBlock);

        //Forward the packet to the next DataNode

        if (mirrorOut != null) {

          try {

            mirrorOut.write(buf.array(), buf.position(), buf.remaining());

            mirrorOut.flush();

          } catch (IOException e) {

            handleMirrorOutError(e);

          }

        }

        buf.position(endOfHeader);       

        int len = buf.getInt();

        offsetInBlock += len;

        int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*

                                                                checksumSize;

        int checksumOff = buf.position();

        int dataOff = checksumOff + checksumLen;

        byte pktBuf[] = buf.array();

        buf.position(buf.limit()); // move to the end of the data.

        ......

        //Write the data to the local block

        out.write(pktBuf, dataOff, len);

        /// flush entire packet before sending ack

        flush();

        // put in queue for pending acks

        if (responder != null) {

          ((PacketResponder)responder.getRunnable()).enqueue(seqno,

                                          lastPacketInBlock);

        }

        return payloadLen;

      }
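
    The header parsing and the checksum length formula in receivePacket can be tried out on a hand-built buffer. The sketch below lays the fields out in the order the function reads them (packet length, offset in block, sequence number, last-packet flag, data length). The header helper and the sample values are made up for illustration.

```java
import java.nio.ByteBuffer;

class PacketHeaderSketch {
    // Number of checksum bytes for len data bytes: one checksum of
    // checksumSize bytes per (possibly partial) chunk of bytesPerChecksum bytes.
    static int checksumLen(int len, int bytesPerChecksum, int checksumSize) {
        return ((len + bytesPerChecksum - 1) / bytesPerChecksum) * checksumSize;
    }

    // Build a header in the field order that receivePacket reads it.
    static ByteBuffer header(int packetLen, long offsetInBlock, long seqno,
                             boolean last, int dataLen) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 8 + 8 + 1 + 4);
        buf.putInt(packetLen)
           .putLong(offsetInBlock)
           .putLong(seqno)
           .put((byte) (last ? 1 : 0))
           .putInt(dataLen);
        buf.flip();   // switch from writing to reading
        return buf;
    }
}
```

    With 512 bytes per checksum and 4-byte checksums, 1000 data bytes need two checksums (8 bytes), and 1025 data bytes need three (12 bytes), because a partial final chunk still gets its own checksum.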

  • Original article: https://www.cnblogs.com/JohnLiang/p/2243446.html