zoukankan      html  css  js  c++  java
  • HDFS源码分析DataXceiver之整体流程

    《HDFS源码分析之DataXceiverServer》一文中,我们了解到在DataNode中,有一个后台工作的线程DataXceiverServer。它被用于接收来自客户端或其他数据节点的数据读写请求,为每个数据读写请求创建一个单独的线程去处理。而处理每次读写请求时所创建的线程,就是本文要讲的DataXceiver。本文,我们来看下DataXceiver的具体实现,着重讲解下它得到数据读写请求后的整体处理流程。

            首先,我们先看下DataXceiver的成员变量,具体如下:

    [java] view plain copy
     
    1. // 封装了Socket、输入流、输出流的Peer,是DataXceiver线程工作的主要依托者  
    2. private Peer peer;  
    3.   
    4. // 通讯两端地址:远端地址remoteAddress、本地端地址localAddress,均是从peer(即socket)中获得的  
    5. private final String remoteAddress; // address of remote side  
    6. private final String localAddress;  // local address of this daemon  
    7.   
    8. // DataNode节点进程实例datanode  
    9. private final DataNode datanode;  
    10.   
    11. // DataNode节点配置信息dnConf  
    12. private final DNConf dnConf;  
    13.   
    14. // DataXceiverServer线程实例dataXceiverServer  
    15. private final DataXceiverServer dataXceiverServer;  
    16.   
    17. // 连接DataNode是否使用主机名,取参数dfs.datanode.use.datanode.hostname,参数未配置的话默认为false,不使用  
    18. private final boolean connectToDnViaHostname;  
    19.   
    20. // 接收到一个操作op的开始时间  
    21. private long opStartTime; //the start time of receiving an Op  
    22.   
    23. // InputStream输入流socketIn  
    24. private final InputStream socketIn;  
    25.   
    26. // OutputStream输出流socketOut  
    27. private OutputStream socketOut;  
    28.   
    29. // 数据块接收器BlockReceiver对象blockReceiver  
    30. private BlockReceiver blockReceiver = null;  
    31.   
    32. /** 
    33.  * Client Name used in previous operation. Not available on first request 
    34.  * on the socket. 
    35.  * previousOpClientName为之前操作的客户端名字,它对于socket上的第一个请求不可用 
    36.  */  
    37. private String previousOpClientName;  

            既然DataXceiver是为处理数据读写请求而创建的线程,那么Socket、输入流、输出流就是必不可少的成员。而首当其冲的Peer,便封装了Socket、输入流、输出流的Peer,是DataXceiver线程工作的主要依托者,而接下来的输入流socketIn、输出流socketOut都是来自peer的socket。另外,DataXceiver还提供了通讯两端地址:远端地址remoteAddress、本地端地址localAddress,均是从peer(即socket)中获得的。

            既然是由DataNode上的DataXceiverServer线程创建的,那么自然少不了datanode、dataXceiverServer、dnConf等变量,并且,它是专门用来处理数据读写请求的,自然也需要像数据块接收器BlockReceiver对象blockReceiver这种成员变量。dnConf是DNConf类型的数据节点DataNode上的配置信息。

            剩下的几个,便是在处理具体的数据读写请求时用到的connectToDnViaHostname、opStartTime、previousOpClientName等变量。其中,connectToDnViaHostname标识连接DataNode是否使用主机名,取参数dfs.datanode.use.datanode.hostname,参数未配置的话默认为false,不使用,opStartTime为接收到一个操作op的开始时间,最后的previousOpClientName为之前操作的客户端名字,它对于socket上的第一个请求不可用。

            下面我们再看下它的构造方法,只有一个private的,如下:

    [java] view plain copy
     
    1. /** 
    2.  * 私有构造函数,需要Peer、DataNode、DataXceiverServer三个参数 
    3.  */  
    4. private DataXceiver(Peer peer, DataNode datanode,  
    5.     DataXceiverServer dataXceiverServer) throws IOException {  
    6.   
    7. / peer、datanode、dataXceiverServer等成员变量直接赋值  
    8.   this.peer = peer;  
    9.   this.dnConf = datanode.getDnConf();  
    10.     
    11.   // 输入流socketIn、输出流socketOut来自peer的socket  
    12.   this.socketIn = peer.getInputStream();  
    13.   this.socketOut = peer.getOutputStream();  
    14.     
    15.   this.datanode = datanode;  
    16.   this.dataXceiverServer = dataXceiverServer;  
    17.     
    18.   // connectToDnViaHostname取自数据节点配置信息dnConf  
    19.   this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;  
    20.     
    21.   // 远端remoteAddress和本地localAddress地址取自Peer  
    22.   remoteAddress = peer.getRemoteAddressString();  
    23.   localAddress = peer.getLocalAddressString();  
    24.   
    25.   if (LOG.isDebugEnabled()) {  
    26.     LOG.debug("Number of active connections is: "  
    27.         + datanode.getXceiverCount());  
    28.   }  
    29. }  

            但是,它提供了一个类的静态create()方法,用于DataXceiver对象的构造,代码如下:

    [java] view plain copy
     
    1. /** 
    2.  * 提供了一个静态方法create(),调用私有构造函数构造DataXceiver对象 
    3.  */  
    4. public static DataXceiver create(Peer peer, DataNode dn,  
    5.     DataXceiverServer dataXceiverServer) throws IOException {  
    6.   return new DataXceiver(peer, dn, dataXceiverServer);  
    7. }  

            上述构造方法及静态create()方法都很简单,不再赘述。

            接下来,我们再着重分析下,DataXceiver线程在启动后,是如何处理来自客户端或者其他数据节点发送的数据读写请求的。既然是线程,那么就不得不看看它的run()方法,代码如下:

    [java] view plain copy
     
    1. /** 
    2.  * Read/write data from/to the DataXceiverServer. 
    3.  * 从DataXceiverServer中读取或者往DataXceiverServer中写入数据 
    4.  */  
    5. @Override  
    6. public void run() {  
    7.   int opsProcessed = 0;  
    8.   Op op = null;  
    9.   
    10.   try {  
    11.     // 在dataXceiverServer中增加peer与该DataXceiver实例所在线程和DataXceiver实例的映射关系  
    12.     dataXceiverServer.addPeer(peer, Thread.currentThread(), this);  
    13.       
    14.     // peer中设置socket写入超时时间  
    15.     peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);  
    16.       
    17.     InputStream input = socketIn;  
    18.     try {  
    19.         
    20.     // IOStreamPair为一个输入输出流对,既包含输入流,也包含输出流  
    21.       IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,  
    22.         socketIn, datanode.getXferAddress().getPort(),  
    23.         datanode.getDatanodeId());  
    24.         
    25.       // 包装saslStreams的输入流in为BufferedInputStream,得到输入流input,其缓冲区大小取参数io.file.buffer.size的一半,  
    26.       // 参数未配置的话默认为512,且最大也不能超过512  
    27.       input = new BufferedInputStream(saslStreams.in,  
    28.         HdfsConstants.SMALL_BUFFER_SIZE);  
    29.         
    30.       // 从saslStreams中获取输出流socketOut  
    31.       socketOut = saslStreams.out;  
    32.     } catch (InvalidMagicNumberException imne) {  
    33.       LOG.info("Failed to read expected encryption handshake from client " +  
    34.           "at " + peer.getRemoteAddressString() + ". Perhaps the client " +  
    35.           "is running an older version of Hadoop which does not support " +  
    36.           "encryption");  
    37.       return;  
    38.     }  
    39.       
    40.     // 调用父类initialize()方法,完成初始化,实际上就是设置父类的输入流in  
    41.     super.initialize(new DataInputStream(input));  
    42.       
    43.     // We process requests in a loop, and stay around for a short timeout.  
    44.     // This optimistic behaviour allows the other end to reuse connections.  
    45.     // Setting keepalive timeout to 0 disable this behavior.  
    46.       
    47.     // 在一个do...while循环内完成请求的处理。  
    48.     do {  
    49.         
    50.     // 更新当前线程名称,通过线程名标识进度的一种手段,不错  
    51.       updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));  
    52.   
    53.       try {  
    54.          
    55.         // 由于第一次是创建一个新的socket使用,连接的时间可能会很长,所以连接超时时间设置的比较大,  
    56.         // 而后续使用的话,是复用socket,连接的超时时间限制就没必要设置那么大了  
    57.         if (opsProcessed != 0) {  
    58.             
    59.         // 如果不是第一次出来请求,确保dnConf的socketKeepaliveTimeout大于0,  
    60.         // 将其设置为设置peer(即socket)的读超时时间,  
    61.         // 取参数dfs.datanode.socket.reuse.keepalive,参数为配置的话,默认为4s  
    62.           assert dnConf.socketKeepaliveTimeout > 0;  
    63.           peer.setReadTimeout(dnConf.socketKeepaliveTimeout);  
    64.         } else {  
    65.             
    66.         // 最开始第一次处理请求时,设置peer(即socket)读超时时间为dnConf的socketTimeout  
    67.         // 即取参数dfs.client.socket-timeout,参数未配置的话默认为60s  
    68.           peer.setReadTimeout(dnConf.socketTimeout);  
    69.         }  
    70.           
    71.         // 通过readOp()方法读取操作符op  
    72.         op = readOp();  
    73.       } catch (InterruptedIOException ignored) {  
    74.         // Time out while we wait for client rpc  
    75.         // 如果是InterruptedIOException异常,跳出循环  
    76.         break;  
    77.       } catch (IOException err) {  
    78.         // Since we optimistically expect the next op, it's quite normal to get EOF here.  
    79.         if (opsProcessed > 0 &&  
    80.             (err instanceof EOFException || err instanceof ClosedChannelException)) {  
    81.           if (LOG.isDebugEnabled()) {  
    82.             LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");  
    83.           }  
    84.         } else {  
    85.           throw err;  
    86.         }  
    87.         break;  
    88.       }  
    89.   
    90.       // restore normal timeout  
    91.       // 重新存储正常的超时时间,即dnConf的socketTimeout  
    92.       if (opsProcessed != 0) {  
    93.         peer.setReadTimeout(dnConf.socketTimeout);  
    94.       }  
    95.   
    96.       // 设置操作的起始时间opStartTime  
    97.       opStartTime = now();  
    98.         
    99.       // 通过processOp()方法根据操作符op调用相应的方法处理操作符op  
    100.       processOp(op);  
    101.         
    102.       // 累加操作数  
    103.       ++opsProcessed;  
    104.     } while ((peer != null) &&  
    105.         (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));  
    106.     // 循环的条件便是:peer未关闭且复用超时时间socketKeepaliveTimeout大于0  
    107.       
    108.   } catch (Throwable t) {  
    109.     String s = datanode.getDisplayName() + ":DataXceiver error processing "  
    110.         + ((op == null) ? "unknown" : op.name()) + " operation "  
    111.         + " src: " + remoteAddress + " dst: " + localAddress;  
    112.     if (op == Op.WRITE_BLOCK && t instanceof ReplicaAlreadyExistsException) {  
    113.       // For WRITE_BLOCK, it is okay if the replica already exists since  
    114.       // client and replication may write the same block to the same datanode  
    115.       // at the same time.  
    116.       if (LOG.isTraceEnabled()) {  
    117.         LOG.trace(s, t);  
    118.       } else {  
    119.         LOG.info(s + "; " + t);  
    120.       }  
    121.     } else {  
    122.       LOG.error(s, t);  
    123.     }  
    124.   } finally {  
    125.       
    126.     if (LOG.isDebugEnabled()) {  
    127.       LOG.debug(datanode.getDisplayName() + ":Number of active connections is: "  
    128.           + datanode.getXceiverCount());  
    129.     }  
    130.       
    131.     // 更新当前线程名称  
    132.     updateCurrentThreadName("Cleaning up");  
    133.       
    134.     // 关闭peer(socket)、输入流等资源  
    135.     if (peer != null) {  
    136.       dataXceiverServer.closePeer(peer);  
    137.       IOUtils.closeStream(in);  
    138.     }  
    139.   }  
    140. }  

            run()方法的处理流程逻辑十分清晰,概括如下:

            1、在dataXceiverServer中增加peer与该DataXceiver实例所在线程和DataXceiver实例的映射关系;

            2、peer中设置socket写入超时时间,取参数dfs.datanode.socket.write.timeout,参数未配置的话默认为8分钟;

            3、获取IOStreamPair类型的saslStreams,其为一个输入输出流对,既包含输入流,也包含输出流;

            4、包装saslStreams的输入流in为BufferedInputStream,得到输入流input,其缓冲区大小取参数io.file.buffer.size的一半,参数未配置的话默认为512,且最大也不能超过512;

            5、从saslStreams中获取输出流socketOut;

            6、调用父类initialize()方法,完成初始化,实际上就是设置父类的输入流in;

            7、在一个do...while循环内完成请求的处理,循环的条件便是--peer未关闭且复用超时时间socketKeepaliveTimeout大于0:

                  7.1、更新当前线程名称,通过线程名标识进度的一种手段,不错,线程名此时为Waiting for operation #100(100为操作处理次数累加器的下一个值);

                  7.2、处理读超时时间设置:由于第一次是创建一个新的socket使用,连接的时间可能会很长,所以连接超时时间设置的比较大,而后续使用的话,是复用socket,连接的超时时间限制就没必要设置那么大了。所以,最开始第一次处理请求时,设置peer(即socket)读超时时间为dnConf的socketTimeout,即取参数dfs.client.socket-timeout,参数未配置的话默认为60s;如果不是第一次出来请求,确保dnConf的socketKeepaliveTimeout大于0,将其设置为设置peer(即socket)的读超时时间,取参数dfs.datanode.socket.reuse.keepalive,参数为配置的话,默认为4s;

                  7.3、通过readOp()方法读取操作符op;

                  7.4、重新存储正常的超时时间,即dnConf的socketTimeout;

                  7.5、设置操作的起始时间opStartTime,为当前时间;

                  7.6、通过processOp()方法根据操作符op调用相应的方法处理操作符op;

                  7.7、累加操作数opsProcessed;

            8、更新当前线程名称:Cleaning up;

            9、关闭peer(socket)、输入流等资源。

            实际上,对于读写请求的处理的一个主线,便是在socket未关闭的情况下,不停的读取操作符,然后调用相应的方法处理,也就是do...while循环内的op = readOp()-----processOp(op)这一处理主线。

            下面,我们来看下读取操作符的readOp()方法,它位于DataXceiver的父类Receiver中。代码如下:

    [java] view plain copy
     
    1.  /** Read an Op.  It also checks protocol version. */  
    2.  protected final Op readOp() throws IOException {  
    3.      
    4. // 首先从输入流in中读入版本号version,short类型,占2个字节  
    5. final short version = in.readShort();  
    6.   
    7. // 校验版本号version是否与DataTransferProtocol中的DATA_TRANSFER_VERSION相等,该版本中为28  
    8.    if (version != DataTransferProtocol.DATA_TRANSFER_VERSION) {  
    9.      throw new IOException( "Version Mismatch (Expected: " +  
    10.          DataTransferProtocol.DATA_TRANSFER_VERSION  +  
    11.          ", Received: " +  version + " )");  
    12.    }  
    13.      
    14.    // 调用Op的read()方法,从输入流in中获取操作符op  
    15.    return Op.read(in);  
    16.  }  

            代码中有详细注释,不再解释。继续追踪Op的read()方法,代码如下:

    [java] view plain copy
     
    1. private static final int FIRST_CODE = values()[0].code;  
    2. /** Return the object represented by the code. */  
    3. private static Op valueOf(byte code) {  
    4.   final int i = (code & 0xff) - FIRST_CODE;  
    5.   return i < 0 || i >= values().length? null: values()[i];  
    6. }  
    7.   
    8. /** Read from in */  
    9. public static Op read(DataInput in) throws IOException {  
    10.   return valueOf(in.readByte());  
    11. }  

            很简单,通过read()方法从输入流读取byte,并通过valueOf()方法,首先将byte转化为int,然后减去Op操作符枚举类型的第一个值:WRITE_BLOCK,即80,得到i。如果i小于0或者大于枚举中操作符的个数,说明输入流中传入的操作符不在枚举范围内,否则利用i作为索引取出相应的操作符。枚举类型如下:

    [java] view plain copy
     
    1. WRITE_BLOCK((byte)80),  
    2. READ_BLOCK((byte)81),  
    3. READ_METADATA((byte)82),  
    4. REPLACE_BLOCK((byte)83),  
    5. COPY_BLOCK((byte)84),  
    6. BLOCK_CHECKSUM((byte)85),  
    7. TRANSFER_BLOCK((byte)86),  
    8. REQUEST_SHORT_CIRCUIT_FDS((byte)87),  
    9. RELEASE_SHORT_CIRCUIT_FDS((byte)88),  
    10. REQUEST_SHORT_CIRCUIT_SHM((byte)89);  

            比较简单,写数据块为80,读数据块为81等,不再一一介绍。操作符为int类型,也就意味着它占4个字节。

            接下来,我们再看下处理操作符的processOp()方法,同样在DataXceiver的父类Receiver中。代码如下:

    [java] view plain copy
     
    1.  /** Process op by the corresponding method. */  
    2.  protected final void processOp(Op op) throws IOException {  
    3.   
    4. // 通过调用相应的方法处理操作符  
    5.    switch(op) {  
    6.    case READ_BLOCK:// 读数据块调用opReadBlock()方法  
    7.      opReadBlock();  
    8.      break;  
    9.    case WRITE_BLOCK:// 写数据块调用opWriteBlock()方法  
    10.      opWriteBlock(in);  
    11.      break;  
    12.    case REPLACE_BLOCK:// 替换数据块调用opReplaceBlock()方法  
    13.      opReplaceBlock(in);  
    14.      break;  
    15.    case COPY_BLOCK:// 复制数据块调用REPLACE()方法  
    16.      opCopyBlock(in);  
    17.      break;  
    18.    case BLOCK_CHECKSUM:// 数据块检验调用opBlockChecksum()方法  
    19.      opBlockChecksum(in);  
    20.      break;  
    21.    case TRANSFER_BLOCK:// 移动数据块调用opTransferBlock()方法  
    22.      opTransferBlock(in);  
    23.      break;  
    24.    case REQUEST_SHORT_CIRCUIT_FDS:  
    25.      opRequestShortCircuitFds(in);  
    26.      break;  
    27.    case RELEASE_SHORT_CIRCUIT_FDS:  
    28.      opReleaseShortCircuitFds(in);  
    29.      break;  
    30.    case REQUEST_SHORT_CIRCUIT_SHM:  
    31.      opRequestShortCircuitShm(in);  
    32.      break;  
    33.    default:  
    34.      throw new IOException("Unknown op " + op + " in data stream");  
    35.    }  
    36.  }  

            一目了然,根据操作符的不同,调用不同的方法去处理。比如读数据块调用opReadBlock()方法,写数据块调用opWriteBlock()方法,替换数据块调用opReplaceBlock()方法等等,读者可自行阅读。
            至此,HDFS源码分析DataXceiver之整体流程全部叙述完毕。后续文章会陆续推出对于写数据块、读数数据块、替换数据块、移动数据块等的详细操作,以及DataXceiver线程中用到的数据块发送器BlockSender、数据块接收器BlockReceiver的详细分析,敬请期待!

  • 相关阅读:
    Bootstrap表格的使用
    [JS练习] 瀑布流照片墙
    [C#基础] 委托
    [C#基础] 泛型
    [C#基础] 继承
    [C#基础] 类
    [C#基础] 数据类型
    Unity获取手机的电量时间
    C#网络通信Socket详解
    记C#一次服务器搭建和数据库应用
  • 原文地址:https://www.cnblogs.com/jirimutu01/p/5556184.html
Copyright © 2011-2022 走看看