已经一年多的时间没有使用多线程了.最近需要做一个异步发邮件的功能,为了给同事写一个例子,想起以前写的爬虫的时候,
一个带我的人给了我一个通用的模板类, 我觉得那个例子很好,于是我根据自己的理解,写出了如下的代码.
模型描述
首先是一个通用的 计数器.包括 当前在运行的线程数Current,执行成功的数量Success,执行失败的数量Error,以及总完成数量Finish.
为了方便大家理解,我还是做一下简要的说明.大致过程是这样的: 首先有一批任务进来,需要把这些任务用多线程来处理.(我这里使用的是线程池.)
关于池的概念大家应该埠陌生.比如 数据库的连接池.
任务一个一个的分配,每个任务都是开启一个子线程单独执行的,一边分配任务,一边执行任务.在不停的分配任务的时候,会出现这样一种情况,
就是任务已经分配出去很多了,但是执行的不够快,这个时候需要主线程等待一下,过一会再分配新的任务.
我将上面的描述用一个日常生活中的例子来做说明: 一个小饭店只有10个座位,10点开始营业,到了12点吃饭的时间,外面来了100个客人,
挨个挨个的进入饭店就餐.
第一个人进,占位置1个,第二个人进入, 占位置1个, 第三个人进入,占位置1个,
第四个人进入占位置1个,第五个人进入占位置1个, 然后第一个人打包的走了,有个位置空出来了,
现在已经使用了 4个位置,至于进来以后坐什么地方,这个不用管.
第六个人进入占了位置1个,...第九个人进来了,占位置1个,第2个人吃完了,第十个人进来了,第是一个人进来了,第十二个人进来了,
第三个人吃完了,第十三个人进来了,
第十四个人想进来,抱歉,已经坐满了,要等待,过1分钟再看有没有位置,如果没有人离开,外面的人一直不能进来,
过了五分钟, 再看发现一个桌子四个人都离开了,于是第十四个人进来了,第十五个人进来了....后面的类似前面的...
废话啰嗦完了,上代码
先上一个计数器读写的类
public sealed class ThreadCounter { private int current = 0; private int error = 0; private int success = 0; private int finish = 0; private static object writeObj = new object(); public int getCurrent() { lock (writeObj) return current; } public int getError() { lock (writeObj) return error; } public int getSuccess() { lock (writeObj) return success; } public int getFinish() { lock (writeObj) return finish; } public void setAddCurrent() { lock (writeObj) current++; } public void setSuccess() { lock (writeObj) { success++; current--; } } public void setError() { lock (writeObj) { error++; current--; } } public void setFinish() { lock (writeObj) finish++; } }
主体运行业务处理类
新建类 RunManager,定义一个私有字段,一个随机数生成类,一个线程计数类
1 public class RunManager 2 { 3 private static int MaxThreadCount = 10; 4 5 ThreadCounter tc = new ThreadCounter(); 6 7 Random random = new Random(); 8 }
新增模拟任务的初始化方法
/// <summary> /// 最大开启的线程数 /// </summary> /// <param name="threadCount"></param> public void RunStart(int threadMaxCount) { //最大开启的线程数 Console.WriteLine("设置最大线程数:" + threadMaxCount); MaxThreadCount = threadMaxCount; RunTask(100); }
线程池分配任务
1 public void RunTask(int taskCount) 2 { 3 string mss = ""; 4 string[] arr = new string[] {"|","/","-","\" }; 5 int j = 0; 6 while (true) 7 { 8 if (++j==4) 9 j = 0; 10 Console.Clear(); 11 Console.SetCursorPosition(1, 0); 12 Console.WriteLine("当前运行情况"); 13 Console.Write("["+arr[j]+"] "); 14 mss = string.Format("Query:{0} , Current:{1} , Success:{2} , Error:{3} , Finish:{4}", 15 taskCount, tc.getCurrent(),tc.getSuccess(), tc.getError(), tc.getFinish()); 16 Console.WriteLine(mss); 17 Thread.Sleep(500); 18 19 if (taskCount>0&&tc.getCurrent() < MaxThreadCount) 20 { 21 taskCount--; 22 tc.setAddCurrent(); 23 ThreadPool.UnsafeQueueUserWorkItem(RuanCallBack, 0); 24 } 25 else if (taskCount==0) 26 taskCount = random.Next(60, 125); 27 else if (tc.getCurrent() >= MaxThreadCount) 28 Thread.Sleep(6000); 29 }
30}
模拟线程运行所消耗的时间
1 public void RuanCallBack(object parms) 2 { 3 // 将传过来的参数, 整理为要查询的数据, 以及处理数据,写入文件,打包zip, 4 // 发送邮件通知用户下载 5 try 6 { 7 int t = random.Next(3, 7); 8 Thread.Sleep(t * 1000); 9 tc.setSuccess(); 10 } 11 catch (Exception) 12 { 13 tc.setError(); 14 } 15 finally 16 { 17 tc.setFinish(); 18 } 19 //Console.WriteLine("当前线程序号:" + parms.ToString() + " 线程停顿时间:" + t); 20 }
调用
static void Main(string[] args) { RunManager r = new RunManager(); r.RunStart(20); Console.Read(); Console.WriteLine("运行结束"); }
看看运行效果
Nice!
原代码下载: 下载