zoukankan      html  css  js  c++  java
  • 实践 12:多线程读写文件

    多线程读写同一个文件分多种情况:

    • 多线程同时读同一个文件,在这种情况下并不会造成冲突

    • 多线程同时写同一个文件,会造成写数据丢失

    • 多线程同时对同一个文件进行写和读,会造成脏读

    如果要处理多线程读写文件造成的数据不一致的问题,第一个想到的就是加锁。在java.concurrent.locksReadWriteLock分别定义了乐观锁读锁和悲观锁写锁,将以上的情况都考虑到了,可以很好地处理多线程读写同一个文件的情况。但是既然加锁,必然会导致多线程在读写文件时效率较低,在不同情形下似乎有更好的解决方案:

    1. 通过ReadWriteLock为文件读写过程加锁,防止数据与预想不一致,同时降低了多线程处理的效率

    2. 如果多线程频繁写入少量数据,可创建一个类缓存需要写入的数据,并且按时批量写入数据,减少频繁操作文件及加锁操作带来的问题。

    3. 可通过RandomAccessFile规划好不同位置,多线程同时操作不同位置的写入

     

    有关RandomAccessFile,之前在JAVA篇:Java IO (三)访问文件--转换流和文件流 对RandomAccessFile进行了简单的了解。

     

    1 实践一 多线程写文件RandomAccessFile不加锁

    RandomAccessFile可选模式是rrwrws,或者rwd,写入时不会清空原数据,会在指定位置覆盖原本内容写入新内容。

     

    所以:

    1. 清空RandomAccessFile打开的文件的方法为 rw.setLength(0)

    2. 为了防止多线程间写入内容互相覆盖需要规划好写入的位置,插入的话会更加麻烦

    3. 规划好写入的位置有两种,a)如果每行写入的字数相同很简单可以计算得到足够的位置 b)预留足够的位置

    4. 预留足够的位置的结果其实会有些瑕疵,结果如下所示:

       

       

       

      
      /* 多线程写入文件指定位置 */
        class RWrite implements Runnable{
            public int pos;
            public String text;
            public String fileName;
    ​
            public RWrite(int pos,String text,String fileName){
                this.pos = pos;
                this.text = text;
                this.fileName = fileName;
    ​
            }
            @Override
            public void run(){
                try {
                    RandomAccessFile rw = new RandomAccessFile(this.fileName,"rw");
                    rw.seek(pos);
                    rw.writeBytes(text);
                    rw.close();
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    ​
    ​
        public void test(){
            ExecutorService pool = Executors.newFixedThreadPool(10);
            int linwsize = 20;//必须预留足够的一行的空间,否则会导致覆盖,但是预留空间过大,也会出现空的字符
            int lines = 50;//需要写入100行
    ​
            String fileName = "a.csv";
    ​
            RandomAccessFile rw = null;
            try {
                rw = new RandomAccessFile(fileName,"rw");
                rw.setLength(0);//清空文件
                /*写入标题栏*/
                rw.writeBytes("index,text\n");
                rw.close();
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
    ​
    ​
            for(int i=1;i<=lines;i++){
                pool.execute(new RWrite(i*linwsize,i+", text"+i+"\n",fileName));
            }
            pool.shutdown();
    ​
        }
     

    2 实践二 将写入文件交给一个线程进行

    类似于生产者消费者模式,只是只有一个消费者。

    本来只想简单地写一下的,但是因为在某处将0写成了9调试了许久。

       /* 将写入工作交给一个线程 */
        /* 加工数据的Task。生产者 */
        class DataTask implements Runnable{
            BlockingQueue<String> data_put;
            private int start;
            private int end;
    ​
            public DataTask(BlockingQueue<String> data_put,int start,int end){
                this.data_put = data_put;
                this.start = start;
                this.end = end;
            }
            @Override
            public void run(){
                /*System.out.println(String.format("%s-%s:%d-%d开始",
                        System.currentTimeMillis(),
                        Thread.currentThread().getName(),
                        this.start,this.end));*/
                for(int i=start;i<=end;i++){
                    
                    try {
                        /* 加工数据需要时间,随机 */
                        Thread.sleep(10+new Random().nextInt(20));
                        String s = String.format("%s-%s:%d-%d[%d]\n",
                                System.currentTimeMillis(),
                                Thread.currentThread().getName(),
                                this.start,this.end,i);
                        data_put.put(i+"\n");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        return;
    ​
                    }
                }
                System.out.println(String.format("%s-%s:%d-%d结束",
                        System.currentTimeMillis(),
                        Thread.currentThread().getName(),
                        this.start,this.end));
    ​
            }
        }
        /* 缓存提交的数据,并写入 */
        class WriteTask implements Runnable{
            private BlockingQueue<String> data_in = new ArrayBlockingQueue<>(10);
            private byte[] buffer = new byte[1024];
            private int th = (int)(1024*0.8);
            int length=0;
            private String fileName;
    ​
            public WriteTask(String fileName){
                this.fileName = fileName;
                try {
                    /* 清空要写入数据的文件 */
                    FileOutputStream fileOutputStream = new FileOutputStream(fileName);
                    fileOutputStream.close();
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            public BlockingQueue<String> get_Queue(){
                return data_in;
            }
            private void  write(){
                if(length==0) return;
                try {
                    //System.out.println(length);
                    //System.out.println(new String(buffer));
                    System.out.println("开始写入……");
                    FileOutputStream fileOutputStream = new FileOutputStream(fileName,true);
                    fileOutputStream.write(buffer,0,length);
                    fileOutputStream.close();
                    System.out.println(length+"写入完成。");
                    length = 0;
    ​
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
    ​
    ​
            }
            private void close(){
                //System.out.println(new String(buffer));
                this.write();
               // System.out.println(length);
                //data_in = null;
            }
            @Override
            public void run(){
                while (true){
                    try {
                        byte[] tmp= data_in.take().getBytes();
                        System.arraycopy(tmp,0,buffer,length,tmp.length);
                        length = length+tmp.length;
    ​
                        if(length>=th){
                            this.write();
                        }
                    } catch (InterruptedException e) {
                        //e.printStackTrace();
                        break;
    ​
                    }
                }
            }
        }
    ​
        public void test3(){
            ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
    ​
            String fileName = "b.csv";
    ​
            WriteTask writeTask = new WriteTask(fileName);
            pool.execute(writeTask);
    ​
            int num = 20;
            int writenum = 100;
    ​
            for(int i=0;i<num;i++){
                //System.out.println(i*writenum+"---"+((i+1)*writenum-1));
                pool.execute(new DataTask(writeTask.get_Queue(),i*writenum,((i+1)*writenum-1)));
            }
            pool.shutdown();
    ​
            while (true){
                try {
                    pool.awaitTermination(500,TimeUnit.MILLISECONDS);
                    if(pool.getActiveCount()==1){
                        writeTask.close();
                        Thread.sleep(10);
                        pool.shutdownNow();
                    }
                    if(pool.getActiveCount()==0){
                        break;
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    break;
                }
            }
    ​
    ​
    ​
        }

    3 实践三 多线程复制文件RandomAccessFile不加锁

    使用相同的偏移量,进行读写,也能防止多线程写入时发生冲突。

    复制文件包含读写操作,好处是不必自己规划偏移量。

       /* 多线程复制文件 */
        /* 多线程写入文件指定位置 */
        class RCopy implements Runnable{
            public int pos;
            public int len;
            public String readFile;
            public String writeFlie;
    ​
            public RCopy(String readFile,String writeFlie,int pos,int len){
                this.pos = pos;
                this.len = len;
                this.readFile = readFile;
                this.writeFlie = writeFlie;
    ​
            }
            @Override
            public void run(){
                byte[] bytes = new byte[len];
                try {
                    RandomAccessFile rr = new RandomAccessFile(this.readFile,"r");
                    RandomAccessFile rw = new RandomAccessFile(this.writeFlie,"rw");
                    rr.seek(pos);
                    rw.seek(pos);
                    rr.read(bytes);
                    rw.write(bytes);
                    rr.close();
                    rw.close();
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    ​
        public void test2(){
            ExecutorService pool = Executors.newFixedThreadPool(10);
    ​
    ​
            String readFile = "b.csv";
            String writeFlie = "a.csv";
    ​
            long totalLen = 0;
            int len = 1024; /* 每个task写入的大小 */try {
                //读取需要复制文件的大小
                RandomAccessFile file = new RandomAccessFile(readFile,"r");
                totalLen = file.length();
                System.out.println("length:"+totalLen);
                //清空需要写入的文件
                file = new RandomAccessFile(writeFlie,"rw");
                file.setLength(0);
    ​
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
    ​
            int tasknum = 11;
    ​
    ​
            for(int i=0;i<totalLen;i = i+len){
                int alen = len;
                if(i+len>totalLen) alen = (int)totalLen-i;
                //System.out.println(i+":"+alen);
                pool.execute(new RCopy(readFile,writeFlie,i,alen));
            }
            pool.shutdown();
    ​
        }

     

    X 参考

    当你深入了解,你就会发现世界如此广袤,而你对世界的了解则是如此浅薄,请永远保持谦卑的态度。
  • 相关阅读:
    [UVA100] The 3n + 1 problem.题解
    [SP1] TEST
    LCA【模板】
    [P1000] 超级玛丽游戏.题解
    [P3367]【模板】并查集.题解
    并查集【模板】
    洛谷 P1890 【gcd区间】
    浅谈分块算法经典问题&优化
    Floyd算法详(cha)解
    逆序对
  • 原文地址:https://www.cnblogs.com/liwxmyself/p/15645080.html
Copyright © 2011-2022 走看看