现实业务中经常遇到需要队列处理的问题。
问题场景:
客户端记录设备运行数据,传输给服务器。在传输中可能存在延迟或中断情况。
当中断时,系统传输数据可能因为无法传输或电脑重启,会导致服务器数据记录不连续。
设想的解决方案:
当客户端采集到设备数据后,将数据先插入处理队列。
另一个服务程序从队列中取出数据发送到服务器。
数据记录与数据上传做到完全独立,并且是异步操作。
数据记录在硬盘上,保障了数据的连续性和可靠性。
如果一直无法上传到服务器,会导致队列中等待处理的数据积攒过多。
现有解决方案:
使用数据库做队列(可能遇到的问题:客户端性能有限,数据库可靠性差,系统断电重启,可能导致数据库损坏等)
使用专业的消息队列(学习成本高,部署复杂,功能复杂,性能高)
自己实现简单的队列服务(基于文件系统实现,无需部署,简单可靠,性能基本满足需求)
基于文件的队列处理设计
主要用的C#函数:File.AppendAllText、StreamWriter.WriteLine、File.ReadAllText、File.WriteAllText、File.Delete
好处:
基于系统文件处理函数
队列大小依赖于磁盘大小
队列内容为一行文本,无格式要求
单文件队列
原理:
将业务数据追加到待处理文件中。
处理程序从文件头部读取业务数据,处理完成后,将头部处理过的业务数据删除。
遇到的问题:
当业务处理程序一直失败时,队列文件会随着时间的增长越来越大。导致文件读写缓慢。
因为业务数据写入与业务数据读取操作的是同一个文件,在大量并发时,存在文件锁定问题。
因为待处理文件过大,磁盘操作数据量过大,导致磁盘性能不足。
多文件队列
处理示意图
原理:
业务数据写入时,按日期生成文件写入。一天一个文件,避免单文件过大问题。
业务程序,处理时,将最早的待处理队列文件改名为正在处理文件。避免业务写入和业务处理同时操作一个文件问题。
业务程序处理完成后,记录已经处理到的文件行数,避免多次重写队列文件。
使用方法:
LsLib.FileQueue fileQueue = new LsLib.FileQueue(); // 插入待处理队列 fileQueue.Push(new string[] { "业务数据内容,字符串(中间不能有换行)" }); // 清空队列 fileQueue.Clear(); // 队列待处理数量 fileQueue.Count(); // 获取待处理数据 fileQueue.GetList(10); // 设置最早的数据处理完成 fileQueue.Pop(10);
源代码:
using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; using System.Threading; using System.Windows.Forms; namespace LsLib { /// <summary> /// 文件堆栈 /// </summary> class FileQueue { private string QueueName = "file"; private string FileQueuePre = ""; private string FileQueueRunPath = ""; private string FileQueueRunIndexPath = ""; private string FileQueueCountPath = ""; public FileQueue(string queueName = "file") { this.QueueName = queueName; this.FileQueuePre = Application.StartupPath + "\queue\" + queueName; this.FileQueueRunPath = Application.StartupPath + "\queue\" + queueName + "_run.dat"; this.FileQueueRunIndexPath = Application.StartupPath + "\queue\" + queueName + "_run_index.dat"; this.FileQueueCountPath = Application.StartupPath + "\queue\" + queueName + "_count.dat"; } /// <summary> /// 插入堆栈 /// </summary> /// <param name="str"></param> /// <returns></returns> public bool Push(string[] strList) { int tryIndex = 0; string filePath = this.FileQueuePre + "_list" + DateTime.Now.ToString("_yyyyMMdd") + ".dat"; while (tryIndex < 5) { try { using (StreamWriter sw = new StreamWriter(filePath, true)) { foreach (var str in strList) { sw.WriteLine(str); } } SetCount(strList.Length); return true; } catch (Exception ex) { tryIndex++; Thread.Sleep(100); } } return false; } // 设置队列待处理数量 private int SetCount(int i) { int count = 0; if (File.Exists(this.FileQueueCountPath)) { count = int.Parse(File.ReadAllText(this.FileQueueCountPath)); } count += i; File.WriteAllText(this.FileQueueCountPath, count.ToString()); return count; } /// <summary> /// 清空堆栈 /// </summary> /// <returns></returns> public bool Clear() { string[] fileList = Directory.GetFiles(Application.StartupPath + "\queue\", QueueName + "_*.dat"); foreach (var file in fileList) { File.Delete(file); } return true; } /// <summary> /// 堆栈待处理数量 /// </summary> /// <returns></returns> public int Count() { int count = 0; if (File.Exists(this.FileQueueCountPath)) { count = int.Parse(File.ReadAllText(this.FileQueueCountPath)); } return count; } /// <summary> /// 获取待处理列表 /// </summary> /// <param name="count"></param> /// <returns></returns> public List<string> GetList(int count = 1) { List<string> list = new List<string>(); bool isFirst = false; if (!File.Exists(this.FileQueueRunPath)) { string[] fileList = Directory.GetFiles(Application.StartupPath + "\queue\", QueueName + "_list_*.dat"); if (fileList.Length == 0) { return list; } Array.Sort(fileList); File.Move(fileList[0], this.FileQueueRunPath); isFirst = true; } int startIndex = 0; int totalCount = 0; if (File.Exists(this.FileQueueRunIndexPath)) { string strIndex = File.ReadAllText(this.FileQueueRunIndexPath); string[] arrIndex = strIndex.Split(','); startIndex = int.Parse(arrIndex[0]); totalCount = int.Parse(arrIndex[1]); } int index = 0; using (StreamReader sm = File.OpenText(this.FileQueueRunPath)) { while (true) { string str = sm.ReadLine(); if (str == null) { break; } str = str.Trim(); if (str == "") { continue; } totalCount++; if (index < startIndex) { index++; continue; } if (list.Count < count) { list.Add(str); } if (list.Count >= count && !isFirst) { break; } } } if (isFirst) { File.WriteAllText(this.FileQueueRunIndexPath, "0," + totalCount); } return list; } /// <summary> /// 出栈 /// </summary> /// <param name="count"></param> /// <returns></returns> public bool Pop(int count = 1) { if (!File.Exists(this.FileQueueRunIndexPath)) { return false; } string strIndex = File.ReadAllText(this.FileQueueRunIndexPath); string[] arrIndex = strIndex.Split(','); int startIndex = int.Parse(arrIndex[0]) + count; int totalCount = int.Parse(arrIndex[1]); SetCount(-1 * count); if (startIndex >= totalCount) { File.Delete(this.FileQueueRunIndexPath); File.Delete(this.FileQueueRunPath); } else { File.WriteAllText(this.FileQueueRunIndexPath, startIndex + "," + totalCount); } return true; } } }