需求
某系统的逻辑数据库表结构以文件形式保存,为了分析该数据库及表结构,需对各表文件进行处理,保存到指定表中以供分析。
文件结构下图所示。包含有1k个文件夹,末级文件单位大概4w多个。需要读取每个文件内容,将字段和表名存储到oracle库表。并且监视各文件夹,若有更新,则及时将变更同步到表中。
分析
c#的文件处理比较容易实现,选择c#语言做这个工具。思路是:选择文件目录,然后根据用户指定的扫描频率进行文件更新操作监控。如有新文件,则执行读取、分析、写表操作。
遇到的问题
1、文件数量越来越多,第一次将4w多文件执行读取时,就用的foreach文件循环,执行了1个多小时,也没有执行完,在同事提醒下,使用多线程。
2、在查了资料后,选择使用“AsParallel”功能来实现并行,并设置WithDegreeOfParallelism进程数来调整并行进程数量(并不是越多越好,要根据运行机器cpu情况)。
3、由于文件夹数量众多,各文件夹下文件也多少不一,选择在文件夹扫描 和 文件处理 两个操作上都使用并行处理。
4、单个文件读取时,最初使用按行读取,比较耗时,然后想到一个文件所有行读取到一个数组,数组元素依然可以并行处理。
5、起初按照一个文件行做处理,将表面+字段名抽取,执行一个sql语句写表。由于连接数据库数量过多,效率很低,改为每个文件各行sql语句连接起来,执行一次数据库写入操作。
代码实现
窗体初始化时,加入代码: Control.CheckForIllegalCrossThreadCalls = false;// 允许跨进程访问控件 第一步:选择文件夹: FolderBrowserDialog dilog = new FolderBrowserDialog(); dilog.Description = "请选择文件夹"; if (dilog.ShowDialog() == DialogResult.OK || dilog.ShowDialog() == DialogResult.Yes) { textBox1.Text = dilog.SelectedPath; // 记录操作文件夹路径 } 第二步:点击监控按钮,选择有最新修改的文件进行处理: if (pTimer != null) {// 多次点击监控时,先停止之前开启的监听操作; pTimer.Stop(); pTimer = null; } path = textBox1.Text.Trim(); if (string.IsNullOrEmpty(path)) { label2.Text = textBox2.Text = "路径非法!"; return; } // 启动时,获取上次时间 try { string filepath = path + "//last.txt"; //要上传的文件夹的路径 if (!File.Exists(filepath)) //不存在文件,创建 { FileStream fs = File.Create(filepath); //创建 fs.Close(); //记录最终时间 lastRead = DateTime.Parse("2021-01-01 11:03:24"); //更新最后操作时间 writeFile(); } StreamReader sr = new StreamReader(path + "//last.txt", Encoding.GetEncoding("GB2312")); strLine = sr.ReadLine(); if (strLine != null) { Console.WriteLine(strLine); lastRead = DateTime.Parse(strLine); label2.Text = textBox2.Text = "上次获取时间节点为:" + lastRead.ToString("yyyy-MM-dd HH:mm:ss"); } sr.Dispose(); sr.Close(); } catch (Exception ex) { label2.Text = "获取最后修改日期 处理错误" + ex.Message; lastRead = DateTime.Now; } checkFiles(null, null); //定时执行 pTimer = new System.Timers.Timer(20000);//每隔20秒执行一次,没用winfrom自带的 pTimer.Elapsed += checkFiles;//委托,要执行的方法 pTimer.AutoReset = true;//获取该定时器自动执行 pTimer.Enabled = true;//这个一定要写,要不然定时器不会执行的 第三步:层级访问文件夹函数: private void checkFiles(object sender, System.Timers.ElapsedEventArgs e) { // 引入执行标识,如果在执行状态下,则退出 if (bz) return; // 设置标识状态和文件数量为初始值; bz = true; filesCount = 0; // 记录执行时间,用于对比是否提高执行效率 Stopwatch watch = new Stopwatch(); watch.Start(); // 获取文件夹及子文件夹 的文件,识别表名、字段名 getFolders(path); // 停止记录执行时间 watch.Stop(); Console.WriteLine("总共开销{0}分钟。", watch.ElapsedMilliseconds / (1000 * 60)); //记录最终时间 lastRead = DateTime.Now; label2.Text = "该时间段共有 " + filesCount.ToString() + " 个文件:" + lastRead.ToString("yyyy-MM-dd HH:mm:ss") + ",耗时总共 " + (watch.ElapsedMilliseconds / (1000 * 60)).ToString() + " 分钟。"; //更新最后操作时间 writeFile(); //恢复标识,让轮询任务继续执行 bz = false; } // 展开文件夹及子目录文件夹 private void getFolders(string rootPath) { if (textBox2.Lines.Length > 2000) textBox2.Text = "";// 判断日志长度 getNewFiles(rootPath); // 处理该文件夹下文件并 记录文件数量; DirectoryInfo root = new DirectoryInfo(rootPath); // 文件夹并行处理 root.GetDirectories().AsParallel().WithDegreeOfParallelism(2).ForAll(d => getFolders(d.FullName)); // 串行处理 //foreach (DirectoryInfo d in root.GetDirectories()) //{ // getFolders(d.FullName); //} } // 读取文件 private void getNewFiles(string folderPath) { DirectoryInfo root = new DirectoryInfo(folderPath); // 文件并行处理 root.GetFiles().AsParallel().WithDegreeOfParallelism(4).ForAll(f => readFile(f, root.Name)); } // 单个文件处理函数 private void readFile(FileInfo file, string folderName) { // 判断规则 if (file.Name.Contains(".") || file.LastWriteTime < lastRead) return; string strLine = "", tname = "", sname = ""; filesCount++; textBox2.Text += folderName + " " + file.FullName + " "; //读取该文件所有行到数组; string[] flines = System.IO.File.ReadAllLines(file.FullName); // var matchLines = (from line in flines.AsParallel() where (line.IndexOf(":NAME:") >= 0 && line.IndexOf('.') > 0) select line); StringBuilder sb = new StringBuilder(); if (matchLines.Count<string>() > 0) { sb.AppendLine("BEGIN "); foreach (string line in matchLines) { strLine = line.Replace(":NAME:", ""); tname = strLine.Substring(0, strLine.IndexOf('.')); sname = strLine.Replace(tname + ".", ""); sb.AppendLine( "EXECUTE IMMEDIATE 'INSERT INTO tables select ''" + tname + "'', ''" + sname + "'', ''" + folderName + "'' from dual " + "where not exists(select 1 from tables where tablename=''" + tname + "'' and colname=''" + sname + "'')';"); } sb.AppendLine("END; "); } if (sb.Length > 0) { try { string _connStr1 = ConfigurationManager.ConnectionStrings["orcl"].ToString(); OracleHelper.ExecuteNonQuery(_connStr1, CommandType.Text, sb.ToString()); } catch (Exception ex) { label2.Text = file.FullName + " 处理错误" + ex.Message; } } } // 写文件操作 private void writeFile() { //创建文件流 FileStream myfs = new FileStream(path + "//last.txt", FileMode.Open); //打开方式 //1:Create 用指定的名称创建一个新文件,如果文件已经存在则改写旧文件 //2:CreateNew 创建一个文件,如果文件存在会发生异常,提示文件已经存在 //3:Open 打开一个文件 指定的文件必须存在,否则会发生异常 //4:OpenOrCreate 打开一个文件,如果文件不存在则用指定的名称新建一个文件并打开它. //5:Append 打开现有文件,并在文件尾部追加内容. //创建写入器 StreamWriter mySw = new StreamWriter(myfs);//将文件流给写入器 //将录入的内容写入文件 mySw.Write(lastRead); //关闭写入器 mySw.Close(); //关闭文件流 myfs.Close(); }
优化意见
从最初执行40多分钟(没有执行完便关闭了),到执行20多分钟(没有执行完关掉了),到最后3分钟。并行处理起了很大的作用。还有优化的空间不吝赐教。
参考的文件
1、使用.NET进行多线程文件处理 http://www.voidcn.com/article/p-wntpjeyo-bte.html
2、C#读取文件所有行到数组的方法 https://www.jb51.net/article/68885.htm
3、C# 并行编程 之 PLINQ并行度的指定 和 ForAll的使用 https://blog.csdn.net/wangzhiyu1980/article/details/46355633
4、线程间操作无效:从不是创建控件“textBox1”的线程访问它 https://www.cnblogs.com/xunzhiyou/p/4931506.html
5、C# Oracle同时执行多条sql语句 http://www.voidcn.com/article/p-kwkokmkd-so.html
by Vincent