zoukankan      html  css  js  c++  java
  • 【并发】4、文件锁,多线程队列拷贝文件,并读取的操作

      最近有这样一个功能点,我实现了一个多线程的队列,生产线程ftp获取文件,然后扫描指定目录,获取文件基础信息,然后put进入队列,然后消费者通过这个信息解析文件进行入库,因为文件会比较多,所以到时候会起多个消费线程去解析数据,并且考虑到复用,不用的文件,消费消除要分化操作(策略模式),但是遇到一个问题,就是生产线程可能重复扫描到一个还未解析完成的文件

      1。改进,新建一个consum目录,扫描到一个文件,就移动到consum目录,这样ftp操作就无脑向生产文件夹拉文件就行

      2.还有个文件,就是如果消费线程如果解析不够快,也有可能导致文件被重复扫描到,那么为了避免,就需要扫到一个文件,就把文件移动到指定的目录下,等待消费

      3.新文件,当生产队列不够快,消费队列空闲比较多了的时候,不同的文件操作,会导致线程锁死,因为生产线程文件还未写入,消费线程就开始读取,这个时候线程卡死,造成死锁

      4.为了解决这个问题,我们再写文件的时候对文件上一个文件锁,读取文件的消费线程,等待生产线程写入完成之后再进行操作,这里有个demo来模拟这个线程锁

    package io;
    
    import org.junit.Test;
    
    import java.io.ByteArrayOutputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.nio.channels.FileChannel;
    import java.nio.channels.FileLock;
    
    /**
     * @ProjectName: cutter-point
     * @Package: io
     * @ClassName: FileLockTest
     * @Author: xiaof
     * @Description: 测试多线程下的文件锁
     * @Date: 2019/3/7 9:34
     * @Version: 1.0
     */
    public class FileLockTest {
    
        //第一个线程不断吧文件从文件A拷贝到B,然后从B拷贝到A
        //第二个线程不断遍历B文件夹,并遍历B文件夹的数据,输出到控制台
        private final static String baseDir1 = "C:\Users\xiaof\Desktop\1\dir-t1";
        private final static String baseDir2 = "C:\Users\xiaof\Desktop\1\dir-t2";
    
        @Test
        public void test1() {
    //        String fileName = "多线程队列.png";
            FileOutputStream fileOutputStream = null;
            FileInputStream fileInputStream = null;
            //反复吧文件拷贝到两个目录中
            try {
                while(true) {
                    System.out.println(Thread.currentThread() + "-休眠1s开始新一轮拷贝");
                    Thread.sleep(1000);
                    //判断文件目录是否有文件
                    File dir1 = new File(baseDir1);
                    String souDir = "";
                    String desDir = "";
                    if(dir1.listFiles().length > 0) {
                        souDir = baseDir1;
                        desDir = baseDir2;
                    } else {
                        //吧文件从2拷贝到1
                        souDir = baseDir2;
                        desDir = baseDir1;
                    }
    
                    //吧文件从1拷贝到2
                    //1.遍历所有文件,依次上锁
                    File sourFileDir = new File(souDir);
                    for(int i = 0; i < sourFileDir.listFiles().length; ++i) {
                        File tmpFile = sourFileDir.listFiles()[i];
                        //输出
                        fileInputStream = new FileInputStream(tmpFile);
                        fileOutputStream = new FileOutputStream(desDir + "/" + tmpFile.getName());
                        //对写的文件进行上锁,读的文件不需要上锁
                        FileChannel fileChannel = fileOutputStream.getChannel();
                        FileLock fileLock = null;
                        //操作之前判断是否已经被锁住
                        while(true) {
                            fileLock = fileChannel.tryLock();
                            if(fileLock != null) {
                                //已上锁
                                break;
                            } else {
                                System.out.println(Thread.currentThread() + "-文件已被锁定,休眠1s");
                                Thread.sleep(1000);
                            }
                        }
    
                        //拷贝数据
                        byte buf[] = new byte[1024];
                        int len = 0;
                        while((len = fileInputStream.read(buf)) != -1) {
                            fileOutputStream.write(buf, 0, len);
                        }
    
                        fileOutputStream.flush();
    
                        //最后删除源文件
                        fileLock.release();//解锁
                        fileChannel.close();
                        fileOutputStream.close();
                        fileInputStream.close();
                        fileInputStream = null;
                        fileOutputStream = null;
    
                        if(tmpFile.delete()) {
                            System.out.println("删除源文件");
                        }
    
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
    
    
        }
    
        @Test
        public void test2() {
            //读取文件并输出
            FileOutputStream fileOutputStream = null;
            FileInputStream fileInputStream = null;
            try{
                //判断文件是否存在
                while (true) {
                    Thread.sleep(1000);
    
                    File dir1 = new File(baseDir1);
                    String souDir = "";
                    String desDir = "";
    
                    souDir = baseDir1;
                    desDir = baseDir2;
    
    //                if(dir1.listFiles().length > 0) {
    //                    souDir = baseDir1;
    //                    desDir = baseDir2;
    //                } else {
    //                    //吧文件从2拷贝到1
    //                    souDir = baseDir2;
    //                    desDir = baseDir1;
    //                }
    
                    File sourFileDir = new File(souDir);
                    for(int i = 0; i < sourFileDir.listFiles().length; ++i) {
                        File tmpFile = sourFileDir.listFiles()[i];
                        //输出
                        fileInputStream = new FileInputStream(tmpFile);
    
                        //拷贝数据
                        byte buf[] = new byte[1024];
                        int len = 0;
                        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                        while((len = fileInputStream.read(buf)) != -1) {
                            byteArrayOutputStream.write(buf, 0, len);
                        }
    
                        System.out.println("读取目录:" + souDir);
                        System.out.println(byteArrayOutputStream.toString());
                        //最后删除源文件
                        fileInputStream.close();
                        fileInputStream = null;
    
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    
    }

     

  • 相关阅读:
    「日常训练」Single-use Stones (CFR476D2D)
    「日常训练」Greedy Arkady (CFR476D2C)
    「Haskell 学习」二 类型和函数(上)
    「学习记录」《数值分析》第二章计算实习题(Python语言)
    「日常训练」Alena And The Heater (CFR466D2D)
    Dubbo 消费者
    Dubbo 暴露服务
    Rpc
    git fail to push some refs....
    Spring Cloud (6)config 客户端配置 与GitHub通信
  • 原文地址:https://www.cnblogs.com/cutter-point/p/10488077.html
Copyright © 2011-2022 走看看