在移动交通流调查项目的一个算法分析程序中,碰到一个业务问题:用户采集上传的基站定位数据需要进行分析预处理,方案是先按预定格式解析文件并从中提取出成百上千个基站定位数据记录,并合并相同的基站点,根据获取到的基站位置信息作为参数,去请求google 基站定位 api,从而得到对应的基站定位经纬度等信息,接下来再加上华工的算法分析。
在执行华工算法分析逻辑之前,调用谷歌api这一步必需全部完成;网络请求是个耗时的过程,故对每一个请求开启单独的线程(同时请求可能数百个,这里通过Semaphore信号量来控制每次发出请求的最大数,该部分的讨论不再本话题之类)。
问题出来了,那么如何知道所有的网络请求全部完成了,可以进行下一步算法分析呢?答案是利用前面讲的ManualResetEvent来处理;于是有下面的写法
1
2
3
4
5
|
//针对每个线程 绑定初始化一个ManualResetEvent实例 ManualResetEvent doneEvent = new ManualResetEvent( false ); //通过ThreadPool.QueueUserWorkItem(网络请求方法HttpRequest,doneEvent ) 来开启多线程 //将等待事件一一加入事件列表 |
1
2
3
4
5
6
7
8
9
10
11
12
|
List<ManualResetEvent> listEvent = new List<ManualResetEvent>(); for ( int i=0;i<请求线程数;i++){ listEvent.Add(doneEvent); } //主线程等待网络请求全部完成 WaitHandle.WaitAll(listEvent.ToArray()); //....接下去的算法分析 //在网络请求方法HttpRequest的子线程中调用 doneEvent.Set(); //通知主线程 本网络请求已经完成 |
运行好像没有问题,程序按原定计划执行;但是当线程数大于64个之后抛出异常
WaitHandles must be less than or equal to 64
原来WaitHandle.WaitAll(listEvent.ToArray()); 这里listEvent线程数不能超过64个
以前解决方法:
下面是吴建飞以前的方案:既然WaitHandle.WaitAll方法只能唤醒64个ManualResetEvent对象,那么就采用
1
|
List<List<ManualResetEvent>> _listLocEventList = new List<List<ManualResetEvent>>(); |
采用这种复杂集合;集合的每个元素也是一个集合(内部每个集合包含最大64个ManualResetEvent对象);和上面一样 把每个线程相关的ManualResetEvent对象添加到该集合;
//主线程等待网络请求全部完成
1
2
3
4
|
foreach (List<ManualResetEvent> listEvent in _listLocEventList) { WaitHandle.WaitAll(listEvent.ToArray()); } |
该方案运用起来比较复杂,而且会导致创建大量的ManualResetEvent对象;
现在的设计目标是这种对文件的分析是多任务同时进行的,也就是说会产生的ManualResetEvent对象List<List<ManualResetEvent>>.Size() * 任务数(N个文件上传)
改进的解决方法:
原理:封装一个ManualResetEvent对象,一个计数器current,提供SetOne和WaitAll方法;
主线程调用WaitAll方法使ManualResetEvent对象等待唤醒信号;
各个子线程调用setOne方法 ,setOne每执行一次current减1,直到current等于0时表示所有子线程执行完毕 ,调用ManualResetEvent的set方法,这时主线程可以执行WaitAll之后的步骤。
目标:减少ManualResetEvent对象的大量产生和使用的简单性。
在这里我写了个封装类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
/******************************************************************************** * Copyright © 2001 - 2010Comit. All Rights Reserved. * 文件:MutipleThreadResetEvent.cs * 作者:杨柳 * 日期:2010年11月13日 * 描述:封装 ManualResetEvent ,该类允许一次等待N(N>64)个事件执行完毕 * * 解决问题:WaitHandle.WaitAll(evetlist)方法最大只能等待64个ManualResetEvent事件 * *********************************************************************************/ using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; namespace TestMutipleThreadRestEvent { /// <summary> /// 封装ManualResetEvent /// </summary> public class MutipleThreadResetEvent : IDisposable { private readonly ManualResetEvent done; private readonly int total; private long current; /// <summary> /// 构造函数 /// </summary> /// <param name="total">需要等待执行的线程总数</param> public MutipleThreadResetEvent( int total) { this .total = total; current = total; done = new ManualResetEvent( false ); } /// <summary> /// 唤醒一个等待的线程 /// </summary> public void SetOne() { // Interlocked 原子操作类 ,此处将计数器减1 if (Interlocked.Decrement( ref current) == 0) { //当所以等待线程执行完毕时,唤醒等待的线程 done.Set(); } } /// <summary> /// 等待所以线程执行完毕 /// </summary> public void WaitAll() { done.WaitOne(); } /// <summary> /// 释放对象占用的空间 /// </summary> public void Dispose() { ((IDisposable)done).Dispose(); } } } |
注释写的很清楚了:本质就是只通过1个ManualResetEvent 对象就可以实现同步N(N可以大于64)个线程
下面是测试用例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; namespace TestMutipleThreadRestEvent { /// <summary> /// 测试MutipleThreadResetEvent /// </summary> class Program { static int i = 0; /// <summary> /// 主方法 /// </summary> /// <param name="args">参数</param> static void Main( string [] args) { //假设有100个请求线程 int num = 100; //使用 MutipleThreadResetEvent using (var countdown = new MutipleThreadResetEvent(num)) { for ( int i=0;i<num;i++) { //开启N个线程,传递MutipleThreadResetEvent对象给子线程 ThreadPool.QueueUserWorkItem(MyHttpRequest, countdown); } //等待所有线程执行完毕 countdown.WaitAll(); } Console.WriteLine( "所有的网络请求以及完毕,可以继续下面的分析..." ); Console.ReadKey(); } /// <summary> /// 假设的网络请求 /// </summary> /// <param name="state">参数</param> private static void MyHttpRequest( object state) { // Thread.Sleep(1000); Console.WriteLine(String.Format( "哈哈:{0}" ,++i)); MutipleThreadResetEvent countdown = state as MutipleThreadResetEvent; //发送信号量 本线程执行完毕 countdown.SetOne(); } } } |
输出: