zoukankan      html  css  js  c++  java
  • C#.net多线程并行处理实例:处理文件夹及嵌套文件夹下所有文件内容字符串

     需求 

    某系统的逻辑数据库表结构以文件形式保存,为了分析该数据库及表结构,需对各表文件进行处理,保存到指定表中以供分析。

    文件结构下图所示。包含有1k个文件夹,末级文件单位大概4w多个。需要读取每个文件内容,将字段和表名存储到oracle库表。并且监视各文件夹,若有更新,则及时将变更同步到表中。

      分析  

    c#的文件处理比较容易实现,选择c#语言做这个工具。思路是:选择文件目录,然后根据用户指定的扫描频率进行文件更新操作监控。如有新文件,则执行读取、分析、写表操作。

      遇到的问题  

    1、文件数量越来越多,第一次将4w多文件执行读取时,就用的foreach文件循环,执行了1个多小时,也没有执行完,在同事提醒下,使用多线程。

    2、在查了资料后,选择使用“AsParallel”功能来实现并行,并设置WithDegreeOfParallelism进程数来调整并行进程数量(并不是越多越好,要根据运行机器cpu情况)。

    3、由于文件夹数量众多,各文件夹下文件也多少不一,选择在文件夹扫描 和 文件处理 两个操作上都使用并行处理。

    4、单个文件读取时,最初使用按行读取,比较耗时,然后想到一个文件所有行读取到一个数组,数组元素依然可以并行处理。

    5、起初按照一个文件行做处理,将表面+字段名抽取,执行一个sql语句写表。由于连接数据库数量过多,效率很低,改为每个文件各行sql语句连接起来,执行一次数据库写入操作。

      代码实现  

            窗体初始化时,加入代码:
    
    Control.CheckForIllegalCrossThreadCalls = false;// 允许跨进程访问控件
    
    第一步:选择文件夹:
    
    FolderBrowserDialog dilog = new FolderBrowserDialog();
            dilog.Description = "请选择文件夹";
    if (dilog.ShowDialog() == DialogResult.OK || dilog.ShowDialog() == DialogResult.Yes)
    {
    textBox1.Text = dilog.SelectedPath; // 记录操作文件夹路径
    }
    
        第二步:点击监控按钮,选择有最新修改的文件进行处理:
    
    if (pTimer != null)
    {// 多次点击监控时,先停止之前开启的监听操作;
    pTimer.Stop();
    pTimer = null;
    }
    path = textBox1.Text.Trim();
    if (string.IsNullOrEmpty(path))
    {
        label2.Text = textBox2.Text = "路径非法!";
        return;
    }
    // 启动时,获取上次时间
    try
    {
        string filepath = path + "//last.txt"; //要上传的文件夹的路径
        if (!File.Exists(filepath)) //不存在文件,创建
        {
            FileStream fs = File.Create(filepath); //创建
            fs.Close();
            //记录最终时间
            lastRead = DateTime.Parse("2021-01-01 11:03:24");
            //更新最后操作时间
            writeFile();
        }
    
        StreamReader sr = new StreamReader(path + "//last.txt", Encoding.GetEncoding("GB2312"));
    
        strLine = sr.ReadLine();
        if (strLine != null)
        {
            Console.WriteLine(strLine);
            lastRead = DateTime.Parse(strLine);
            label2.Text = textBox2.Text = "上次获取时间节点为:" + lastRead.ToString("yyyy-MM-dd HH:mm:ss");
        }
        sr.Dispose();
        sr.Close();
    }
    catch (Exception ex)
    {
        label2.Text = "获取最后修改日期 处理错误" + ex.Message;
        lastRead = DateTime.Now;
    }
    checkFiles(null, null);
    //定时执行
    pTimer = new System.Timers.Timer(20000);//每隔20秒执行一次,没用winfrom自带的
    pTimer.Elapsed += checkFiles;//委托,要执行的方法
    pTimer.AutoReset = true;//获取该定时器自动执行
    pTimer.Enabled = true;//这个一定要写,要不然定时器不会执行的
    第三步:层级访问文件夹函数:
    
    private void checkFiles(object sender, System.Timers.ElapsedEventArgs e)
    {
    
        // 引入执行标识,如果在执行状态下,则退出
        if (bz)
            return;
    
        // 设置标识状态和文件数量为初始值;
        bz = true;
        filesCount = 0;
    
        // 记录执行时间,用于对比是否提高执行效率
    
        Stopwatch watch = new Stopwatch();
    
        watch.Start();
    
        // 获取文件夹及子文件夹 的文件,识别表名、字段名
        getFolders(path);
    
        // 停止记录执行时间
    
        watch.Stop();
        Console.WriteLine("总共开销{0}分钟。", watch.ElapsedMilliseconds / (1000 * 60));
    
    
        //记录最终时间
        lastRead = DateTime.Now;
        label2.Text = "该时间段共有 " + filesCount.ToString() + " 个文件:" + lastRead.ToString("yyyy-MM-dd HH:mm:ss") +
        ",耗时总共 " + (watch.ElapsedMilliseconds / (1000 * 60)).ToString() + " 分钟。";
        //更新最后操作时间
        writeFile();
    
        //恢复标识,让轮询任务继续执行
        bz = false;
    
    }
    
    // 展开文件夹及子目录文件夹
    private void getFolders(string rootPath)
    {
        if (textBox2.Lines.Length > 2000) textBox2.Text = "";// 判断日志长度
    
        getNewFiles(rootPath); // 处理该文件夹下文件并 记录文件数量;
    
        DirectoryInfo root = new DirectoryInfo(rootPath);
    
        // 文件夹并行处理
        root.GetDirectories().AsParallel().WithDegreeOfParallelism(2).ForAll(d => getFolders(d.FullName));
    
        // 串行处理
        //foreach (DirectoryInfo d in root.GetDirectories())
        //{
        // getFolders(d.FullName);
        //}
    
    }
    
    // 读取文件
    
    private void getNewFiles(string folderPath)
    {
        DirectoryInfo root = new DirectoryInfo(folderPath);
    
        // 文件并行处理
        root.GetFiles().AsParallel().WithDegreeOfParallelism(4).ForAll(f => readFile(f, root.Name));
    
    }
    
    // 单个文件处理函数
    
    private void readFile(FileInfo file, string folderName)
    {
        // 判断规则
        if (file.Name.Contains(".") || file.LastWriteTime < lastRead)
            return;
    
        string strLine = "", tname = "", sname = "";
    
        filesCount++;
        textBox2.Text += folderName + " " + file.FullName + "
    ";
        //读取该文件所有行到数组;
        string[] flines = System.IO.File.ReadAllLines(file.FullName);
        //
        var matchLines = (from line in flines.AsParallel()
                          where (line.IndexOf(":NAME:") >= 0 && line.IndexOf('.') > 0)
                          select line);
        StringBuilder sb = new StringBuilder();
    
        if (matchLines.Count<string>() > 0)
        {
            sb.AppendLine("BEGIN ");
            foreach (string line in matchLines)
            {
                strLine = line.Replace(":NAME:", "");
                tname = strLine.Substring(0, strLine.IndexOf('.'));
                sname = strLine.Replace(tname + ".", "");
    
                sb.AppendLine(
                "EXECUTE IMMEDIATE 'INSERT INTO tables select ''" + tname + "'', ''" + sname + "'', ''" + folderName + "'' from dual " +
                "where not exists(select 1 from tables where tablename=''" + tname + "'' and colname=''" + sname + "'')';");
            }
            sb.AppendLine("END; ");
    
        }
    
        if (sb.Length > 0)
        {
            try
            {
                string _connStr1 = ConfigurationManager.ConnectionStrings["orcl"].ToString();
                OracleHelper.ExecuteNonQuery(_connStr1, CommandType.Text, sb.ToString());
            }
            catch (Exception ex)
            {
                label2.Text = file.FullName + " 处理错误" + ex.Message;
            }
        }
    
    }
    
    // 写文件操作
    
    private void writeFile()
    {
        //创建文件流
        FileStream myfs = new FileStream(path + "//last.txt", FileMode.Open);
        //打开方式
        //1:Create 用指定的名称创建一个新文件,如果文件已经存在则改写旧文件
        //2:CreateNew 创建一个文件,如果文件存在会发生异常,提示文件已经存在
        //3:Open 打开一个文件 指定的文件必须存在,否则会发生异常
        //4:OpenOrCreate 打开一个文件,如果文件不存在则用指定的名称新建一个文件并打开它.
        //5:Append 打开现有文件,并在文件尾部追加内容.
    
        //创建写入器
        StreamWriter mySw = new StreamWriter(myfs);//将文件流给写入器
                                                   //将录入的内容写入文件
        mySw.Write(lastRead);
        //关闭写入器
        mySw.Close();
        //关闭文件流
        myfs.Close();
    }

      优化意见  

    从最初执行40多分钟(没有执行完便关闭了),到执行20多分钟(没有执行完关掉了),到最后3分钟。并行处理起了很大的作用。还有优化的空间不吝赐教。

    参考的文件

    1、使用.NET进行多线程文件处理  http://www.voidcn.com/article/p-wntpjeyo-bte.html

    2、C#读取文件所有行到数组的方法  https://www.jb51.net/article/68885.htm

    3、C# 并行编程 之 PLINQ并行度的指定 和 ForAll的使用  https://blog.csdn.net/wangzhiyu1980/article/details/46355633

    4、线程间操作无效:从不是创建控件“textBox1”的线程访问它  https://www.cnblogs.com/xunzhiyou/p/4931506.html

    5、C# Oracle同时执行多条sql语句  http://www.voidcn.com/article/p-kwkokmkd-so.html

    by Vincent

  • 相关阅读:
    httpclient 使用问题记录:org.apache.http.HttpException: Unsupported Content-Coding:GLZip
    Gitserver端密码变更,但是本地gitconfig配置未变更账号和密码问题解决
    线程池ThreadPoolExecutor学习
    Java 网络编程
    org.apache.ibatis.binding.BindingException: Invalid bound statement Mybatis绑定错误问题解决
    Java string类
    maven3.6.2 版本 在idea 2019.2.2下遇到的问题解决记录
    python
    django-URL与视图配置
    python 的datetime模块使用
  • 原文地址:https://www.cnblogs.com/jhf57101/p/14922480.html
Copyright © 2011-2022 走看看