zoukankan      html  css  js  c++  java
  • hadoop_并行写操作思路_2

    如果想实现将 Client端的 File并行写入到 各个Datanode中,

    首先, 应该修改的是,DistributedFileSystem中的create方法,

    在create 内部调用FSNamesystem中的方法的时候,

    应该增加向NameNode发送,上传文件的大小所需要的blocks的数目。

     

    然后,调用分配块的相关方法, 在NameNode中 所存放的系统树中添加相关的节点后( INodeFile)

    还要为该INodeFile中的blocks 表分配block实体, 且 INodeFile.blocks.lenght = (File)/block

    接下来,保留client 与 datanodes 之间的packet 数据传输单位的 方法,不对其进行修改。

     

    文件总共分为 n 份, n份并发写入到 Datanode中的block中。

    每一份开启一个线程 ( 线程通过 创建一个DataStreamer实例来创建 )

    以DataStreamer 所谓线程的并发单位。

     

    以pipeline中的向一个datanode的block 进行写操作为例,

    其过程中有两个线程, 这两个线程分别对应这两个流: output input

    input 是用来接收来自上游节点的数据流

    output是用来将本节点接收到的写往block中的data 写入到下游 datanode的block中。

     

    下图是实现 并发写的算法图示:

     ================实现并发写的算法伪代码如下=========================

    ------------create INodeFile at NameNode for File--------------------
    
    create ( Path fileName, int blockSize ... )
    {
        INodeFile inodeFile = new INodeFile ( fileName ) ;
    //在原始代码中, 在 create 阶段仅仅实例化 一个inodeFile (UnderConstruction)
    //并将 该 fileName 对应的 inodeFile 加入到系统目录树中
    //但却并未 给 inodeFile.blocks[] 数组中的 block 分配实体
    
    
    //想要实现的算法是,根据client端发来的blockSize 为文件对应的inodeFile.blocks
    //分配块实例
    
       NameNode.FSDirectory.rootDir.addInode ( inodeFile ) ;
    //rootDir is a INode list , stores file system's structure
    
        for  i -> ( 0 , blockSize-1 )
           inodeFile.Blocks[i] = FSNamesystem.allocateBlock () ;
    }
    
    //allocates all the blocks for whole File
    //at client we can get the map relationship
    //by DistributedFileSystem.NameNode.FSNamesystem.FSDirectory.
    //rootDir[rootDir.length-1]
    //to get the INodeFile and by INodeFile we can get blocks table
    //to write to which block in which datanode

     --------------------write to block at client----------------------------

    n = sizeof (File) / BlockSize 
    
    File File_part[n]
    
    DataStreamer dataStreamers[n] 
    
    for  i-> ( 0 , n-1 )
    {
       File_part[i] = File [i*BlockSize, (i+1)*BlockSize-1 ]
    }
    
    for  i-> ( 0, n-1 )
    {
       dataStreamers[i] = new DataStreamer( File_part[i] , i..) 
       dataStreamers[i].run ()    
    }
    
    //add a new DataStreamer constructor into src
    
    DataStreamer ( File filePart , int number )
    {
        File f = new File ( filePart ) ;
        this.number = number 
    // add a member variable to remember 
    //which block in block list should this thread write to 
    }
    
    
    
    
    DataStreamer.run ()
    {
        (File) f -> packetList [...]
    //decompose file into packets
      
    
    //create connection to datanode by socket
    //we are going to create a write outputstream 
    
      //put packet into dataQueue
    //get first packet from dataQueue
    
    //package the packet into outputstream
    
    //and do not forget add the ID (number ) which means which block
    //should the packet stream write to 
    
    
    //put the current packet to ackQueue
    //receive reponse message from datanode
    //receive success , remove the packet from ackQueue
    
    //shutdown connection 
    
    }
    
    //send packets to datanode on by on 
    }

     大体上的思路是这样的,其中还没有考虑清楚的地方就是,如何才能在并发写的时候,可以将一个文件的写向的多个块的状态

    强制转换为 rbw (datanode上的 replica ) 在namenode 上是Underconstruction状态。

     

    因为在前两篇文章中,我们可以知道,在对一个文件进行写操作的时候,只有文件对应的INodeFile的

    blocks 的 最后一个block元素 才是可以可以写的,也是出于rbw状态的,那么在并发写的时候,如何保证并发写入的块同时都是出于这个状态

    还有就是 提交单位从 最后一个块写完 标志着整个文件的 写完 实现  将会被 改变成  并发写的 所有块都写完才标志着 整个文件的成功提交 ?

    这些问题暂时还需要考虑。╮(╯_╰)╭

    ===========1_8========================

    1. 通过FileSystem 创建的实例  create 一个 新的File

    2.通过创建一个 FSImage 获得 FSNamesystem , FSDirectory

    3.通过 FSNamesystem.dir.rootDir 获得 存放 最新创建 文件的inode, 然后 将INode强制转换为 INodeFile,

    通过INodeFile类中的 appendBlocks 一次性 为其分配 指定个数的 blocks。

    4.每一个 block 开启一个 对应的OutputStream 的流,通过多线程的调用 向流中写入将 大文件分割好的小文件 。//IOUtils

  • 相关阅读:
    迭代模型
    螺旋模型
    瀑布模型
    V模型
    codeforces411div.2
    专题1:记忆化搜索/DAG问题/基础动态规划
    Python
    字符串的相关操作方法
    Python基本数据类型
    编码
  • 原文地址:https://www.cnblogs.com/inuyasha1027/p/hadoop_parallel4.html
Copyright © 2011-2022 走看看