zoukankan      html  css  js  c++  java
  • HDFS pipeline写 -- datanode

    站在DataNode的视角,看看pipeline写的流程,本文不分析客户端部分,从客户端写数据之前拿到了3个可写的block位置说起。

    每个datanode会创建一个线程DataXceiverServer,接收上游过来的TCP连接,对于每个新建的TCP连接,都会创建一个叫做DataXceiver的线程处理这个连接. 这个线程不断的从TCP连接中读op,然后调用processOp(op)处理这个op,这里以write block 这个op为例.

    对于datanode来说,write block操作由DataXceiver的writeBlock函数实现.

    大体步骤如下:

    1. new 一个BlockReceiver对象,随后用于接收上游(client或者datanode)的block数据.

    2. 根据传进来的DatanodeInfo数组,向数组的第一个元素代表的datanode建立TCP连接,targets参数是从上游的TCP连接中解析出来的,逻辑在Receiver的opWriteBlock方法中,Receiver是DataXceiver的基类.然后调用Sender的writeBlock方法给下游datanode发送write block相关元信息,包括DatanodeInfo数组(刨去第一个元素),clientname,block的当前gs,minBytesRcvd,maxBytesRcvd(对于append,recovery操作有用)等。然后读取下游的回复封装在BlockOpResponseProto对象中,可以通过内部成员firstBadLink知道建pipeline中第一个失败的datanode节点。接着将BlockOpResponseProto回复给上游
      (datanode或者client),最后调用第一步new的BlockReceiver的receiveBlock方法用于接收一个完整的block.如下:

      receiveBlock内部根据clientname发现是一个客户端在写block,创建一个PacketResponder线程用于处理下游datanode对packet的ack.PacketResponder后面分析。接着,不断的调用receivePacket()方法从上游(datanode或者client)接收一个个的packet,接收一个完整的packet的逻辑是由内部的PacketReceiver来处理的.
      对于一个接收到的packet,写入block file文件,同时checksum信息写meta文件,然后放入PacketResponder的ack queue队列,然后将packet写给下游的datanode。最后调用PacketResponder的 close方法,这个方法会等到ack queue为空,即所有packet都已经从下游收到,并且已经给上游ack.

    3. receiveBlock()结束后,关掉和上下游的连接.

    清空ack queue的逻辑由专门处理下游ack包的PacketResponder线程处理,逻辑如下:

    1. 如果datanode是pipeline的中间node(通过PacketResponder的type属性来决定,LAST_IN_PIPELINE和HAS_DOWNSTREAM_IN_PIPELINE),
      那么从下游读一个PipelineAck,从ack中拿到seqno,然后从ack queue中get(不删除)第一个packet,拿出seqno,记作expected_seq_no,然后比较是否相等,如果不相等,说明写出错. 如果seqno相同,往下.

    2. 如果从ack queue中get的packet是block的最后一个packet,说明一个block接收完成.那么调用finalizeBlock方法.finalizeBlock方法逻辑如下:

      关闭block file和meta file文件,调用FsDatasetImpl的finalizeBlock(block)将block文件以及对应的meta文件移动到对应的block pool下的finalized目录下,然后生成一个FinalizedReplica对象,将bpid->FinalizedReplica的映射关系记录在内存中的volumnMap中,对象位于FsDatasetImpl下的ReplicaMap volumnMap(从ReplicaMap中定位一个ReplicaInfo,需要拿着bpid和block id去找)最后调用datanode的closeBlock()方法,将block回报给namenode,该方法逻辑如下:

      拿着block的bpid从BlockPoolManager中拿到相应的BPOfferService,通知namenode这个block。在data node这边,data node和每个namenode的接口由一
      个BPServiceActor来承担,这是一个线程, 这个线程会向namenode汇报received block或者指示namenode去删除block.最后调用DatanodeProtocolClientSideTranslatorPB bpNamenode的blockReceivedAndDeleted()将block信息汇报上去.

    3. 给从下游接收的ack回复给上游。

    4. 将packet从ack queue的头部删除。

    可以看出,一个block的写操作对于每个data node来说,由两个线程参与,一个是DataXceiver,用于接收上游的数据,一个是PacketResponder,用于处理下游回来的ack。还没有接收到下游的ack并且没有给上游回复ack的packet都存在在ack queue中。

    参考资料

    hadoop-hdfs-2.4.1.jar

  • 相关阅读:
    http://www.cplusplus.com/reference/string/string/find_last_of/

    SQL Server数据库设计表和字段的经验
    AMP产品识别
    水晶头AMP识别
    双绞线的规范和制作经验谈
    VB.net 产生随机验证码
    手把手教您架设Windows2003共享服务器
    使用EasyRecovery Pro 6.04恢复RAW格式硬盘的数据实战
    .NET中各种数据库连接大全
  • 原文地址:https://www.cnblogs.com/foxmailed/p/4137949.html
Copyright © 2011-2022 走看看