  • 1_使用Java文件的并发写


    将一个文件切分成若干个 文件块 然后将文件块


    下面是简单的试着实现代码,暂时 先进行记录一下:

    import java.io.FileInputStream ;
    import java.io.FileOutputStream ;
    import java.io.File;
    import java.io.IOException ;
    import java.nio.channels.FileChannel ;
    import java.nio.ByteBuffer ;
    import java.lang.Thread;
    public class paralle 
        public static void main(String[] args) throws Exception 
             int MB64 = 1024 ;
             File main_f = new File("d:\test.txt") ;
             System.out.println("success create file"+ main_f.getName()) ;
             int pieceNum = (int)(main_f.length()/MB64) ;
             System.out.println("we will divide the file into "+pieceNum+"pieces") ;
             ByteBuffer [] blocks = new ByteBuffer[pieceNum];
             File [] subFiles = new File[pieceNum] ;
             ThreadWrite [] subThread = new ThreadWrite [pieceNum] ;
             FileChannel finChannel = new FileInputStream (main_f).getChannel() ;
             for ( int i = 0 ; i < pieceNum ; i++)
                 blocks[i] = ByteBuffer.allocate(MB64);
                 finChannel.read(blocks[i]) ;
                 subThread[i] = new ThreadWrite(subFiles[i] , blocks[i] , i,main_f.getName()) ;
                 subThread[i].start() ;
    class ThreadWrite extends Thread
        File f ;
        int currentNum ;
        String name ;
        ByteBuffer buffer ;
        FileChannel foutChannel ;
        ThreadWrite(File f ,ByteBuffer buffer , int currentNum,String name)
            this.f = f ;
            this.currentNum = currentNum ;
            this.buffer = buffer ;
            this.name = name ;
       public void run()
               name = new String ( "d:\test1\"+name +"_"+currentNum+".txt") ;
               f = new File(name);
               foutChannel = new FileOutputStream (f).getChannel() ;
               buffer.clear() ;
               System.out.println("create new file :"+name) ;
           catch (IOException e )

    思路很简单, 就是 实现设置一个 MB64 的整数值 对应的就是 block的单位 大小, 然后 获取待分割 的所谓的大文件,

    然后 根据 block的大小 将大文件 分割成 n 份 , n = File.length() / MB64 ;

    接下来, 创建一个 长度 为 n 的 继承了 Thread 可以实现多线程的 类的数组,

    进行一个 n 的循环, 在循环中 为 分割的 每个File 的block 创建一个 子文件, 该文件中存放的是

    File 中的各个 分割的 block 的内容, 并且在 文件输出的时候, 是以线程并发的方式 写进 子文件中的,

    并且 子文件 的命名方式 是以 大文件+当前循环码(第几个block-1)+".txt"的生成方式,进行文件的生成的。

    这样,在进行整体文件分割的时候, 可以很容易看出来, 那个是主文件, 那个是子文件。


    package parallel_write;
    import java.io.FileInputStream ;
    import java.io.FileOutputStream ;
    import java.io.File ;
    import java.io.IOException ;
    import java.nio.channels.FileChannel ;
    import java.nio.ByteBuffer ;
    import java.lang.Thread ;
    public class Main {
        public static void main(String[] args) throws Exception
            String path = new String ("test.txt") ;
            //test.txt file in included in the project floder
            Divide_LargeFile divFile = new Divide_LargeFile(path) ;
    class Divide_LargeFile
        final static int MB64 = 1024 ;
        File main_f ;
        int blockNum ;
        ByteBuffer [] blocks ;
        File [] subFiles ;
        ThreadWrite [] subThread ;
        Divide_LargeFile ( String path ) 
            this.main_f = new File ( "test.txt" ) ;
            System.out.println("success create file "+main_f.getName()) ;
        public void divideFile ()throws Exception
            this.blockNum = (int)(main_f.length()/MB64) ;
            System.out.println("large file is divided into "+blockNum+" blocks") ;
            ByteBuffer [] blocks = new ByteBuffer[blockNum] ;
            //we can regard block as a contianer which gets size MB64 bytes everytime from large fine
            this.subFiles = new File[blockNum] ;
            // new some File objects , but system didn't allocate capacity for 
            //each File objects in the array
            this.subThread = new ThreadWrite[blockNum] ;
            //number of subThread equals to the subFile's number
            FileChannel finChannel = new FileInputStream(main_f).getChannel() ;
            for ( int i = 0 ; i < blockNum; i++)
                blocks[i]  = ByteBuffer.allocate(MB64);
                blocks[i].flip() ;
                subThread[i] = new ThreadWrite(subFiles[i] , blocks[i],i, main_f.getName()) ;
         finChannel.close() ;
    } }
    class ThreadWrite extends Thread { File f ; int currentNum ; String fileName ; ByteBuffer buffer ; FileChannel foutChannel ; ThreadWrite ( File f , ByteBuffer buffer , int curNum , String fileName) { this.f = f ; this.buffer = buffer ; this.currentNum = curNum ; this.fileName = fileName ; } public void run () { try { fileName = new String (fileName+"_"+currentNum+".txt") ; f = new File (fileName) ; f.createNewFile(); foutChannel = new FileOutputStream(f).getChannel() ; foutChannel.write(buffer) ; buffer.clear() ; System.out.println("create new file"+fileName) ; } catch (IOException e ) { e.printStackTrace(); } } }


    如何在Client 端 调用 相关的hadoop 方法 来根据 相关Path 在HDFS 端 创建一个 File,

    并一次性 为该 File 在NameNode上面 添加 相关的节点, 并且可以 通过 相关调用 在Datanode 上面

    一次性为 File 分割成 block 的数目 n 分配相应数量 的 replica。





