  • HDFS source code analysis, part 2

     Taking writing a file as an example, this post traces the source code of the whole flow:

     FSDataOutputStream out = fs.create(outFile);
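
    For context, here is a minimal, self-contained sketch of the client-side call that kicks everything off. The cluster address, file path and class name below are placeholders for illustration, not values from the original post:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import java.nio.charset.StandardCharsets;

    public class HdfsWriteExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://namenode:8020");  // placeholder namenode address
        FileSystem fs = FileSystem.get(conf);               // a DistributedFileSystem for hdfs:// URIs
        Path outFile = new Path("/tmp/out.txt");            // placeholder path
        FSDataOutputStream out = fs.create(outFile);        // the call traced below
        out.write("hello hdfs".getBytes(StandardCharsets.UTF_8)); // buffered, chunked and packetized by DFSOutputStream
        out.close();   // flush remaining packets, wait for acks, complete the file
        fs.close();
      }
    }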

    1. DistributedFileSystem

    DistributedFileSystem extends the abstract FileSystem class; it is the object through which end-user code interacts with the Hadoop distributed file system.

    Original Javadoc:

    /****************************************************************
     * Implementation of the abstract FileSystem for the DFS system.
     * This object is the way end-user code interacts with a Hadoop
     * DistributedFileSystem.
     *
     *****************************************************************/

    Its create method is called:

     @Override
      public FSDataOutputStream create(final Path f, final FsPermission permission,
        final EnumSet<CreateFlag> cflags, final int bufferSize,
        final short replication, final long blockSize, final Progressable progress,
        final ChecksumOpt checksumOpt) throws IOException {
        statistics.incrementWriteOps(1);
        Path absF = fixRelativePart(f);
        return new FileSystemLinkResolver<FSDataOutputStream>() {
          @Override
          public FSDataOutputStream doCall(final Path p)
              throws IOException, UnresolvedLinkException {
            final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
                    cflags, replication, blockSize, progress, bufferSize,
                    checksumOpt);
            return dfs.createWrappedOutputStream(dfsos, statistics);
          }
          @Override
          public FSDataOutputStream next(final FileSystem fs, final Path p)
              throws IOException {
            return fs.create(p, permission, cflags, bufferSize,
                replication, blockSize, progress, checksumOpt);
          }
        }.resolve(this, absF);
      }

    2. DFSClient

    Its create method is called next:

     /**
       * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
       * Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is
       * a hint to where the namenode should place the file blocks.
       * The favored nodes hint is not persisted in HDFS. Hence it may be honored
       * at the creation time only. HDFS could move the blocks during balancing or
       * replication, to move the blocks from favored nodes. A value of null means
       * no favored nodes for this create
       */
      public DFSOutputStream create(String src, 
                                 FsPermission permission,
                                 EnumSet<CreateFlag> flag, 
                                 boolean createParent,
                                 short replication,
                                 long blockSize,
                                 Progressable progress,
                                 int buffersize,
                                 ChecksumOpt checksumOpt,
                                 InetSocketAddress[] favoredNodes) throws IOException {
        checkOpen();
        if (permission == null) {
          permission = FsPermission.getFileDefault();
        }
        FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
        if(LOG.isDebugEnabled()) {
          LOG.debug(src + ": masked=" + masked);
        }
        String[] favoredNodeStrs = null;
        if (favoredNodes != null) {
          favoredNodeStrs = new String[favoredNodes.length];
          for (int i = 0; i < favoredNodes.length; i++) {
            favoredNodeStrs[i] = 
                favoredNodes[i].getHostName() + ":" 
                             + favoredNodes[i].getPort();
          }
        }
        final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
            src, masked, flag, createParent, replication, blockSize, progress,
            buffersize, dfsClientConf.createChecksum(checksumOpt),
            favoredNodeStrs);
        beginFileLease(result.getFileId(), result);
        return result;
      }

    3. DFSOutputStream

      DFSOutputStream creates a file from a stream of bytes. The client application first writes data into the stream's internal cache; the data is then broken up into packets, each typically 64 KB. A packet is made up of chunks, each typically 512 bytes, and every chunk has an associated checksum (similar to a file's md5 value).

      When the client application fills the current packet, the packet is enqueued into the data queue (dataQueue). The DataStreamer thread picks packets up from the dataQueue, sends them to the first datanode in the pipeline, and moves each packet from the dataQueue to the ack queue (ackQueue). The ResponseProcessor receives acks from the datanodes; when a successful ack for a packet has been received from all the datanodes, the ResponseProcessor removes that packet from the ackQueue.

      When an error occurs, all outstanding packets are removed from the ackQueue (and put back on the dataQueue so they can be resent). A new pipeline is set up with the bad datanode eliminated from the original pipeline, and the DataStreamer then resumes sending packets from the dataQueue.
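
    To make the dataQueue/ackQueue hand-off concrete, here is a heavily simplified, self-contained model of the producer-consumer pattern described above. This is plain Java for illustration only, not HDFS code; the real DataStreamer and ResponseProcessor additionally handle pipeline setup, heartbeats and error recovery:

    import java.util.LinkedList;

    // Simplified model: the writer fills packets into dataQueue, a streamer
    // thread "sends" them and parks them in ackQueue, and a responder thread
    // drops them from ackQueue once they have been acknowledged.
    public class QueueModel {
      private final LinkedList<Integer> dataQueue = new LinkedList<>(); // packets waiting to be sent
      private final LinkedList<Integer> ackQueue  = new LinkedList<>(); // packets sent, awaiting acks

      // Writer side: enqueue a finished packet and wake the streamer.
      void enqueuePacket(int seqno) {
        synchronized (dataQueue) {
          dataQueue.addLast(seqno);
          dataQueue.notifyAll();
        }
      }

      // Streamer side: take the first packet and move it to the ackQueue
      // (in HDFS this is where the packet is written to the first datanode).
      void streamOnePacket() throws InterruptedException {
        synchronized (dataQueue) {
          while (dataQueue.isEmpty()) {
            dataQueue.wait();
          }
          int seqno = dataQueue.removeFirst();
          ackQueue.addLast(seqno);
        }
      }

      // Responder side: an ack for seqno arrived from all datanodes; drop the packet.
      void ackOnePacket(int seqno) {
        synchronized (dataQueue) { // both queues are guarded by the dataQueue monitor, as in HDFS
          if (!ackQueue.isEmpty() && ackQueue.getFirst() == seqno) {
            ackQueue.removeFirst();
            dataQueue.notifyAll(); // wake a writer waiting for queue space
          }
        }
      }

      public static void main(String[] args) throws InterruptedException {
        QueueModel q = new QueueModel();
        q.enqueuePacket(0);
        q.streamOnePacket(); // packet 0: dataQueue -> ackQueue
        q.ackOnePacket(0);   // packet 0 acknowledged and released
      }
    }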

    Original Javadoc:

    /****************************************************************
     * DFSOutputStream creates files from a stream of bytes.
     *
     * The client application writes data that is cached internally by
     * this stream. Data is broken up into packets, each packet is
     * typically 64K in size. A packet comprises of chunks. Each chunk
     * is typically 512 bytes and has an associated checksum with it.
     *
     * When a client application fills up the currentPacket, it is
     * enqueued into dataQueue.  The DataStreamer thread picks up
     * packets from the dataQueue, sends it to the first datanode in
     * the pipeline and moves it from the dataQueue to the ackQueue.
     * The ResponseProcessor receives acks from the datanodes. When an
     * successful ack for a packet is received from all datanodes, the
     * ResponseProcessor removes the corresponding packet from the
     * ackQueue.
     *
     * In case of error, all outstanding packets and moved from
     * ackQueue. A new pipeline is setup by eliminating the bad
     * datanode from the original pipeline. The DataStreamer now
     * starts sending packets from the dataQueue.
    ****************************************************************/
     static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
          FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
          short replication, long blockSize, Progressable progress, int buffersize,
          DataChecksum checksum, String[] favoredNodes) throws IOException {
        HdfsFileStatus stat = null;
    
        // Retry the create if we get a RetryStartFileException up to a maximum
        // number of times
        boolean shouldRetry = true;
        int retryCount = CREATE_RETRY_COUNT;
        while (shouldRetry) {
          shouldRetry = false;
          try {
            stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
                new EnumSetWritable<CreateFlag>(flag), createParent, replication,
                blockSize, SUPPORTED_CRYPTO_VERSIONS);
            break;
          } catch (RemoteException re) {
            IOException e = re.unwrapRemoteException(
                AccessControlException.class,
                DSQuotaExceededException.class,
                FileAlreadyExistsException.class,
                FileNotFoundException.class,
                ParentNotDirectoryException.class,
                NSQuotaExceededException.class,
                RetryStartFileException.class,
                SafeModeException.class,
                UnresolvedPathException.class,
                SnapshotAccessControlException.class,
                UnknownCryptoProtocolVersionException.class);
            if (e instanceof RetryStartFileException) {
              if (retryCount > 0) {
                shouldRetry = true;
                retryCount--;
              } else {
                throw new IOException("Too many retries because of encryption" +
                    " zone operations", e);
              }
            } else {
              throw e;
            }
          }
        }
        Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
        final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
            flag, progress, checksum, favoredNodes);
        out.start();
        return out;
      }

    Packet

     private static class Packet {
        private static final long HEART_BEAT_SEQNO = -1L;
        long seqno; // sequencenumber of buffer in block
        final long offsetInBlock; // offset in block
        boolean syncBlock; // this packet forces the current block to disk
        int numChunks; // number of chunks currently in packet
        final int maxChunks; // max chunks in packet
        private byte[] buf;
        private boolean lastPacketInBlock; // is this the last packet in block?
    
        /**
         * buf is pointed into like follows:
         *  (C is checksum data, D is payload data)
         *
         * [_________CCCCCCCCC________________DDDDDDDDDDDDDDDD___]
         *           ^        ^               ^               ^
         *           |        checksumPos     dataStart       dataPos
         *           checksumStart
         * 
         * Right before sending, we move the checksum data to immediately precede
         * the actual data, and then insert the header into the buffer immediately
         * preceding the checksum data, so we make sure to keep enough space in
         * front of the checksum data to support the largest conceivable header. 
         */
        int checksumStart;
        int checksumPos;
        final int dataStart;
        int dataPos;
    
        /**
         * Create a new packet.
         * 
         * @param pktSize maximum size of the packet, 
         *                including checksum data and actual data.
         * @param chunksPerPkt maximum number of chunks per packet.
         * @param offsetInBlock offset in bytes into the HDFS block.
         */
        private Packet(byte[] buf, int chunksPerPkt, long offsetInBlock, long seqno,
            int checksumSize) {
          this.lastPacketInBlock = false;
          this.numChunks = 0;
          this.offsetInBlock = offsetInBlock;
          this.seqno = seqno;
    
          this.buf = buf;
    
          checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
          checksumPos = checksumStart;
          dataStart = checksumStart + (chunksPerPkt * checksumSize);
          dataPos = dataStart;
          maxChunks = chunksPerPkt;
        }
    }
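
    As a back-of-the-envelope check on how the constructor above lays out a packet, the following sketch plugs in typical default sizes (64 KB packets, 512-byte chunks, 4-byte CRC32 checksums). The header reserve is an approximation of PacketHeader.PKT_MAX_HEADER_LEN; the exact constants depend on the Hadoop version and configuration:

    public class PacketSizing {
      public static void main(String[] args) {
        int packetSize    = 64 * 1024; // dfs.client-write-packet-size, default 64 KB
        int chunkSize     = 512;       // payload bytes per chunk (dfs.bytes-per-checksum)
        int checksumSize  = 4;         // CRC32 checksum bytes per chunk
        int headerReserve = 33;        // assumed maximum packet header length (approximate)

        // How many 512-byte chunks (plus their checksums) fit in one packet.
        int chunksPerPkt  = (packetSize - headerReserve) / (chunkSize + checksumSize); // ~126
        int checksumStart = headerReserve;
        int dataStart     = checksumStart + chunksPerPkt * checksumSize; // checksums sit before the data
        System.out.println("chunks/packet = " + chunksPerPkt + ", dataStart = " + dataStart);
      }
    }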

    DataStreamer

    The DataStreamer is responsible for sending data packets to the datanodes in the pipeline. After retrieving a new block id and block locations from the namenode, it starts streaming packets to that pipeline. Every packet has a unique sequence number. When all the packets of a block have been sent and their acks received, the DataStreamer closes the current block.

      private synchronized void start() {
        streamer.start();
      }

    Original comment:

      //
      // The DataStreamer class is responsible for sending data packets to the
      // datanodes in the pipeline. It retrieves a new blockid and block locations
      // from the namenode, and starts streaming packets to the pipeline of
      // Datanodes. Every packet has a sequence number associated with
      // it. When all the packets for a block are sent out and acks for each
      // if them are received, the DataStreamer closes the current block.
      //

    DataStreamer extends Daemon (a daemon thread), which in turn extends Thread, so its core logic lives in run():

     /*
         * streamer thread is the only thread that opens streams to datanode, 
         * and closes them. Any error recovery is also done by this thread.
         */
        @Override
        public void run() {
          long lastPacket = Time.now();
          TraceScope traceScope = null;
          if (traceSpan != null) {
            traceScope = Trace.continueSpan(traceSpan);
          }
          while (!streamerClosed && dfsClient.clientRunning) {
    
            // if the Responder encountered an error, shutdown Responder
            if (hasError && response != null) {
              try {
                response.close();
                response.join();
                response = null;
              } catch (InterruptedException  e) {
                DFSClient.LOG.warn("Caught exception ", e);
              }
            }
    
            Packet one;
            try {
              // process datanode IO errors if any
              boolean doSleep = false;
              if (hasError && (errorIndex >= 0 || restartingNodeIndex >= 0)) {
                doSleep = processDatanodeError();
              }
    
              synchronized (dataQueue) {
                // wait for a packet to be sent.
                long now = Time.now();
                while ((!streamerClosed && !hasError && dfsClient.clientRunning 
                    && dataQueue.size() == 0 && 
                    (stage != BlockConstructionStage.DATA_STREAMING || 
                     stage == BlockConstructionStage.DATA_STREAMING && 
                     now - lastPacket < dfsClient.getConf().socketTimeout/2)) || doSleep ) {
                  long timeout = dfsClient.getConf().socketTimeout/2 - (now-lastPacket);
                  timeout = timeout <= 0 ? 1000 : timeout;
                  timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
                     timeout : 1000;
                  try {
                    dataQueue.wait(timeout);
                  } catch (InterruptedException  e) {
                    DFSClient.LOG.warn("Caught exception ", e);
                  }
                  doSleep = false;
                  now = Time.now();
                }
                if (streamerClosed || hasError || !dfsClient.clientRunning) {
                  continue;
                }
                // get packet to be sent.
                if (dataQueue.isEmpty()) {
                  one = createHeartbeatPacket();
                } else {
                  one = dataQueue.getFirst(); // regular data packet
                }
              }
              assert one != null;
    
              // get new block from namenode.
              if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
                if(DFSClient.LOG.isDebugEnabled()) {
                  DFSClient.LOG.debug("Allocating new block");
                }
                setPipeline(nextBlockOutputStream());
                initDataStreaming();
              } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
                if(DFSClient.LOG.isDebugEnabled()) {
                  DFSClient.LOG.debug("Append to block " + block);
                }
                setupPipelineForAppendOrRecovery();
                initDataStreaming();
              }
    
              long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
              if (lastByteOffsetInBlock > blockSize) {
                throw new IOException("BlockSize " + blockSize +
                    " is smaller than data size. " +
                    " Offset of packet in block " + 
                    lastByteOffsetInBlock +
                    " Aborting file " + src);
              }
    
              if (one.lastPacketInBlock) {
                // wait for all data packets have been successfully acked
                synchronized (dataQueue) {
                  while (!streamerClosed && !hasError && 
                      ackQueue.size() != 0 && dfsClient.clientRunning) {
                    try {
                      // wait for acks to arrive from datanodes
                      dataQueue.wait(1000);
                    } catch (InterruptedException  e) {
                      DFSClient.LOG.warn("Caught exception ", e);
                    }
                  }
                }
                if (streamerClosed || hasError || !dfsClient.clientRunning) {
                  continue;
                }
                stage = BlockConstructionStage.PIPELINE_CLOSE;
              }
              
              // send the packet
              synchronized (dataQueue) {
                // move packet from dataQueue to ackQueue
                if (!one.isHeartbeatPacket()) {
                  dataQueue.removeFirst();
                  ackQueue.addLast(one);
                  dataQueue.notifyAll();
                }
              }
    
              if (DFSClient.LOG.isDebugEnabled()) {
                DFSClient.LOG.debug("DataStreamer block " + block +
                    " sending packet " + one);
              }
    
              // write out data to remote datanode
              try {
                one.writeTo(blockStream);
                blockStream.flush();   
              } catch (IOException e) {
                // HDFS-3398 treat primary DN is down since client is unable to 
                // write to primary DN. If a failed or restarting node has already
                // been recorded by the responder, the following call will have no 
                // effect. Pipeline recovery can handle only one node error at a
                // time. If the primary node fails again during the recovery, it
                // will be taken out then.
                tryMarkPrimaryDatanodeFailed();
                throw e;
              }
              lastPacket = Time.now();
              
              // update bytesSent
              long tmpBytesSent = one.getLastByteOffsetBlock();
              if (bytesSent < tmpBytesSent) {
                bytesSent = tmpBytesSent;
              }
    
              if (streamerClosed || hasError || !dfsClient.clientRunning) {
                continue;
              }
    
              // Is this block full?
              if (one.lastPacketInBlock) {
                // wait for the close packet has been acked
                synchronized (dataQueue) {
                  while (!streamerClosed && !hasError && 
                      ackQueue.size() != 0 && dfsClient.clientRunning) {
                    dataQueue.wait(1000);// wait for acks to arrive from datanodes
                  }
                }
                if (streamerClosed || hasError || !dfsClient.clientRunning) {
                  continue;
                }
    
                endBlock();
              }
              if (progress != null) { progress.progress(); }
    
              // This is used by unit test to trigger race conditions.
              if (artificialSlowdown != 0 && dfsClient.clientRunning) {
                Thread.sleep(artificialSlowdown); 
              }
            } catch (Throwable e) {
              // Log warning if there was a real error.
              if (restartingNodeIndex == -1) {
                DFSClient.LOG.warn("DataStreamer Exception", e);
              }
              if (e instanceof IOException) {
                setLastException((IOException)e);
              } else {
                setLastException(new IOException("DataStreamer Exception: ",e));
              }
              hasError = true;
              if (errorIndex == -1 && restartingNodeIndex == -1) {
                // Not a datanode issue
                streamerClosed = true;
              }
            }
          }
          if (traceScope != null) {
            traceScope.close();
          }
          closeInternal();
        }

    ResponseProcessor

    It processes the responses from the datanodes: when the ack for a packet arrives, that packet is removed from the ackQueue.

    The DataStreamer's run() method starts the ResponseProcessor thread via initDataStreaming():

        /**
         * Initialize for data streaming
         */
        private void initDataStreaming() {
          this.setName("DataStreamer for file " + src +
              " block " + block);
          response = new ResponseProcessor(nodes);
          response.start();
          stage = BlockConstructionStage.DATA_STREAMING;
        }


    Original comment:

        //
        // Processes responses from the datanodes.  A packet is removed
        // from the ackQueue when its response arrives.
        //

    ResponseProcessor likewise extends Daemon (a daemon thread) and thus indirectly Thread, so its core logic lives in run():

     public void run() {
    
            setName("ResponseProcessor for block " + block);
            PipelineAck ack = new PipelineAck();
    
            while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
              // process responses from datanodes.
              try {
                // read an ack from the pipeline
                long begin = Time.monotonicNow();
                ack.readFields(blockReplyStream);
                long duration = Time.monotonicNow() - begin;
                if (duration > dfsclientSlowLogThresholdMs
                    && ack.getSeqno() != Packet.HEART_BEAT_SEQNO) {
                  DFSClient.LOG
                      .warn("Slow ReadProcessor read fields took " + duration
                          + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: "
                          + ack + ", targets: " + Arrays.asList(targets));
                } else if (DFSClient.LOG.isDebugEnabled()) {
                  DFSClient.LOG.debug("DFSClient " + ack);
                }
    
                long seqno = ack.getSeqno();
                // processes response status from datanodes.
                for (int i = ack.getNumOfReplies()-1; i >=0  && dfsClient.clientRunning; i--) {
                  final Status reply = ack.getReply(i);
                  // Restart will not be treated differently unless it is
                  // the local node or the only one in the pipeline.
                  if (PipelineAck.isRestartOOBStatus(reply) &&
                      shouldWaitForRestart(i)) {
                    restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
                        Time.now();
                    setRestartingNodeIndex(i);
                    String message = "A datanode is restarting: " + targets[i];
                    DFSClient.LOG.info(message);
                   throw new IOException(message);
                  }
                  // node error
                  if (reply != SUCCESS) {
                    setErrorIndex(i); // first bad datanode
                    throw new IOException("Bad response " + reply +
                        " for block " + block +
                        " from datanode " + 
                        targets[i]);
                  }
                }
                
                assert seqno != PipelineAck.UNKOWN_SEQNO : 
                  "Ack for unknown seqno should be a failed ack: " + ack;
                if (seqno == Packet.HEART_BEAT_SEQNO) {  // a heartbeat ack
                  continue;
                }
    
                // a success ack for a data packet
                Packet one;
                synchronized (dataQueue) {
                  one = ackQueue.getFirst();
                }
                if (one.seqno != seqno) {
                  throw new IOException("ResponseProcessor: Expecting seqno " +
                                        " for block " + block +
                                        one.seqno + " but received " + seqno);
                }
                isLastPacketInBlock = one.lastPacketInBlock;
    
                // Fail the packet write for testing in order to force a
                // pipeline recovery.
                if (DFSClientFaultInjector.get().failPacket() &&
                    isLastPacketInBlock) {
                  failPacket = true;
                  throw new IOException(
                        "Failing the last packet for testing.");
                }
                  
                // update bytesAcked
                block.setNumBytes(one.getLastByteOffsetBlock());
    
                synchronized (dataQueue) {
                  lastAckedSeqno = seqno;
                  ackQueue.removeFirst();
                  dataQueue.notifyAll();
    
                  one.releaseBuffer(byteArrayManager);
                }
              } catch (Exception e) {
                if (!responderClosed) {
                  if (e instanceof IOException) {
                    setLastException((IOException)e);
                  }
                  hasError = true;
                  // If no explicit error report was received, mark the primary
                  // node as failed.
                  tryMarkPrimaryDatanodeFailed();
                  synchronized (dataQueue) {
                    dataQueue.notifyAll();
                  }
                  if (restartingNodeIndex == -1) {
                    DFSClient.LOG.warn("DFSOutputStream ResponseProcessor exception "
                         + " for block " + block, e);
                  }
                  responderClosed = true;
                }
              }
            }
          }

     Summary:

      From the source code analysis above we can see that:

      DFSOutputStream is the main class used to write a file to HDFS. It streams the data through a DataStreamer thread and handles the datanodes' replies through a ResponseProcessor thread.

  • Original post: https://www.cnblogs.com/davidwang456/p/4778810.html