zoukankan      html  css  js  c++  java
  • 超实用文件监控多线程FTP上传工具

    这是自己很久以前写的一个多线程FTP 上传工具,支持多账户,自定义线程数,自定义文件监控目录,可用做文件发布使用,非常实用,今天有小伙伴问起,现分享出来:

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Linq;
     4 using System.IO;
     5 using System.Text;
     6 using System.Threading.Tasks;
     7 
     8 namespace NuFTPCmmndv5
     9 {
    10     public class TaskFile
    11     {
    12         public TaskFile()
    13         {
    14             GUID = Guid.NewGuid();
    15         }
    16         private long _fileSize { get; set; }
    17 
    18         public Guid GUID { get; set; }
    19         public string HOST { get; set; }
    20         public string DIR { get; set; }
    21         public string LCD { get; set; }
    22         public string Priority { get; set; }
    23         public string Filename { get; set; }
    24         public long Size
    25         {
    26             get
    27             {
    28                 if (File.Exists(this.LCD))
    29                 {
    30                     _fileSize = new FileInfo(this.LCD).Length;
    31                 };
    32                 return _fileSize;
    33             }
    34             set { _fileSize = value; }
    35         }
    36 
    37     }
    38 }
    View Code
      1 using System;
      2 using System.IO;
      3 using System.Collections.Generic;
      4 using System.ComponentModel;
      5 using System.Data;
      6 using System.Drawing;
      7 using System.Linq;
      8 using System.Text;
      9 using System.Threading;
     10 using System.Threading.Tasks;
     11 using System.Windows.Forms;
     12 
     13 using FluentScheduler;
     14 
     15 namespace NuFTPCmmndv5
     16 {
     17     public partial class FormMain : Form
     18     {
     19 
     20         private Setting setting;
     21         private static Semaphore sema = new Semaphore(1, 1);
     22         private object syncRoot = new object();
     23         public FormMain()
     24         {
     25             InitializeComponent();
     26             setting = new Setting();
     27         }
     28 
     29         private void FormMain_Load(object sender, EventArgs e)
     30         {
     31             setting = setting.loadSetting();
     32 
     33             if (Directory.Exists(setting.FolderToMonitor + "err\"))
     34                 Directory.CreateDirectory(setting.FolderToMonitor + "err\");
     35 
     36             if (Directory.Exists(setting.FolderToMonitor + "pending\"))
     37                 Directory.CreateDirectory(setting.FolderToMonitor + "pending\");
     38 
     39             foreach (var f in new DirectoryInfo(setting.FolderToMonitor + "pending\").GetFiles())
     40             {
     41                 f.CopyTo(setting.FolderToMonitor + "err\" + f.Name, true);
     42                 f.Delete();
     43             }
     44 
     45             SetStatus(setting.FolderToMonitor);
     46 
     47             //开启上传任务
     48             StartRunUploadTask();
     49 
     50             //开始监控任务
     51             StartRunMonitorTask(1);
     52         }
     53 
     54 
     55         /// <summary>
     56         /// 启动监控任务
     57         /// </summary>
     58         /// <param name="timer"></param>
     59         public void StartRunMonitorTask(int timer)
     60         {
     61             JobManager.AddJob(() =>
     62             {
     63                 sema.WaitOne();
     64                 SetToolStripStatusStatus(DateTime.Now + " (" + DataGridFiles.Rows.Count + ")");
     65                 RunTask();
     66                 sema.Release();
     67             },
     68             t =>
     69             {
     70                 t.WithName("StartRunMonitorTask").ToRunNow().AndEvery(timer).Seconds();
     71             });
     72         }
     73 
     74         /// <summary>
     75         /// 运行监控任务
     76         /// </summary>
     77         public void RunTask()
     78         {
     79             //lock (syncRoot)
     80             //{
     81                 #region
     82                 try
     83                 {
     84                     //每5分钟读取出错文件并追加到待上传
     85                     if (DateTime.Now.Second == 0 && DateTime.Now.Minute % 5 == 0)
     86                     {
     87                         foreach (var f in new DirectoryInfo(setting.FolderToMonitor + "err\").GetFiles())
     88                         {
     89                             f.CopyTo(setting.FolderToMonitor + f.Name, true);
     90                             f.Delete();
     91                             LogText("Retrying: " + f.Name);
     92                         }
     93                     }
     94                     //写监控
     95                     WriteMon(DateTime.Now.ToString());
     96 
     97                     //待上传文件大于200 报警
     98                     if (DataGridFiles.Rows.Count > 200)
     99                     {
    100                         WriteMon("=ERROR=NuFTPCmmndv4 pending upload: " + DataGridFiles.Rows.Count);
    101                     }
    102 
    103                     //删除超过30天日志
    104                     foreach (var f in new DirectoryInfo(AppDomain.CurrentDomain.BaseDirectory + "logs\").GetFiles())
    105                     {
    106                         if (f.LastWriteTime < DateTime.Now.AddDays(-30))
    107                             f.Delete();
    108                     }
    109                 }
    110                 catch (Exception ex)
    111                 {
    112                 }
    113                 #endregion
    114 
    115                 //读取dat文件
    116                 foreach (var f in new DirectoryInfo(setting.FolderToMonitor).GetFiles("*.dat"))
    117                 {
    118                     try
    119                     {
    120                         var task = new TaskFile();
    121 
    122                         #region
    123                         //按行读取
    124                         int curLine = 1;
    125                         using (var fileStream = new FileStream(f.FullName, FileMode.Open, FileAccess.Read))
    126                         {
    127                             fileStream.Seek(0, SeekOrigin.Begin);
    128                             string[] lines = System.IO.File.ReadAllLines(f.FullName);
    129                             using (var streamReader = new StreamReader(fileStream, Encoding.Default))
    130                             {
    131                                 string content = streamReader.ReadLine();
    132                                 while (!string.IsNullOrEmpty(content))
    133                                 {
    134                                     if (content.Substring(0, 5) == "HOST=")
    135                                     {
    136                                         //FTP
    137                                         task.HOST = content.Substring(content.IndexOf("=") + 1);
    138                                     }
    139                                     else if (content.Substring(0, 4) == "LCD=")
    140                                     {
    141                                         //本地目录文件
    142                                         task.LCD = content.Substring(content.IndexOf("=") + 1);
    143                                     }
    144                                     else if (content.Substring(0, 4) == "DIR=")
    145                                     {
    146                                         //远程对应目录文件
    147                                         task.DIR = content.Substring(content.IndexOf("=") + 1);
    148                                         task.DIR = task.DIR.Replace("\", "/");
    149                                     }
    150                                     else if (content.Substring(0, 9) == "priority=")
    151                                     {
    152                                         //优先级
    153                                         task.Priority = content.Substring(content.IndexOf("=") + 1);
    154                                     }
    155 
    156                                     content = streamReader.ReadLine();
    157                                     curLine++;
    158                                 }
    159                             }
    160                         }
    161 
    162                         #endregion
    163 
    164                         //上传文件名
    165                         task.Filename = f.Name;
    166 
    167                         //拷贝到待上传目录
    168                         f.CopyTo(setting.FolderToMonitor + "pending\" + f.Name, true);
    169                         f.Delete();
    170 
    171                         //遍历账户配置
    172                         var account = setting.FTPAccounts.Select(a => a).Where(a => a.FTPName == task.HOST).FirstOrDefault();
    173                         if (account != null)
    174                         {
    175                             //是否已经存在
    176                             if (!account.Contains(task))
    177                             {
    178                                 //添加到待传队列
    179                                 account.AddQueue(task);
    180                                 //刷新GridView
    181                                 InvokeAddGridView(this.DataGridFiles, task);
    182                             }
    183                             else
    184                             {
    185                                 //存在则移除文件
    186                                 LogText(task.HOST + " The file already exists in the Queue:" + task.HOST + "/" + task.DIR);
    187                             }
    188                         }
    189                     }
    190                     catch (Exception ex)
    191                     {
    192                         LogText(ex.Message + ";" + ex.StackTrace);
    193                     }
    194                 }
    195             //}
    196         }
    197         /// <summary>
    198         /// 开启上传任务
    199         /// </summary>
    200         public void StartRunUploadTask()
    201         {
    202             foreach (var account in setting.FTPAccounts)
    203             {
    204                 account.setting = setting;
    205                 //注册上传完成事件
    206                 account.Completed += account_Completed;
    207                 //注册上传进度事件
    208                 account.ProcessProgress += account_ProcessProgress;
    209                 //注册删除上传事件
    210                 account.Deleted += account_Deleted;
    211                 //终止上传事件
    212                 account.Aborted += account_Aborted;
    213                 //注册上传日志事件
    214                 account.ProcessLog += account_ProcessLog;
    215                 //开始上传队列
    216                 account.Start();
    217             }
    218         }
    219 
    220 
    221         private void account_ProcessProgress(TaskFile arg1, FTPAccount.CompetedEventArgs arg2)
    222         {
    223             InvokeUpdateGridView(this.DataGridFiles, arg1, arg2);
    224         }
    225         private void account_Completed(TaskFile arg1, FTPAccount.CompetedEventArgs arg2)
    226         {
    227             InvokeRemoveGridView(this.DataGridFiles, arg1);
    228         }
    229         private void account_Aborted(TaskFile arg1, FTPAccount.CompetedEventArgs arg2)
    230         {
    231             foreach (FileInfo f in new DirectoryInfo(setting.FolderToMonitor + "pending\").GetFiles())
    232             {
    233                 if (arg1.Filename == f.Name)
    234                 {
    235                     f.CopyTo(setting.FolderToMonitor + "err\" + f.Name, true);
    236                     f.Delete();
    237                     break;
    238                 }
    239             }
    240         }
    241         private void account_Deleted(TaskFile arg1, FTPAccount.CompetedEventArgs arg2)
    242         {
    243             //删除行
    244             InvokeRemoveGridView(this.DataGridFiles, arg1);
    245             //删除文件
    246             try
    247             {
    248                 System.IO.File.Delete(setting.FolderToMonitor + "err\" + arg1.Filename);
    249             }
    250             catch (Exception ex)
    251             {
    252                 LogText(ex.Message + ";" + ex.StackTrace);
    253             }
    254         }
    255         private void account_ProcessLog(string obj)
    256         {
    257             LogText(obj);
    258         }
    259 
    260 
    261 
    262         public void InvokeAddGridView(DataGridView dataGridView, TaskFile task)
    263         {
    264             if (dataGridView.InvokeRequired)
    265             {
    266                 this.Invoke(new Action(() =>
    267                 {
    268                     AddGridView(dataGridView, task);
    269                 }));
    270                 return;
    271             }
    272             AddGridView(dataGridView, task);
    273         }
    274         public void AddGridView(DataGridView dataGridView, TaskFile task)
    275         {
    276             try
    277             {
    278                 int index = dataGridView.Rows.Add();
    279                 dataGridView.Rows[index].Cells[0].Value = task.LCD;
    280                 dataGridView.Rows[index].Cells[1].Value = FormatFileSize(task.Size);
    281                 dataGridView.Rows[index].Cells[2].Value = "Pending";
    282                 dataGridView.Rows[index].Cells[3].Value = task.Filename;
    283                 dataGridView.Rows[index].Cells[4].Value = task.GUID.ToString();
    284             }
    285             catch (Exception ex)
    286             {
    287                 LogText(ex.Message + ";" + ex.StackTrace);
    288             }
    289         }
    290 
    291 
    292         public String FormatFileSize(Int64 fileSize)
    293         {
    294             if (fileSize < 0)
    295                 return "0";
    296             else if (fileSize >= 1024 * 1024 * 1024)
    297                 return string.Format("{0:########0.00} GB", ((Double)fileSize) / (1024 * 1024 * 1024));
    298             else if (fileSize >= 1024 * 1024)
    299                 return string.Format("{0:####0.00} MB", ((Double)fileSize) / (1024 * 1024));
    300             else if (fileSize >= 1024)
    301                 return string.Format("{0:####0.00} KB", ((Double)fileSize) / 1024);
    302             else
    303                 return string.Format("{0} bytes", fileSize);
    304         }
    305 
    306 
    307         public void InvokeUpdateGridView(DataGridView dataGridView, TaskFile task, FTPAccount.CompetedEventArgs arg)
    308         {
    309             if (dataGridView.InvokeRequired)
    310             {
    311                 this.Invoke(new Action(() =>
    312                 {
    313                     UpdateGridView(dataGridView, task, arg);
    314                 }));
    315                 return;
    316             }
    317             UpdateGridView(dataGridView, task, arg);
    318         }
    319         public void UpdateGridView(DataGridView dataGridView, TaskFile task, FTPAccount.CompetedEventArgs arg)
    320         {
    321             try
    322             {
    323                 foreach (DataGridViewRow r in dataGridView.Rows)
    324                 {
    325                     if (!r.IsNewRow && (r.Cells["Guid"].Value.ToString() == task.GUID.ToString()))
    326                     {
    327                         if (arg.uploadStatus == FTPAccount.UploadStatus.Failed)
    328                         {
    329                             r.Cells["Completed"].Value = "Failed";
    330                             r.Cells["Completed"].Style.BackColor = Color.LightPink;
    331                         }
    332                         else if (arg.uploadStatus == FTPAccount.UploadStatus.Timeout)
    333                         {
    334                             r.Cells["Completed"].Value = "Timeout";
    335                             r.Cells["Completed"].Style.BackColor = Color.LightPink;
    336                         }
    337                         else if (arg.uploadStatus == FTPAccount.UploadStatus.Cancel)
    338                         {
    339                             r.Cells["Completed"].Value = "Cancel";
    340                             r.Cells["Completed"].Style.BackColor = Color.LightPink;
    341                         }
    342                         else if (arg.uploadStatus == FTPAccount.UploadStatus.Uploading && arg.CompetedPrecent != 0)
    343                         {
    344                             r.Cells["Completed"].Value = arg.CompetedPrecent + "%";
    345                             r.Cells["Completed"].Style.BackColor = Color.White;
    346                         }
    347                         else if (arg.uploadStatus == FTPAccount.UploadStatus.Uploading && arg.CompetedPrecent == 0)
    348                         {
    349                             r.Cells["Completed"].Value = "Uploading";
    350                             r.Cells["Completed"].Style.BackColor = Color.White;
    351                         }
    352                     }
    353                 }
    354                 dataGridView.Sort(dataGridView.Columns["Completed"], ListSortDirection.Ascending); 
    355             }
    356             catch (Exception ex)
    357             {
    358                 LogText(ex.Message + ";" + ex.StackTrace);
    359             }
    360         }
    361 
    362 
    363         public void InvokeRemoveGridView(DataGridView dataGridView, TaskFile task)
    364         {
    365             if (dataGridView.InvokeRequired)
    366             {
    367                 this.Invoke(new Action(() =>
    368                 {
    369                     RemoveGridView(dataGridView, task);
    370                 }));
    371                 return;
    372             }
    373             RemoveGridView(dataGridView, task);
    374         }
    375         public void RemoveGridView(DataGridView dataGridView, TaskFile task)
    376         {
    377             try
    378             {
    379                 foreach (DataGridViewRow r in dataGridView.Rows)
    380                 {
    381                     if (!r.IsNewRow && (r.Cells["Guid"].Value.ToString() == task.GUID.ToString()))
    382                     {
    383                         dataGridView.Rows.Remove(r);
    384                     }
    385                 }
    386             }
    387             catch (Exception ex)
    388             {
    389                 LogText(ex.Message + ";" + ex.StackTrace);
    390             }
    391         }
    392 
    393 
    394         public void WriteMon(string txt)
    395         {
    396 
    397         }
    398 
    399         public void LogText(string msg)
    400         {
    401             var dir = AppDomain.CurrentDomain.BaseDirectory + "logs\";
    402             if (!Directory.Exists(dir))
    403                 Directory.CreateDirectory(dir);
    404 
    405             using (var w = new StreamWriter(dir + DateTime.Now.ToString("yyyy-MM-dd") + ".log", true))
    406             {
    407                 w.WriteLine(DateTime.Now.ToString() + ": " + msg);
    408                 w.Close();
    409             }
    410 
    411             if (TextBoxLog.InvokeRequired)
    412             {
    413                 this.Invoke(new Action(() =>
    414                 {
    415                     TextBoxLog.AppendText(msg + "
    ");
    416                     if (this.TextBoxLog.Lines.Count() > setting.MAX_LOG_LINES)
    417                     {
    418                         this.TextBoxLog.Text = "";
    419                     }
    420                     Application.DoEvents();
    421                 }));
    422                 return;
    423             }
    424 
    425             TextBoxLog.AppendText(msg + "
    ");
    426             if (this.TextBoxLog.Lines.Count() > setting.MAX_LOG_LINES)
    427             {
    428                 this.TextBoxLog.Text = "";
    429             }
    430             Application.DoEvents();
    431         }
    432 
    433         public void SetStatus(string msg)
    434         {
    435             if (this.InvokeRequired)
    436             {
    437                 this.Invoke(new Action(() =>
    438                 {
    439                     this.LabelStatus.Text = msg;
    440                     Application.DoEvents();
    441                 }));
    442                 return;
    443             }
    444             this.LabelStatus.Text = msg;
    445             Application.DoEvents();
    446         }
    447 
    448 
    449         public void SetToolStripStatusStatus(string msg)
    450         {
    451             if (this.InvokeRequired)
    452             {
    453                 this.Invoke(new Action(() =>
    454                 {
    455                     this.toolStripStatusLabel2.Text = msg;
    456                     Application.DoEvents();
    457                 }));
    458                 return;
    459             }
    460             this.toolStripStatusLabel2.Text = msg;
    461             Application.DoEvents();
    462         }
    463 
    464 
    465         public void WriteLog(TextBox textBox, string msg)
    466         {
    467             if (textBox.InvokeRequired)
    468             {
    469                 textBox.Invoke(new Action(() =>
    470                 {
    471                     textBox.AppendText(msg + "
    ");
    472                 }));
    473                 return;
    474             }
    475             textBox.AppendText(msg + "
    ");
    476         }
    477 
    478         private void button2_Click(object sender, EventArgs e)
    479         {
    480 
    481             if (ButtonPauseResume.Text == "Pause")
    482             {
    483                 JobManager.Stop();
    484                 ButtonPauseResume.Text = "Resume";
    485             }
    486             else
    487             {
    488                 JobManager.Start();
    489                 ButtonPauseResume.Text = "Pause";
    490             }
    491         }
    492 
    493         /// <summary>
    494         /// 取消上传
    495         /// </summary>
    496         /// <param name="sender"></param>
    497         /// <param name="e"></param>
    498         private void cancelUploadToolStripMenuItem_Click(object sender, EventArgs e)
    499         {
    500             DataGridViewRow row;
    501             var task = new TaskFile();
    502             row = DataGridFiles.SelectedRows[0];
    503             var fileName = row.Cells["FileName"].Value.ToString();
    504 
    505             #region
    506 
    507             if (System.IO.File.Exists(setting.FolderToMonitor + "pending\" + fileName))
    508             {
    509                 var f = new FileInfo(setting.FolderToMonitor + "pending\" + fileName);
    510                 //按行读取
    511                 int curLine = 1;
    512                 using (var fileStream = new FileStream(f.FullName, FileMode.Open, FileAccess.Read))
    513                 {
    514                     fileStream.Seek(0, SeekOrigin.Begin);
    515                     string[] lines = System.IO.File.ReadAllLines(f.FullName);
    516                     using (var streamReader = new StreamReader(fileStream, Encoding.Default))
    517                     {
    518                         string content = streamReader.ReadLine();
    519                         while (!string.IsNullOrEmpty(content))
    520                         {
    521                             if (content.Substring(0, 5) == "HOST=")
    522                             {
    523                                 //FTP
    524                                 task.HOST = content.Substring(content.IndexOf("=") + 1);
    525                             }
    526                             else if (content.Substring(0, 4) == "LCD=")
    527                             {
    528                                 //本地目录文件
    529                                 task.LCD = content.Substring(content.IndexOf("=") + 1);
    530                             }
    531                             else if (content.Substring(0, 4) == "DIR=")
    532                             {
    533                                 //远程对应目录文件
    534                                 task.DIR = content.Substring(content.IndexOf("=") + 1);
    535                                 task.DIR = task.DIR.Replace("\", "/");
    536                             }
    537                             else if (content.Substring(0, 9) == "priority=")
    538                             {
    539                                 //优先级
    540                                 task.Priority = content.Substring(content.IndexOf("=") + 1);
    541                             }
    542 
    543                             content = streamReader.ReadLine();
    544                             curLine++;
    545                         }
    546                     }
    547                 }
    548 
    549                 foreach (var account in setting.FTPAccounts)
    550                 {
    551                     if (account.FTPName == task.HOST)
    552                     {
    553                         account.cancelTask = task;
    554                         break;
    555                     }
    556                 }
    557 
    558                 //--------------Copyto file to err
    559                 f.CopyTo(setting.FolderToMonitor + "err\" + f.Name, true);
    560                 System.IO.File.Delete(setting.FolderToMonitor + "pending\" + f.Name);
    561                 //--------------Copyto file to err
    562             }
    563 
    564             #endregion
    565         }
    566 
    567         /// <summary>
    568         /// 删除上传
    569         /// </summary>
    570         /// <param name="sender"></param>
    571         /// <param name="e"></param>
    572         private void deleteUploadToolStripMenuItem_Click(object sender, EventArgs e)
    573         {
    574             DataGridViewRow row;
    575             var task = new TaskFile();
    576             row = DataGridFiles.SelectedRows[0];
    577             var fileName = row.Cells["FileName"].Value.ToString();
    578             var fileGuid = row.Cells["Guid"].Value.ToString();
    579             #region
    580 
    581             if (System.IO.File.Exists(setting.FolderToMonitor + "pending\" + fileName))
    582             {
    583                 var f = new FileInfo(setting.FolderToMonitor + "pending\" + fileName);
    584                 //按行读取
    585                 int curLine = 1;
    586                 using (var fileStream = new FileStream(f.FullName, FileMode.Open, FileAccess.Read))
    587                 {
    588                     fileStream.Seek(0, SeekOrigin.Begin);
    589                     string[] lines = System.IO.File.ReadAllLines(f.FullName);
    590                     using (var streamReader = new StreamReader(fileStream, Encoding.Default))
    591                     {
    592                         string content = streamReader.ReadLine();
    593                         while (!string.IsNullOrEmpty(content))
    594                         {
    595                             if (content.Substring(0, 5) == "HOST=")
    596                             {
    597                                 //FTP
    598                                 task.HOST = content.Substring(content.IndexOf("=") + 1);
    599                             }
    600                             else if (content.Substring(0, 4) == "LCD=")
    601                             {
    602                                 //本地目录文件
    603                                 task.LCD = content.Substring(content.IndexOf("=") + 1);
    604                             }
    605                             else if (content.Substring(0, 4) == "DIR=")
    606                             {
    607                                 //远程对应目录文件
    608                                 task.DIR = content.Substring(content.IndexOf("=") + 1);
    609                                 task.DIR = task.DIR.Replace("\", "/");
    610                             }
    611                             else if (content.Substring(0, 9) == "priority=")
    612                             {
    613                                 //优先级
    614                                 task.Priority = content.Substring(content.IndexOf("=") + 1);
    615                             }
    616 
    617                             content = streamReader.ReadLine();
    618                             curLine++;
    619                         }
    620                     }
    621                 }
    622 
    623                 if (!string.IsNullOrEmpty(task.HOST))
    624                 {
    625                     foreach (var account in setting.FTPAccounts)
    626                     {
    627                         if (account.FTPName == task.HOST)
    628                         {
    629                             account.delTask = task;
    630                             break;
    631                         }
    632                     }
    633                 }
    634                 
    635                 LogText("Delete File:" + "pending\" + f.Name);
    636                 System.IO.File.Delete(setting.FolderToMonitor + "pending\" + f.Name);
    637             }
    638 
    639             if (!string.IsNullOrEmpty(task.HOST))
    640                 RemoveGridView(this.DataGridFiles, task);
    641             else
    642             {
    643                 task.GUID = System.Guid.Parse(fileGuid);
    644                 RemoveGridView(this.DataGridFiles, task);
    645             }
    646 
    647             #endregion
    648         }
    649 
    650         private void DataGridFiles_CellMouseDown(object sender, DataGridViewCellMouseEventArgs e)
    651         {
    652             if (e.Button == MouseButtons.Right)
    653             {
    654                 if (e.RowIndex >= 0)
    655                 {
    656                     //若行已是选中状态就不再进行设置
    657                     if (DataGridFiles.Rows[e.RowIndex].Selected == false)
    658                     {
    659                         DataGridFiles.ClearSelection();
    660                         DataGridFiles.Rows[e.RowIndex].Selected = true;
    661                     }
    662                     //只选中一行时设置活动单元格
    663                     if (DataGridFiles.SelectedRows.Count == 1)
    664                     {
    665                         DataGridFiles.CurrentCell = DataGridFiles.Rows[e.RowIndex].Cells[e.ColumnIndex];
    666                     }
    667                     //弹出操作菜单
    668                     contextMenuStrip1.Show(MousePosition.X, MousePosition.Y);
    669                 }
    670             }
    671         }
    672 
    673         private void FormMain_FormClosing(object sender, FormClosingEventArgs e)
    674         {
    675             if (JobManager.GetSchedule("StartRunMonitorTask") != null)
    676             {
    677                 JobManager.Stop();
    678                 JobManager.RemoveJob("StartRunMonitorTask");
    679             }
    680             System.Environment.Exit(0);
    681             Application.Exit();
    682         }
    683 
    684         private void button1_Click(object sender, EventArgs e)
    685         {
    686             var fa = new FormAccounts(setting);
    687             fa.Owner = this;
    688             fa.ShowDialog();
    689         }
    690     }
    691 }
    View Code
      1 using System;
      2 using System.IO;
      3 using System.Net;
      4 using System.Collections.Generic;
      5 using System.Linq;
      6 using System.Text;
      7 using System.Threading.Tasks;
      8 using System.Xml;
      9 using System.Xml.Linq;
     10 using System.Xml.Serialization;
     11 using System.Threading;
     12 using System.Security.Cryptography;
     13 
     14 namespace NuFTPCmmndv5
     15 {
     16 
     17     public class FTPAccount
     18     {
     19         [XmlIgnore]
     20         private ProcessQueue<TaskFile> processQueue;
     21 
     22         [XmlIgnore]
     23         public Setting setting;
     24 
     25         public FTPAccount()
     26         {
     27             //processQueue = new ProcessQueue<TaskFile>(UpLoadFileTest);
     28             processQueue = new ProcessQueue<TaskFile>(UpLoadFile);
     29             cancelTask = new TaskFile();
     30             delTask = new TaskFile();
     31         }
     32 
     33         [XmlElement]
     34         /// <summary>
     35         /// FTP名
     36         /// </summary>
     37         public string FTPName { get; set; }
     38         [XmlElement]
     39         /// <summary>
     40         /// FTP对应IP地址
     41         /// </summary>
     42         public string IP { get; set; }
     43         [XmlElement]
     44         /// <summary>
     45         /// 端口号
     46         /// </summary>
     47         public int Port { get; set; }
     48         [XmlElement]
     49         /// <summary>
     50         /// 账户名
     51         /// </summary>
     52         public string Username { get; set; }
     53         [XmlElement]
     54         /// <summary>
     55         /// 密码
     56         /// </summary>
     57         public string Password { get; set; }
     58         [XmlElement]
     59         public int MaxThreadNum { get; set; }
     60 
     61 
     62         [XmlIgnore]
     63         public TaskFile cancelTask { get; set; }
     64         [XmlIgnore]
     65         public TaskFile delTask { get; set; }
     66 
     67 
     68         /// <summary>
     69         /// 开始处理上传队列
     70         /// </summary>
     71         public void Start()
     72         {
     73             processQueue.Start();
     74         }
     75 
     76         /// <summary>
     77         /// 添加到队列
     78         /// </summary>
     79         /// <param name="task"></param>
     80         public void AddQueue(TaskFile task)
     81         {
     82             processQueue.Enqueue(task);
     83         }
     84 
     85         /// <summary>
     86         /// 是否已经包含在队列
     87         /// </summary>
     88         /// <param name="task"></param>
     89         /// <returns></returns>
     90         public bool Contains(TaskFile task)
     91         {
     92             return processQueue.Contains(task);
     93         }
     94 
     95 
     96         /// <summary>
     97         /// 上传进度
     98         /// </summary>
     99         public event Action<TaskFile, CompetedEventArgs> ProcessProgress;
    100         /// <summary>
    101         /// 上传完成
    102         /// </summary>
    103         public event Action<TaskFile, CompetedEventArgs> Completed;
    104         /// <summary>
    105         /// 终止上传
    106         /// </summary>
    107         public event Action<TaskFile, CompetedEventArgs> Aborted;
    108         /// <summary>
    109         /// 删除上传
    110         /// </summary>
    111         public event Action<TaskFile, CompetedEventArgs> Deleted;
    112         /// <summary>
    113         /// 上传日志
    114         /// </summary>
    115         public event Action<string> ProcessLog;
    116 
    117 
    118         private void OnProcessProgress(TaskFile pendingValue, CompetedEventArgs args)
    119         {
    120             if (ProcessProgress != null)
    121             {
    122                 try
    123                 {
    124                     ProcessProgress(pendingValue, args);
    125                 }
    126                 catch { }
    127             }
    128         }
    129 
    130         private void OnCompleted(TaskFile pendingValue, CompetedEventArgs args)
    131         {
    132             if (Completed != null)
    133             {
    134                 try
    135                 {
    136                     Completed(pendingValue, args);
    137                 }
    138                 catch { }
    139             }
    140         }
    141         private void OnAborted(TaskFile pendingValue, CompetedEventArgs args)
    142         {
    143             if (Aborted != null)
    144             {
    145                 try
    146                 {
    147                     Aborted(pendingValue, args);
    148                 }
    149                 catch { }
    150             }
    151         }
    152         private void OnDeleted(TaskFile pendingValue, CompetedEventArgs args)
    153         {
    154             if (Deleted != null)
    155             {
    156                 try
    157                 {
    158                     Deleted(pendingValue, args);
    159                 }
    160                 catch { }
    161             }
    162         }
    163         private void OnProcessLog(string log)
    164         {
    165             if (ProcessLog != null)
    166             {
    167                 try
    168                 {
    169                     ProcessLog(log);
    170                 }
    171                 catch { }
    172             }
    173         }
    174 
    175 
    176         public void UpLoadFileTest(TaskFile task)
    177         {
    178             OnProcessLog("Thread:" + Thread.CurrentThread.ManagedThreadId);
    179             OnProcessLog("Uploading: " + FTPName + "://" + IP + "/" + task.DIR);
    180             OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = 0, uploadStatus = UploadStatus.Uploading });
    181             for (int i = 1; i <= 100; i++)
    182             {
    183                 OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = i, uploadStatus = UploadStatus.Uploading });
    184                 Thread.Sleep(GenerateRandomInteger(100,200));
    185             }
    186             OnProcessLog("File Uploaded Successfully: " + FTPName + "://" + IP + "/" + task.DIR);
    187             OnCompleted(task, new CompetedEventArgs() { CompetedPrecent = 0, uploadStatus = UploadStatus.Completed });
    188             //移除文件
    189             File.Delete(setting.FolderToMonitor + "pending\" + task.Filename);
    190         }
    191 
    192         public int GenerateRandomInteger(int min = 0, int max = 2147483647)
    193         {
    194             var randomNumberBuffer = new byte[10];
    195             new RNGCryptoServiceProvider().GetBytes(randomNumberBuffer);
    196             return new Random(BitConverter.ToInt32(randomNumberBuffer, 0)).Next(min, max);
    197         }
    198 
    199         /// <summary>
    200         /// 上传文件
    201         /// </summary>
    202         /// <param name="task"></param>
    203         public void UpLoadFile(TaskFile task)
    204         {
    205             FileInfo _FileInfo = null;
    206             FtpWebRequest _FtpWebRequest;
    207             Stream ftpStream = null;
    208             FileStream _FileStream = null;
    209             int buffLength = 0;
    210             byte[] buffer;
    211 
    212             int contentLen = 0;
    213             long uploaded = 0;
    214 
    215             string _directory = "";
    216             string dirStr = "";
    217             FtpWebResponse response;
    218 
    219 
    220             var uploadDatetime = DateTime.Now;
    221 
    222             try
    223             {
    224                 _FileInfo = new FileInfo(task.LCD);
    225                 OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = 0, uploadStatus = UploadStatus.Uploading });
    226 
    227                 //创建目录
    228                 if (task.DIR.IndexOf("/") >= 0)
    229                 {
    230                     //"/data/test/1.XML"
    231                     _directory = task.DIR.Substring(1, task.DIR.LastIndexOf("/") - 1);
    232                     OnProcessLog("Creating DIR: " + FTPName + "://" + IP + "/" + _directory);
    233                     dirStr = "";
    234 
    235                     #region
    236                     foreach (var dir in _directory.Split(new char[] { '/' }, StringSplitOptions.RemoveEmptyEntries))
    237                     {
    238                         dirStr += dir + "/";
    239                         try
    240                         {
    241                             _FtpWebRequest = (FtpWebRequest)FtpWebRequest.Create(new Uri("ftp://" + IP + "/" + dirStr));
    242                             _FtpWebRequest.Method = WebRequestMethods.Ftp.MakeDirectory;
    243                             _FtpWebRequest.UseBinary = true;
    244                             _FtpWebRequest.Credentials = new NetworkCredential(Username, Password);
    245 
    246                             response = (FtpWebResponse)_FtpWebRequest.GetResponse();
    247                             ftpStream = response.GetResponseStream();
    248                             ftpStream.Close();
    249                             response.Close();
    250                         }
    251                         catch (Exception ex)
    252                         {
    253                             if (ftpStream != null)
    254                             {
    255                                 ftpStream.Close();
    256                                 ftpStream.Dispose();
    257                             }
    258                         }
    259                     }
    260                     #endregion
    261                 }
    262 
    263                 OnProcessLog("Uploading: " + FTPName + "://" + IP + "/" + task.DIR);
    264 
    265                 _FtpWebRequest = (FtpWebRequest)FtpWebRequest.Create(new Uri("ftp://" + IP + "/" + task.DIR));
    266                 _FtpWebRequest.Credentials = new NetworkCredential(Username, Password);
    267                 _FtpWebRequest.KeepAlive = false;
    268                 _FtpWebRequest.Timeout = 20000;
    269                 _FtpWebRequest.Method = WebRequestMethods.Ftp.UploadFile;
    270                 _FtpWebRequest.UseBinary = true;
    271                 _FtpWebRequest.ContentLength = _FileInfo.Length;
    272 
    273                 buffLength = 1024;
    274                 buffer = new byte[buffLength];
    275 
    276                 _FileStream = _FileInfo.OpenRead();
    277                 ftpStream = _FtpWebRequest.GetRequestStream();
    278                 contentLen = _FileStream.Read(buffer, 0, buffLength);
    279                 uploaded = contentLen;
    280 
    281                 var cancel = false;
    282                 var delete = false;
    283                 var timeOut = false;
    284 
    285                 while (contentLen > 0)
    286                 {
    287                     if (cancelTask.Filename == task.Filename)
    288                         cancel = true;
    289 
    290                     if (delTask.Filename == task.Filename)
    291                         delete = true;
    292 
    293                     if (DateTime.Now.Subtract(uploadDatetime).Seconds > 600)
    294                         timeOut = true;
    295 
    296                     if (cancel)
    297                     {
    298                         OnProcessLog("Thread Cancel: " + FTPName + "://" + IP + "/" + task.DIR);
    299                         throw new Exception("Cancel");
    300                         cancel = false;
    301                     }
    302                     else if (delete)
    303                     {
    304                         OnProcessLog("Thread Delete: " + FTPName + "://" + IP + "/" + task.DIR);
    305                         throw new Exception("Delete");
    306                         delete = false;
    307                     }
    308                     else if (timeOut)
    309                     {
    310                         OnProcessLog("Thread Timeout: " + FTPName + "://" + IP + "/" + task.DIR);
    311                         throw new Exception("Timeout");
    312                         timeOut = false;
    313                     }
    314 
    315                     ftpStream.Write(buffer, 0, contentLen);
    316                     contentLen = _FileStream.Read(buffer, 0, buffLength);
    317                     uploaded += contentLen;
    318                     //上传进度
    319                     OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = (int)(uploaded / _FileInfo.Length) * 100, uploadStatus = UploadStatus.Uploading });
    320                 }
    321 
    322                 ftpStream.Close();
    323                 ftpStream.Dispose();
    324                 _FileStream.Close();
    325                 _FileStream.Dispose();
    326 
    327                 //上传完成
    328                 OnProcessLog("File Uploaded Successfully: " + FTPName + "://" + IP + "/" + task.DIR);
    329                 OnCompleted(task, new CompetedEventArgs() { CompetedPrecent = 100, uploadStatus = UploadStatus.Completed });
    330 
    331                 //移除文件
    332                 File.Delete(setting.FolderToMonitor + "pending\" + task.Filename);
    333             }
    334             catch (Exception ex)
    335             {
    336                 //上传失败
    337                 OnProcessLog("Failed to upload: " + FTPName + "://" + IP + "/" + task.DIR);
    338 
    339                 try
    340                 {
    341                     File.Move(setting.FolderToMonitor + "pending\" + task.Filename, setting.FolderToMonitor + "err\" + task.Filename);
    342                 }
    343                 catch (Exception ex1)
    344                 {
    345                     OnProcessLog("Moving Files Err:" + task.HOST + ": " + task.DIR + ex1.Message);
    346                     try
    347                     {
    348                         File.Copy(setting.FolderToMonitor + "pending\" + task.Filename, setting.FolderToMonitor + "err\" + task.Filename);
    349                         File.Delete(setting.FolderToMonitor + "pending\" + task.Filename);
    350                     }
    351                     catch (Exception ex2)
    352                     {
    353 
    354                         OnProcessLog("Uploading Err:" + task.HOST + "/" + task.DIR + ex2.Message);
    355                         OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = 0, uploadStatus = UploadStatus.Failed });
    356                     }
    357                 }
    358                 finally
    359                 {
    360                     OnProcessLog("Uploading Err:" + task.HOST + "/" + task.DIR + ex.Message);
    361 
    362                     if (ex.Message == "TimeOut")
    363                         OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = 0, uploadStatus = UploadStatus.Timeout });
    364                     else if (ex.Message == "Cancel")
    365                         OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = 0, uploadStatus = UploadStatus.Cancel });
    366                     else if (ex.Message == "Delete")
    367                         OnDeleted(task, new CompetedEventArgs() { CompetedPrecent = 0, uploadStatus = UploadStatus.Delete });
    368                     else
    369                         OnProcessProgress(task, new CompetedEventArgs() { CompetedPrecent = 0, uploadStatus = UploadStatus.Failed });
    370                 }
    371 
    372             }
    373             finally
    374             {
    375                 if (ftpStream != null)
    376                 {
    377                     ftpStream.Close();
    378                     ftpStream.Dispose();
    379                 }
    380                 if (_FileStream != null)
    381                 {
    382                     _FileStream.Close();
    383                     _FileStream.Dispose();
    384                 }
    385             }
    386         }
    387 
    388 
    389         /// <summary>
    390         /// 完成事件数据
    391         /// </summary>
    392         public class CompetedEventArgs : EventArgs
    393         {
    394             public CompetedEventArgs()
    395             {
    396             }
    397             public UploadStatus uploadStatus { get; set; }
    398             /// <summary>
    399             /// 完成百分率
    400             /// </summary>
    401             public int CompetedPrecent { get; set; }
    402             /// <summary>
    403             /// 异常信息
    404             /// </summary>
    405             public Exception InnerException { get; set; }
    406         }
    407 
    408         public enum DoWorkResult
    409         {
    410             /// <summary>
    411             /// 继续运行,默认
    412             /// </summary>
    413             ContinueThread = 0,
    414             /// <summary>
    415             /// 终止当前线程
    416             /// </summary>
    417             AbortCurrentThread = 1,
    418             /// <summary>
    419             /// 终止全部线程
    420             /// </summary>
    421             AbortAllThread = 2
    422         }
    423 
    424         public enum UploadStatus
    425         {
    426             Completed = 0,
    427             Failed = 1,
    428             Timeout = 3,
    429             Cancel = 4,
    430             Uploading = 5,
    431             Delete = 6,
    432             Abort = 7,
    433         }
    434     }
    435 }
    View Code
      1 using System;
      2 using System.Collections.Generic;
      3 using System.Linq;
      4 using System.Text;
      5 using System.Collections;
      6 using System.Threading;
      7 
      8 namespace NuFTPCmmndv5
      9 {
     10     /// <summary>
     11     /// 表示一个线程(同步)安全的通用泛型处理队列
     12     /// </summary>
     13     /// <typeparam name="T">ProcessQueue中包含的数据类型</typeparam>
     14     public class ProcessQueue<T> : IEnumerable<T>, ICollection, IEnumerable, IDisposable
     15     {
     16         #region Instance Variables
     17 
     18         private object syncRoot = new object();
     19         private Queue<T> queue;
     20         private List<WorkerThread> threads = new List<WorkerThread>();
     21         private Action<T> operation;
     22         bool isDisposed;
     23         bool isRunning;
     24 
     25         private int _maxThreadCount;
     26 
     27         public int MaxThreadCount
     28         {
     29             get { return _maxThreadCount == 0 ? 5 : _maxThreadCount; }
     30             set { value = _maxThreadCount; }
     31         }
     32 
     33         #endregion
     34 
     35         #region Constructors
     36         /// <summary>
     37         /// 初始一个新的ProcessQueue 并指定具有特定容量的工作线程
     38         /// </summary>
     39         public ProcessQueue(Action<T> action) : this(action, 5) { }
     40         /// <summary>
     41         /// 初始一个新的ProcessQueue
     42         /// </summary>
     43         public ProcessQueue(int capacity, Action<T> action) : this(capacity, action, 5) { }
     44         /// <summary>
     45         /// 初始一个新的ProcessQueue
     46         /// </summary>
     47         public ProcessQueue(IEnumerable<T> collection, Action<T> action) : this(collection, action, 5) { }
     48 
     49         /// <summary>
     50         /// 初始一个新的ProcessQueue
     51         /// </summary>
     52         public ProcessQueue(Action<T> action, int threadCount)
     53         {
     54             queue = new Queue<T>();
     55             operation = action;
     56 
     57             SetThreadCount(MaxThreadCount);
     58         }
     59 
     60         /// <summary>
     61         /// 初始一个新的ProcessQueue
     62         /// </summary>
     63         /// <param name="capacity">初始容量</param>
     64         public ProcessQueue(int capacity, Action<T> action, int threadCount)
     65         {
     66             queue = new Queue<T>(capacity);
     67             operation = action;
     68 
     69             SetThreadCount(MaxThreadCount);
     70         }
     71 
     72         /// <summary>
     73         /// 初始一个新的ProcessQueue
     74         /// </summary>
     75         /// <param name="collection">将数据复制到ProcessQueue.</param>
     76         public ProcessQueue(IEnumerable<T> collection, Action<T> action, int threadCount)
     77         {
     78             queue = new Queue<T>(collection);
     79             operation = action;
     80 
     81             SetThreadCount(MaxThreadCount);
     82         }
     83         #endregion
     84 
     85         #region Processing Control
     86 
     87         /// <summary>
     88         /// 停止 (挂起) 
     89         /// </summary>
     90         public void Stop()
     91         {
     92             lock (syncRoot)
     93             {
     94                 foreach (WorkerThread thread in threads)
     95                 {
     96                     thread.Pause();
     97                 }
     98 
     99                 isRunning = false;
    100             }
    101         }
    102 
    103         /// <summary>
    104         /// 开始运行
    105         /// </summary>
    106         public void Start()
    107         {
    108             lock (syncRoot)
    109             {
    110                 //清空队列集合重新创建新的线程
    111                 RegenerateIfDisposed();
    112                 //如果新进的项目少于当前线程集合总的线程数,则创建当前新进的项目数
    113                 //如果新进 的项目多余当前线程集合的线程数,则创建同样多的数程集合的线程数
    114                 for (int i = 0; i < Math.Min(threads.Count, queue.Count); i++)
    115                 {
    116                     //设置信号让其运行
    117                     threads[i].Signal();
    118                 }
    119                 isRunning = true;
    120             }
    121         }
    122 
    123         /// <summary>
    124         /// 获取此ProcessQueue使用的工作线程数。 使用SetThreadCount更改此值。
    125         /// </summary>
    126         public int ThreadCount { get { return threads.Count; } }
    127 
    128         /// <summary>
    129         /// 设置此ProcessQueue使用的工作线程数,并根据需要分配或释放线程。
    130         /// </summary>
    131         /// <param name="threadCount">线程数</param>
    132         public void SetThreadCount(int threadCount)
    133         {
    134             //至少要有一个线程
    135             if (threadCount < 1) throw new ArgumentOutOfRangeException("threadCount", "The ProcessQueue class requires at least one worker thread.");
    136             //同步线程
    137             lock (syncRoot)
    138             {
    139                 // 等待队列
    140                 int pending = queue.Count;
    141                 // 创建一个指定最大工作线程数的线程集合,每个线程用来处理排队的项目
    142                 for (int i = threads.Count; i < threadCount; i++) 
    143                 {
    144                     //注意:在实例化工作线程WorkerThread 时,已经创建了一个ThreadProc 无限循环方法,改方法检测 signalEvent, abortEvent 信号
    145                     //在收到 abortEvent 时终止并退出,收到 Signal时 循环调用 ProcessItems() 用来处理队列里排队的项目
    146                     WorkerThread thread = new ProcessQueue<T>.WorkerThread(this);
    147                     //添加到列表
    148                     threads.Add(thread);
    149                     //线程启动
    150                     thread.Start();
    151                     //如果队列总有待排项目时
    152                     if (pending > 1)
    153                     {
    154                         //设置信号,让当前工作线程运行(不等待)
    155                         thread.Signal();
    156                     }
    157                     //待排队数减一
    158                     pending--;
    159                 }
    160 
    161                 //如果其它线程调用了SetThreadCount,或者多次调用了 SetThreadCount,从而导致当前实际的线程集合有可能远远大于最大线程数
    162                 //在这种情况下,需要移除多余的线程,从而保证当前threadCount有效
    163                 //移除的线程数 = 当前创建的工作线程集合总数 - 设置的最大线程数
    164                 int toRemove = threads.Count - threadCount;
    165                 if (toRemove > 0)
    166                 {
    167                     //IsSignaled 如果当前实例收到信号,则为 true;否则为 false
    168                     //从线程集合里取出正在等待的线程
    169                     foreach (WorkerThread thread in threads.Where(t => !t.IsSignaled).ToList())
    170                     {
    171                         //设置信号使得该线程终止
    172                         thread.Abort();
    173                         //从集合中移除改项
    174                         threads.Remove(thread);
    175                         //移除数减一
    176                         toRemove--;
    177                     }
    178                     //如果待移除的线程正在运行中
    179                     //则强制移除该线程直到移除完为止
    180                     while (toRemove > 0)
    181                     {
    182                         WorkerThread thread = threads[threads.Count - 1];
    183                         thread.Abort();
    184                         threads.Remove(thread);
    185                         toRemove--;
    186                     }
    187                 }
    188             }
    189         }
    190 
    191         /// <summary>
    192         /// 处理队列项
    193         /// </summary>
    194         /// <param name="item"></param>
    195         private void ProcessItem(T item)
    196         {
    197             operation(item);
    198         }
    199 
    200         /// <summary>
    201         /// 释放时重置线程
    202         /// </summary>
    203         private void RegenerateIfDisposed()
    204         {
    205             if (isDisposed)
    206             {
    207                 int threadCount = threads.Count;
    208 
    209                 threads.Clear();
    210 
    211                 SetThreadCount(threadCount);
    212             }
    213 
    214             isDisposed = false;
    215         }
    216         #endregion
    217 
    218         /// <summary>
    219         /// 从ProcessQueue清除所有未处理的项目
    220         /// </summary>
    221         public void Clear()
    222         {
    223             lock (syncRoot)
    224             {
    225                 queue.Clear();
    226             }
    227         }
    228 
    229         /// <summary>
    230         /// 尝试从ProcessQueue中检索获取下一个项目(如果存在)。 如果不存在,则将值设置为其默认值。
    231         /// </summary>
    232         /// <param name="value">如果不存在,则该变量将被设置为默认值.</param>
    233         /// <returns>如果ProcessQueue包含一个项,则为真,如果没有则为False</returns>
    234         public bool TryDequeue(out T value)
    235         {
    236             lock (syncRoot)
    237             {
    238                 if (queue.Count > 0)
    239                 {
    240                     value = queue.Dequeue();
    241 
    242                     return true;
    243                 }
    244                 else
    245                 {
    246                     value = default(T);
    247 
    248                     return false;
    249                 }
    250             }
    251         }
    252 
    253         /// <summary>
    254         /// 确定队列是否包含指定项
    255         /// </summary>
    256         /// <param name="item">当前项</param>
    257         /// <returns>存在则 True, 不存在则 False</returns>
    258         public bool Contains(T item)
    259         {
    260             lock (syncRoot)
    261             {
    262                 return queue.Contains(item);
    263             }
    264         }
    265 
    266         /// <summary>
    267         /// 将ProcessQueue的内容复制到外部数组,而不影响ProcessQueue的内容
    268         /// </summary>
    269         /// <param name="array">The array to copy the items into</param>
    270         /// <param name="arrayIndex">The starting index in the array</param>
    271         public void CopyTo(T[] array, int arrayIndex)
    272         {
    273             lock (syncRoot)
    274             {
    275                 queue.CopyTo(array, arrayIndex);
    276             }
    277         }
    278 
    279         /// <summary>
    280         /// 从ProcessQueue中检索下一个未处理的项目并将其删除
    281         /// </summary>
    282         /// <returns>The next unprocessed item in the ProcessQueue</returns>
    283         public T Dequeue()
    284         {
    285             lock (syncRoot)
    286             {
    287                 return queue.Dequeue();
    288             }
    289         }
    290 
    291         /// <summary>
    292         /// 将一个项目添加到处理队列的末尾
    293         /// </summary>
    294         /// <param name="item">添加项</param>
    295         public void Enqueue(T item)
    296         {
    297             lock (syncRoot)
    298             {
    299                 //新进队列项
    300                 queue.Enqueue(item);
    301                 //当前处理队列正在运行时
    302                 if (isRunning)
    303                 {
    304                     //清空队列集合重新创建新的线程
    305                     RegenerateIfDisposed();
    306                     //取出一个等待的线程
    307                     WorkerThread firstThread = threads.Where(t => !t.IsSignaled).FirstOrDefault();
    308                     //存在则运行它
    309                     if (firstThread != null) firstThread.Signal();
    310                 }
    311             }
    312         }
    313 
    314         /// <summary>
    315         /// 从ProcessQueue中检索下一个未处理的项目,而不删除它
    316         /// </summary>
    317         /// <returns>The next unprocessed item in the ProcessQueue</returns>
    318         public T Peek()
    319         {
    320             lock (syncRoot)
    321             {
    322                 return queue.Peek();
    323             }
    324         }
    325 
    326         /// <summary>
    327         /// 返回一个包含ProcessQueue中所有未处理项目的数组
    328         /// </summary>
    329         /// <returns></returns>
    330         public T[] ToArray()
    331         {
    332             lock (syncRoot)
    333             {
    334                 return queue.ToArray();
    335             }
    336         }
    337 
    338         /// <summary>
    339         /// 将ProcessQueue的容量设置为其包含的项目的实际数量,除非该数量超过当前容量的90%。
    340         /// </summary>
    341         public void TrimExcess()
    342         {
    343             lock (syncRoot)
    344             {
    345                 queue.TrimExcess();
    346             }
    347         }
    348 
    349         #region IEnumerable<T> Members
    350 
    351         public IEnumerator<T> GetEnumerator()
    352         {
    353             return queue.GetEnumerator();
    354         }
    355 
    356         #endregion
    357 
    358         #region IEnumerable Members
    359 
    360         IEnumerator IEnumerable.GetEnumerator()
    361         {
    362             return queue.GetEnumerator();
    363         }
    364 
    365         #endregion
    366 
    367         #region ICollection Members
    368 
    369         void ICollection.CopyTo(Array array, int index)
    370         {
    371             lock (syncRoot)
    372             {
    373                 ((ICollection)queue).CopyTo(array, index);
    374             }
    375         }
    376 
    377         public int Count
    378         {
    379             get
    380             {
    381                 lock (syncRoot)
    382                 {
    383                     return queue.Count;
    384                 }
    385             }
    386         }
    387 
    388         bool ICollection.IsSynchronized
    389         {
    390             get { return true; }
    391         }
    392 
    393         object ICollection.SyncRoot
    394         {
    395             get { return syncRoot; }
    396         }
    397 
    398         #endregion
    399 
    400         #region IDisposable Members
    401 
    402         public void Dispose()
    403         {
    404             Dispose(true);
    405         }
    406 
    407         private void Dispose(bool disposing)
    408         {
    409             if (disposing)
    410             {
    411                 foreach (WorkerThread thread in threads) thread.Abort();
    412             }
    413 
    414             isDisposed = true;
    415         }
    416 
    417         #endregion
    418 
    419         /// <summary>
    420         /// 封装.NET Thread对象并管理与控制其行为相关联的WaitHandles
    421         /// </summary>
    422         private class WorkerThread
    423         {
    424             private ManualResetEvent abortEvent;
    425             private ManualResetEvent signalEvent;
    426             private ProcessQueue<T> queue;
    427 
    428             private Thread thread;
    429 
    430             public WorkerThread(ProcessQueue<T> queue)
    431             {
    432                 abortEvent = new ManualResetEvent(false);
    433                 signalEvent = new ManualResetEvent(false);
    434                 this.queue = queue;
    435 
    436                 thread = new Thread(ThreadProc);
    437                 thread.Name = "ProcessQueue Worker ID " + thread.ManagedThreadId;
    438             }
    439 
    440             /// <summary>
    441             /// 运行当前线程
    442             /// </summary>
    443             public void Start()
    444             {
    445                 thread.Start();
    446             }
    447 
    448             /// <summary>
    449             /// 终止当前线程
    450             /// </summary>
    451             public void Abort()
    452             {
    453                 abortEvent.Set();
    454 
    455                 thread.Join();
    456             }
    457 
    458             /// <summary>
    459             /// 清除信号WaitHandle,导致线程完成当前的迭代后暂停
    460             /// </summary>
    461             public void Pause()
    462             {
    463                 signalEvent.Reset();
    464             }
    465 
    466             /// <summary>
    467             /// 设置信号WaitHandle,等待线程使其恢复运行(如果暂停)
    468             /// </summary>
    469             public void Signal()
    470             {
    471                 signalEvent.Set();
    472             }
    473 
    474             public bool IsSignaled
    475             {
    476                 get { return signalEvent.WaitOne(0); }
    477             }
    478 
    479             /// <summary>
    480             /// ThreadProc 总线程方法由一个无限循环组成,在发出中止事件时退出
    481             /// </summary>
    482             private void ThreadProc()
    483             {
    484                 WaitHandle[] handles = new WaitHandle[] { signalEvent, abortEvent };
    485 
    486                 while (true)
    487                 {
    488                     //等待指定数组中的任一元素收到信号
    489                     switch (WaitHandle.WaitAny(handles))
    490                     {
    491                         case 0: // signal
    492                             {
    493                                 ProcessItems();
    494                             }
    495                             break;
    496                         case 1: // abort
    497                             {
    498                                 return;
    499                             }
    500                     }
    501                 }
    502             }
    503 
    504             /// <summary>
    505             /// 处理项目
    506             /// </summary>
    507             private void ProcessItems()
    508             {
    509                 T item;
    510                 //从队列中取出一项,这是一个同步的过程
    511                 while (queue.TryDequeue(out item))
    512                 {
    513                     //处理队列项
    514                     queue.ProcessItem(item);
    515                     //如果当前实例收到信号,则为 true;否则为 false。
    516                     //等待当前队列完成 在 调用 signalEvent.Set() 或者 abortEvent.Set() 时 
    517                     if (!signalEvent.WaitOne(0) || abortEvent.WaitOne(0)) return;
    518                 }
    519                 //线程状态设置为非终止状态
    520                 signalEvent.Reset();
    521             }
    522         }
    523     }
    524 }
    View Code
  • 相关阅读:
    Python报错:TypeError: data type not understood
    外星人入侵-01
    python界面编程
    python 之魔法方法
    软工第二次结对作业
    软工结对第一次作业
    软件工程第三次作业
    软件工程第二次作业
    我的第一篇博客
    python字符串的基本操作
  • 原文地址:https://www.cnblogs.com/mschen/p/11602178.html
Copyright © 2011-2022 走看看