zoukankan      html  css  js  c++  java
  • c# 基于文件系统实现的队列处理类

    现实业务中经常遇到需要队列处理的问题。

    问题场景:

    客户端记录设备运行数据,传输给服务器。在传输中可能存在延迟或中断情况。
    当中断时,系统传输数据可能因为无法传输或电脑重启,会导致服务器数据记录不连续。

    设想的解决方案:

    当客户端采集到设备数据后,将数据先插入处理队列。
    另一个服务程序从队列中取出数据发送到服务器。
    数据记录与数据上传做到完全独立,并且是异步操作。
    数据记录在硬盘上,保障了数据的连续性和可靠性。
    如果一直无法上传到服务器,会导致队列中等待处理的数据积攒过多。

    现有解决方案:

    使用数据库做队列(可能遇到的问题:客户端性能有限,数据库可靠性差,系统断电重启,可能导致数据库损坏等)
    使用专业的消息队列(学习成本高,部署复杂,功能复杂,性能高)
    自己实现简单的队列服务(基于文件系统实现,无需部署,简单可靠,性能基本满足需求)

    基于文件的队列处理设计

    主要用的C#函数:File.AppendAllText、StreamWriter.WriteLine、File.ReadAllText、File.WriteAllText、File.Delete

    好处:

    基于系统文件处理函数
    队列大小依赖于磁盘大小
    队列内容为一行文本,无格式要求

    单文件队列

    原理:

    将业务数据追加到待处理文件中。
    处理程序从文件头部读取业务数据,处理完成后,将头部处理过的业务数据删除。

    遇到的问题:

    当业务处理程序一直失败时,队列文件会随着时间的增长越来越大。导致文件读写缓慢。
    因为业务数据写入与业务数据读取操作的是同一个文件,在大量并发时,存在文件锁定问题。
    因为待处理文件过大,磁盘操作数据量过大,导致磁盘性能不足。

    多文件队列

    处理示意图

    原理:

    业务数据写入时,按日期生成文件写入。一天一个文件,避免单文件过大问题。
    业务程序,处理时,将最早的待处理队列文件改名为正在处理文件。避免业务写入和业务处理同时操作一个文件问题。
    业务程序处理完成后,记录已经处理到的文件行数,避免多次重写队列文件。

    使用方法:

    LsLib.FileQueue fileQueue = new LsLib.FileQueue();
    
    // 插入待处理队列
    fileQueue.Push(new string[] { "业务数据内容,字符串(中间不能有换行)" });
    
    // 清空队列
    fileQueue.Clear();
    
    // 队列待处理数量
    fileQueue.Count();
    
    // 获取待处理数据
    fileQueue.GetList(10);
    
    // 设置最早的数据处理完成
    fileQueue.Pop(10);

    源代码:

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Windows.Forms;
    
    namespace LsLib
    {
        /// <summary>
        /// 文件堆栈
        /// </summary>
        class FileQueue
        {
            private string QueueName = "file";
            private string FileQueuePre = "";
            private string FileQueueRunPath = "";
            private string FileQueueRunIndexPath = "";
            private string FileQueueCountPath = "";
    
            public FileQueue(string queueName = "file")
            {
                this.QueueName = queueName;
                this.FileQueuePre = Application.StartupPath + "\queue\" + queueName;
                this.FileQueueRunPath = Application.StartupPath + "\queue\" + queueName + "_run.dat";
                this.FileQueueRunIndexPath = Application.StartupPath + "\queue\" + queueName + "_run_index.dat";
                this.FileQueueCountPath = Application.StartupPath + "\queue\" + queueName + "_count.dat";
            }
    
            /// <summary>
            /// 插入堆栈
            /// </summary>
            /// <param name="str"></param>
            /// <returns></returns>
            public bool Push(string[] strList)
            {
                int tryIndex = 0;
                string filePath = this.FileQueuePre + "_list" + DateTime.Now.ToString("_yyyyMMdd") + ".dat";
    
                while (tryIndex < 5)
                {
                    try
                    {
                        using (StreamWriter sw = new StreamWriter(filePath, true))
                        {
                            foreach (var str in strList)
                            {
                                sw.WriteLine(str);
                            }
                        }
    
                        SetCount(strList.Length);
    
                        return true;
                    }
                    catch (Exception ex)
                    {
                        tryIndex++;
                        Thread.Sleep(100);
                    }
                }
    
                return false;
            }
    
            // 设置队列待处理数量
            private int SetCount(int i)
            {
                int count = 0;
                if (File.Exists(this.FileQueueCountPath))
                {
                    count = int.Parse(File.ReadAllText(this.FileQueueCountPath));
                }
    
                count += i;
    
                File.WriteAllText(this.FileQueueCountPath, count.ToString());
    
                return count;
            }
    
            /// <summary>
            /// 清空堆栈
            /// </summary>
            /// <returns></returns>
            public bool Clear()
            {
                string[] fileList = Directory.GetFiles(Application.StartupPath + "\queue\", QueueName + "_*.dat");
    
                foreach (var file in fileList)
                {
                    File.Delete(file);
                }
    
                return true;
            }
    
            /// <summary>
            /// 堆栈待处理数量
            /// </summary>
            /// <returns></returns>
            public int Count()
            {
                int count = 0;
                if (File.Exists(this.FileQueueCountPath))
                {
                    count = int.Parse(File.ReadAllText(this.FileQueueCountPath));
                }
    
                return count;
            }
    
            /// <summary>
            /// 获取待处理列表
            /// </summary>
            /// <param name="count"></param>
            /// <returns></returns>
            public List<string> GetList(int count = 1)
            {
                List<string> list = new List<string>();
    
                bool isFirst = false;
                if (!File.Exists(this.FileQueueRunPath))
                {
                    string[] fileList = Directory.GetFiles(Application.StartupPath + "\queue\", QueueName + "_list_*.dat");
                    if (fileList.Length == 0)
                    {
                        return list;
                    }
    
                    Array.Sort(fileList);
    
                    File.Move(fileList[0], this.FileQueueRunPath);
                    isFirst = true;
                }
    
                int startIndex = 0;
                int totalCount = 0;
    
                if (File.Exists(this.FileQueueRunIndexPath))
                {
                    string strIndex = File.ReadAllText(this.FileQueueRunIndexPath);
                    string[] arrIndex = strIndex.Split(',');
    
                    startIndex = int.Parse(arrIndex[0]);
                    totalCount = int.Parse(arrIndex[1]);
                }
    
                int index = 0;
                using (StreamReader sm = File.OpenText(this.FileQueueRunPath))
                {
                    while (true)
                    {
                        string str = sm.ReadLine();
                        if (str == null)
                        {
                            break;
                        }
                        str = str.Trim();
    
                        if (str == "")
                        {
                            continue;
                        }
    
                        totalCount++;
                        if (index < startIndex)
                        {
                            index++;
                            continue;
                        }
    
                        if (list.Count < count)
                        {
                            list.Add(str);
                        }
    
                        if (list.Count >= count && !isFirst)
                        {
                            break;
                        }
                    }
                }
    
                if (isFirst)
                {
                    File.WriteAllText(this.FileQueueRunIndexPath, "0," + totalCount);
                }
    
                return list;
            }
    
            /// <summary>
            /// 出栈
            /// </summary>
            /// <param name="count"></param>
            /// <returns></returns>
            public bool Pop(int count = 1)
            {
                if (!File.Exists(this.FileQueueRunIndexPath))
                {
                    return false;
                }
    
                string strIndex = File.ReadAllText(this.FileQueueRunIndexPath);
                string[] arrIndex = strIndex.Split(',');
    
                int startIndex = int.Parse(arrIndex[0]) + count;
                int totalCount = int.Parse(arrIndex[1]);
    
                SetCount(-1 * count);
    
                if (startIndex >= totalCount)
                {
                    File.Delete(this.FileQueueRunIndexPath);
                    File.Delete(this.FileQueueRunPath);
                }
                else
                {
                    File.WriteAllText(this.FileQueueRunIndexPath, startIndex + "," + totalCount);
                }
    
                return true;
            }
        }
    }
  • 相关阅读:
    JDK内置工具使用
    awk 数组
    c++面试题
    C++内存分配
    awk 数字比较
    awk脚本 排序
    awk 读取 shell 变量的方法
    NVelocity系列:Getting Start With NVelocity
    Castle.ActiveRecord分页示例
    NVelocity系列:NVelocity配置详解
  • 原文地址:https://www.cnblogs.com/zjfree/p/10790211.html
Copyright © 2011-2022 走看看