zoukankan      html  css  js  c++  java
  • 【并发】4、文件锁,多线程队列拷贝文件,并读取的操作

      最近有这样一个功能点,我实现了一个多线程的队列,生产线程ftp获取文件,然后扫描指定目录,获取文件基础信息,然后put进入队列,然后消费者通过这个信息解析文件进行入库,因为文件会比较多,所以到时候会起多个消费线程去解析数据,并且考虑到复用,不用的文件,消费消除要分化操作(策略模式),但是遇到一个问题,就是生产线程可能重复扫描到一个还未解析完成的文件

      1。改进,新建一个consum目录,扫描到一个文件,就移动到consum目录,这样ftp操作就无脑向生产文件夹拉文件就行

      2.还有个文件,就是如果消费线程如果解析不够快,也有可能导致文件被重复扫描到,那么为了避免,就需要扫到一个文件,就把文件移动到指定的目录下,等待消费

      3.新文件,当生产队列不够快,消费队列空闲比较多了的时候,不同的文件操作,会导致线程锁死,因为生产线程文件还未写入,消费线程就开始读取,这个时候线程卡死,造成死锁

      4.为了解决这个问题,我们再写文件的时候对文件上一个文件锁,读取文件的消费线程,等待生产线程写入完成之后再进行操作,这里有个demo来模拟这个线程锁

    package io;
    
    import org.junit.Test;
    
    import java.io.ByteArrayOutputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.nio.channels.FileChannel;
    import java.nio.channels.FileLock;
    
    /**
     * @ProjectName: cutter-point
     * @Package: io
     * @ClassName: FileLockTest
     * @Author: xiaof
     * @Description: 测试多线程下的文件锁
     * @Date: 2019/3/7 9:34
     * @Version: 1.0
     */
    public class FileLockTest {
    
        //第一个线程不断吧文件从文件A拷贝到B,然后从B拷贝到A
        //第二个线程不断遍历B文件夹,并遍历B文件夹的数据,输出到控制台
        private final static String baseDir1 = "C:\Users\xiaof\Desktop\1\dir-t1";
        private final static String baseDir2 = "C:\Users\xiaof\Desktop\1\dir-t2";
    
        @Test
        public void test1() {
    //        String fileName = "多线程队列.png";
            FileOutputStream fileOutputStream = null;
            FileInputStream fileInputStream = null;
            //反复吧文件拷贝到两个目录中
            try {
                while(true) {
                    System.out.println(Thread.currentThread() + "-休眠1s开始新一轮拷贝");
                    Thread.sleep(1000);
                    //判断文件目录是否有文件
                    File dir1 = new File(baseDir1);
                    String souDir = "";
                    String desDir = "";
                    if(dir1.listFiles().length > 0) {
                        souDir = baseDir1;
                        desDir = baseDir2;
                    } else {
                        //吧文件从2拷贝到1
                        souDir = baseDir2;
                        desDir = baseDir1;
                    }
    
                    //吧文件从1拷贝到2
                    //1.遍历所有文件,依次上锁
                    File sourFileDir = new File(souDir);
                    for(int i = 0; i < sourFileDir.listFiles().length; ++i) {
                        File tmpFile = sourFileDir.listFiles()[i];
                        //输出
                        fileInputStream = new FileInputStream(tmpFile);
                        fileOutputStream = new FileOutputStream(desDir + "/" + tmpFile.getName());
                        //对写的文件进行上锁,读的文件不需要上锁
                        FileChannel fileChannel = fileOutputStream.getChannel();
                        FileLock fileLock = null;
                        //操作之前判断是否已经被锁住
                        while(true) {
                            fileLock = fileChannel.tryLock();
                            if(fileLock != null) {
                                //已上锁
                                break;
                            } else {
                                System.out.println(Thread.currentThread() + "-文件已被锁定,休眠1s");
                                Thread.sleep(1000);
                            }
                        }
    
                        //拷贝数据
                        byte buf[] = new byte[1024];
                        int len = 0;
                        while((len = fileInputStream.read(buf)) != -1) {
                            fileOutputStream.write(buf, 0, len);
                        }
    
                        fileOutputStream.flush();
    
                        //最后删除源文件
                        fileLock.release();//解锁
                        fileChannel.close();
                        fileOutputStream.close();
                        fileInputStream.close();
                        fileInputStream = null;
                        fileOutputStream = null;
    
                        if(tmpFile.delete()) {
                            System.out.println("删除源文件");
                        }
    
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
    
    
        }
    
        @Test
        public void test2() {
            //读取文件并输出
            FileOutputStream fileOutputStream = null;
            FileInputStream fileInputStream = null;
            try{
                //判断文件是否存在
                while (true) {
                    Thread.sleep(1000);
    
                    File dir1 = new File(baseDir1);
                    String souDir = "";
                    String desDir = "";
    
                    souDir = baseDir1;
                    desDir = baseDir2;
    
    //                if(dir1.listFiles().length > 0) {
    //                    souDir = baseDir1;
    //                    desDir = baseDir2;
    //                } else {
    //                    //吧文件从2拷贝到1
    //                    souDir = baseDir2;
    //                    desDir = baseDir1;
    //                }
    
                    File sourFileDir = new File(souDir);
                    for(int i = 0; i < sourFileDir.listFiles().length; ++i) {
                        File tmpFile = sourFileDir.listFiles()[i];
                        //输出
                        fileInputStream = new FileInputStream(tmpFile);
    
                        //拷贝数据
                        byte buf[] = new byte[1024];
                        int len = 0;
                        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                        while((len = fileInputStream.read(buf)) != -1) {
                            byteArrayOutputStream.write(buf, 0, len);
                        }
    
                        System.out.println("读取目录:" + souDir);
                        System.out.println(byteArrayOutputStream.toString());
                        //最后删除源文件
                        fileInputStream.close();
                        fileInputStream = null;
    
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    
    }

     

  • 相关阅读:
    Javascript一天学完系列(四)函数上下文bind &call
    Javascript一天学完系列(三)JavaScript面向对象
    Javascript一天学完系列(二)Callbacks回调函数
    Python(切片)
    水果篮子(母函数)
    判断链表是否有环
    链表部分逆置
    Python(List和Tuple类型)
    HDU1426(DFS)
    HDU4474(数位BFS)
  • 原文地址:https://www.cnblogs.com/cutter-point/p/10488077.html
Copyright © 2011-2022 走看看