using System; using System.Collections.Generic; using System.Text; using System.IO; using Newtonsoft.Json; using Newtonsoft.Json.Linq; using System.Collections; using System.Data; using System.Data.SqlClient; using Common; using System.Threading; namespace JSONToDB { class Program { static object lockObj = new object(); static Settings1 sObj = new Settings1(); static DBHelper mydb = new DBHelper(sObj.Connection.ToString()); static dbhelpers dbhelpers = new dbhelpers(); //读取N个月内的数据 static int GetDateInMonth = sObj.GetDateInMonth; static void Main(string[] args) { /* 1、找到对应目录的文件, 2、解析json存到数据库 */ Json2db(); // Console.WriteLine("任意键退出!"); // Console.ReadKey(); } #region 读取某个目录的文件解析json插入数据库 /// <summary> /// 读取某个目录的文件解析json插入数据库 /// </summary> static protected void Json2db() { Console.WriteLine("========【程序已启动,正在运行】========"); string FilePath = sObj.FilePath; string FileType = "." + sObj.FileType; string FileRule = sObj.FileRule; //一、读取文件夹下待处理复合规则:SynOrder_20170101.txt的文件! //①、将文件夹下文件读取到列表中 //--时间过滤规则集合 List<string> listDataTimeRules = new List<string>(); //--符合规则的待导入文件路径集合 List<string> ListFilepaths = new List<string>(); //--状态"在处理中或已经处理的"的待导入文件路径集合 List<string> ListSuccessFilepaths = new List<string>(); //--状态"在处理中或已经处理的"的待导入文件路径集合 List<string> ListFailFilepaths = new List<string>(); //--记录导入失败记录次数 //int FailCunt = 0; for (int i = 0; i < GetDateInMonth; i++) { DateTime currentDateTime = DateTime.Now.AddMonths(-i); listDataTimeRules.Add(FileRule + currentDateTime.Year.ToString() + ((int.Parse(currentDateTime.Month.ToString())) < 10 ? "0" + currentDateTime.Month.ToString() : currentDateTime.Month.ToString()));//命名规则添加到集合 } for (int r = 0; r < listDataTimeRules.Count; r++) { foreach (string path in (IEnumerable)Directory.GetFiles(FilePath, listDataTimeRules[r] + "??" + FileType, SearchOption.TopDirectoryOnly))//循环读取文件夹下的带插入文档 { ListFilepaths.Add(path);//添加到待发送结果集合 } } int filepathsCount = ListFilepaths.Count; if (filepathsCount > 0) { #region lock lock (lockObj) { #region 目录下有文件时执行 //②、将待处理文件导入日志插入到数据库(过滤重复读取的) foreach (string inserttingFile in ListFilepaths) { string fileName = inserttingFile.Trim().Substring(inserttingFile.LastIndexOf('\') + 1, inserttingFile.Length - inserttingFile.LastIndexOf('\') - 1); string[] OptimeArr = inserttingFile.Split('_'); string temp = OptimeArr[OptimeArr.Length - 1].ToString();//SynOrder //将文件名格式为:20160718转化成时间格式 string optime = DateTime.ParseExact(temp.Substring(0, temp.Length - 4), "yyyyMMdd", new System.Globalization.CultureInfo("zh-CN", true)).ToString(); //开始插入数据库 string nowStatus = GetFileStatus(optime, fileName); if (nowStatus == "NULL")//为空则添加新导入日志记录 { InsertFileLog(optime, fileName, inserttingFile, "读取中", "读取中"); //符合条件的新待查到如文件(之前未处理的) ListSuccessFilepaths.Add(inserttingFile); } //以下为遗留处理失败的导入文件 else if (nowStatus == "导入成功" || nowStatus == "导入中") { //记录已经处理的文件 ListFailFilepaths.Add(inserttingFile); } else//“读取中”和“导入失败” 及其他 { ListSuccessFilepaths.Add(inserttingFile); } } //过滤后是否还有待导入的文件 if (ListSuccessFilepaths.Count > 0) { // 当前文件序号 int no = 0; int failno = 0; int successno = 0; if (ListFailFilepaths.Count > 0) { Console.WriteLine("已处理文件不需再次处理:"); foreach (string failitem in ListFailFilepaths) { ++failno; Console.WriteLine(failno + "、" + failitem.Substring(failitem.LastIndexOf('\') + 1, failitem.Length - failitem.LastIndexOf('\') - 1)); } } Console.WriteLine("检察到新待导入文件共:" + ListSuccessFilepaths.Count + "个:"); foreach (string failitem in ListSuccessFilepaths) { ++successno; Console.WriteLine(successno + "、" + failitem.Substring(failitem.LastIndexOf('\') + 1, failitem.Length - failitem.LastIndexOf('\') - 1)); } //二、导入到数据库!开始处理数据 Console.WriteLine("开始处理文件!"); foreach (string inserttingFile in ListSuccessFilepaths) { ++no; string fileName = inserttingFile.Trim().Substring(inserttingFile.LastIndexOf('\') + 1, inserttingFile.Length - inserttingFile.LastIndexOf('\') - 1); string[] OptimeArr = inserttingFile.Split('_'); string temp = OptimeArr[OptimeArr.Length - 1].ToString();//SynOrder //将中文20160718格式转化成时间格式 string optime = DateTime.ParseExact(temp.Substring(0, temp.Length - 4), "yyyyMMdd", new System.Globalization.CultureInfo("zh-CN", true)).ToString(); //处理方法++++++++++++ ReadingFileToDB(optime, fileName, inserttingFile.Trim(), no); } } else { Console.WriteLine("未检察到新待导入文件,已处理文件不需再次处理:"); foreach (string failitem in ListFailFilepaths) { Console.WriteLine(failitem.Substring(failitem.LastIndexOf('\') + 1, failitem.Length - failitem.LastIndexOf('\') - 1)); } } #endregion //} #endregion } #endregion } if (filepathsCount == 0) { Console.WriteLine("当前配置的“" + FilePath + "”目录下,未找到符合规则:“" + FileRule + "20170101" + FileType + "”格式文件。"); Thread.Sleep(60 * 1000); } //Console.WriteLine("========【操作已结束,正在退出】========"); Console.WriteLine("========【本轮操作已结束,检察导入文件夹下是否有更新待导入文件】========"); Thread.Sleep(60 * 1000); //Console.ReadKey(); } /// <summary> /// 读取并处理文件 /// </summary> /// <param name="optime">时间戳</param> /// <param name="fileName">当前文件的名称</param> /// <param name="curPath">当前文件的路径</param> /// <param name="no">当前目录集合的index</param> private static void ReadingFileToDB(string optime, string fileName, string curPath, int no) { // 当前行 String line; //二级数据列数最大值 int MaxInsertItem = sObj.MaxCount;//服务个数最大值 string Status = "导入失败"; string Remark = ""; string tempid = string.Empty; StreamReader srHeader = new StreamReader(curPath, Encoding.UTF8); using (DataTable dt = new DataTable()) { try { if (UpdateFileLogStatus(optime, fileName, "导入中", Remark)) { Console.WriteLine(no + "、" + fileName + "文件解析中..."); } #region 1、遍历第一行 取出表列头 // 1、遍历第一行 取出表列头 while ((line = srHeader.ReadLine()) != null && line.ToString().Length != 0) { dt.Columns.Add("OPTime"); JObject objHeader = (JObject)JsonConvert.DeserializeObject(line); foreach (KeyValuePair<string, JToken> item in objHeader) { Newtonsoft.Json.Linq.JArray nj; if (item.Value.GetType().ToString() == "Newtonsoft.Json.Linq.JArray") { nj = (Newtonsoft.Json.Linq.JArray)item.Value; int njCount = nj.Count; //处理复合数据的二级表列头 #region 处理复合数据的二级表列头 最大值:MaxInsertItem if (item.Key == "orderServices") { for (int iii = 0; iii < MaxInsertItem; iii++) { dt.Columns.Add(item.Key + "_acceptanceTime" + iii); dt.Columns.Add(item.Key + "_bossRecDate" + iii); dt.Columns.Add(item.Key + "_serviceCode" + iii); dt.Columns.Add(item.Key + "_serviceName" + iii); dt.Columns.Add(item.Key + "_serviceWayToHandle" + iii); dt.Columns.Add(item.Key + "_operatorAccount" + iii); dt.Columns.Add(item.Key + "_status" + iii); dt.Columns.Add(item.Key + "_bossOrderNo" + iii); dt.Columns.Add(item.Key + "_orderServiceResCode" + iii); dt.Columns.Add(item.Key + "_orderServiceResName" + iii); dt.Columns.Add(item.Key + "_resItem" + iii); dt.Columns.Add(item.Key + "_bizNumber" + iii); } } if (item.Key == "orderAttributes") { for (int iii = 0; iii < MaxInsertItem; iii++) { dt.Columns.Add(item.Key + "_attributeValue" + iii); dt.Columns.Add(item.Key + "_attributeName" + iii); } } #endregion } else { // 二级子节点为单个值添加以及表头 dt.Columns.Add(item.Key); } } if (dt.Columns.Count > 1) { srHeader.Close(); srHeader.Dispose(); break; } } #endregion #region 2、解析json数据加载到dt当中 //定义一个解析json数据源开始时间 DateTime Analysis_StartTime = DateTime.Now; // 2、解析json数据加载到dt当中 using (StreamReader sr = new StreamReader(curPath, Encoding.UTF8)) { while ((line = sr.ReadLine()) != null && line.ToString().Length != 0) { int j = 0; j++; DataRow dr = dt.NewRow(); dr["OPTime"] = optime; JObject objBody = (JObject)JsonConvert.DeserializeObject(line); foreach (KeyValuePair<string, JToken> item in objBody) { Newtonsoft.Json.Linq.JArray nj; if (item.Value.GetType().ToString() == "Newtonsoft.Json.Linq.JArray") { #region 复合节点处理 //复合节点 nj = (Newtonsoft.Json.Linq.JArray)item.Value; int njCount = nj.Count; if (njCount > 0)//是否有二级数据 { if (njCount >= MaxInsertItem) { njCount = MaxInsertItem; } //二级节点数据少于最大值,数据用Null补齐数据 else { if (item.Key == "orderServices") { for (int iii = 0; iii < MaxInsertItem - njCount; iii++) { dr[item.Key + "_acceptanceTime" + (njCount + iii)] = null; dr[item.Key + "_bossRecDate" + (njCount + iii)] = null; dr[item.Key + "_serviceCode" + (njCount + iii)] = null; dr[item.Key + "_serviceName" + (njCount + iii)] = null; dr[item.Key + "_serviceWayToHandle" + (njCount + iii)] = null; dr[item.Key + "_operatorAccount" + (njCount + iii)] = null; dr[item.Key + "_status" + (njCount + iii)] = null; dr[item.Key + "_bossOrderNo" + (njCount + iii)] = null; dr[item.Key + "_orderServiceResCode" + (njCount + iii)] = null; dr[item.Key + "_orderServiceResName" + (njCount + iii)] = null; dr[item.Key + "_resItem" + (njCount + iii)] = null; dr[item.Key + "_bizNumber" + (njCount + iii)] = null; } } if (item.Key == "orderAttributes") { for (int iii = 0; iii < MaxInsertItem - njCount; iii++) { dr[item.Key + "_attributeValue" + (njCount + iii)] = null; dr[item.Key + "_attributeName" + (njCount + iii)] = null; } } } //将数据添加到内存datetable for (int ii = 0; ii < njCount; ii++) { //当子节点仍然为json对象时候 JObject objBody2 = (JObject)JsonConvert.DeserializeObject(nj[ii].ToString()); foreach (KeyValuePair<string, JToken> item2 in objBody2) { string tempItem2value = item2.Value.ToString().TrimStart('"').TrimEnd('"'); if (item2.Key == "acceptanceTime" && tempItem2value.IndexOf('.') > -1) { //特殊处理1-1、特殊处理字段acceptanceTime 时间格式不正确 tempItem2value = tempItem2value.Substring(0, tempItem2value.IndexOf('.')).Trim(); } if (item2.Key == "acceptanceTime" && tempItem2value.ToString().Length == 0) { //特殊处理1-2、特殊处理字段acceptanceTime 有些为空字符 tempItem2value = null; } if (item2.Key == "bossRecDate" && tempItem2value.ToString().Length == 0 || item2.Key == "bossRecDate" && tempItem2value.ToString() == "null") { //特殊处理2、字段bossRecDate 有些为空字符 tempItem2value = null; } dr[item.Key + "_" + item2.Key + ii] = tempItem2value; } } } #region 为空时特殊处理orderAttributes这个字段有些时候是null // 5、为空时特殊处理orderAttributes这个字段有些时候是null else if (item.Key == "orderAttributes") { for (int ii = 0; ii < MaxInsertItem; ii++) { dr[item.Key + "_attributeValue" + ii] = null; dr[item.Key + "_attributeName" + ii] = null; } } else if (item.Key == "orderServices") { for (int ii = 0; ii < MaxInsertItem; ii++) { dr[item.Key + "_acceptanceTime" + ii] = null; dr[item.Key + "_bossRecDate" + ii] = null; dr[item.Key + "_serviceCode" + ii] = null; dr[item.Key + "_serviceName" + ii] = null; dr[item.Key + "_serviceWayToHandle" + ii] = null; dr[item.Key + "_operatorAccount" + ii] = null; dr[item.Key + "_status" + ii] = null; dr[item.Key + "_bossOrderNo" + ii] = null; dr[item.Key + "_orderServiceResCode" + ii] = null; dr[item.Key + "_orderServiceResName" + ii] = null; dr[item.Key + "_resItem" + ii] = null; dr[item.Key + "_bizNumber" + ii] = null; } } #endregion #endregion } else { // 单节点处理 #region 单节点处理 string tempItemValue = item.Value.ToString().TrimStart('"').TrimEnd('"').Trim(); if (item.Key == "orderTime" && tempItemValue.Length == 0) { //特殊处理3、字段orderTime 有些为空字符 tempItemValue = null; } if (item.Key == "orderCompleteTime" && tempItemValue.Length == 0) { //特殊处理4、字段orderCompleteTime 有些为空字符 tempItemValue = null; } dr[item.Key] = tempItemValue; #endregion // debug 提取错误定位 //if (item.Key == "orderPhone") //{ // tempid = item.Value.ToString(); //} } } dt.Rows.Add(dr); //Console.WriteLine("执行到:" + j + " 时间:" + dr["orderServices_acceptanceTime"].ToString() + "|" + dr["orderServices_bossRecDate"]); } Console.WriteLine(no + "、" + fileName + "解析完毕!"); // 解析json文件耗时 TimeSpan Analysis_ts = DateTime.Now - Analysis_StartTime; Console.WriteLine(no + "、" + fileName + "文件,数据共:" + dt.Rows.Count + "条,解析完成,耗时:" + Analysis_ts + " ,开始执行数据插入操作..."); Console.WriteLine("文件导入中..."); // 实例化一个SqlBulkCopy对象 SqlBulkCopy bulkCopy = new SqlBulkCopy(sObj.Connection); // 设置超时时间 bulkCopy.BulkCopyTimeout = 900; bulkCopy.BatchSize = dt.Rows.Count; // 指定表名 bulkCopy.DestinationTableName = "Mid_SynOrder"; // 将table和数据库表的字段名对应 for (int k = 0; k < dt.Columns.Count; k++) { bulkCopy.ColumnMappings.Add(dt.Columns[k].ToString(), dt.Columns[k].ToString()); } //定义一个执行sql开始时间 DateTime Query_StartTime = DateTime.Now; // 执行写入操作 bulkCopy.WriteToServer(dt); // 执行sql批量插入耗时 TimeSpan Query_ts = DateTime.Now - Query_StartTime; Console.WriteLine(no + "、" + fileName + "文件数据共:" + dt.Rows.Count + " 条,插入完成,共耗时:" + Query_ts + "。"); Remark = "未出现异常,当前数据已成功插入数据库" + "完成当前时间" + DateTime.Now.ToLocalTime(); if (UpdateFileLogStatus(optime, fileName, "导入成功", Remark)) { Console.WriteLine(no + "、" + fileName + "导入成功!"); } } #endregion } catch (Exception e) { string Error = e.Message.ToString(); Status = "导入失败"; Remark = "出现异常,异常详情:" + Error + "当前时间" + DateTime.Now.ToLocalTime(); Console.WriteLine("异常:" + Error + "orderphone" + tempid);//+ "orderphone" + tempid if (UpdateFileLogStatus(optime, fileName, Status, Remark)) { Console.WriteLine("文件导入导入失败!!!"); } } } } #region 获取带处理的文件的状态 /// <summary> /// 获取带处理的文件的状态 /// </summary> /// <param name="optime"></param> /// <returns></returns> static protected string GetFileStatus(string OPTime, string FileName) { string FileStatus = string.Empty; if (OPTime != "" && FileName != "") { string SqlStr = " SELECT status FROM dbo.Mid_SynOrder_Log WHERE OPTime='" + OPTime + "' and FileName='" + FileName + "'"; DataTable dt = mydb.RunTable(SqlStr); if (dt.Rows.Count > 0) { FileStatus = dt.Rows[0]["status"].ToString(); } else { FileStatus = "NULL"; } } return FileStatus; } #endregion #region 插入导入文件的记录 /// <summary> /// 插入导入文件的记录 /// </summary> /// <param name="OPTime"></param> /// <param name="FileName"></param> /// <param name="FilePath"></param> /// <param name="Status"></param> /// <param name="Remark"></param> /// <returns></returns> static protected bool InsertFileLog(string OPTime, string FileName, string FilePath, string Status, string Remark) { string sqlStr = " INSERT INTO dbo.Mid_SynOrder_Log(OPTime, FileName, FilePath, Status, Remark) VALUES ('" + OPTime + "','" + FileName + "','" + FilePath + "','" + Status + "','" + Remark + "') "; bool b = dbhelpers.OperaBoolDB(sqlStr); return b; } #endregion #region 修改导入文件的记录的状态 /// <summary> /// 修改导入文件的记录的状态 /// </summary> /// <param name="OPTime"></param> /// <param name="FileName"></param> /// <param name="Status"></param> /// <returns></returns> static protected bool UpdateFileLogStatus(string OPTime, string FileName, string Status) { string sqlStr = " update dbo.Mid_SynOrder_Log set Status ='" + Status + "' where OPTime='" + OPTime + "' and FileName='" + FileName + "' "; bool b = dbhelpers.OperaBoolDB(sqlStr); return b; } /// <summary> /// 更新状态并添加备注信息 /// </summary> /// <param name="OPTime"></param> /// <param name="FileName"></param> /// <param name="Status"></param> /// <param name="Remark"></param> /// <returns></returns> static protected bool UpdateFileLogStatus(string OPTime, string FileName, string Status, string Remark) { string sqlStr = " update dbo.Mid_SynOrder_Log set Status ='" + Status + "',Remark ='" + Remark + "' where OPTime='" + OPTime + "' and FileName='" + FileName + "' "; bool b = dbhelpers.OperaBoolDB(sqlStr); return b; } #endregion } }