有些人会觉得多线程无非是,有多少任务就启动多少线程,CreadThread,执行完了自己结束就释放资源了,其实不然。多线程是需要管理的,线程的启动、执行、等待和结束都需要管理,线程间如何通信,如何共享内存数据,如果保证线程间的同步,避免死锁,都要考虑。
以前做项目时,用过 Codeproject 上一个线程管理的代码 Work Queue[1],很好用,也是不错的学习资料,但对于多线程初学者也不是一眼就能看懂的,所以今天打算对这个代码做个解读笔记,可为其它学习者提供一个参考,也深化自己对多线程的理解。关于该类的使用可直接访问原网址。
这个多线程管理类为 CWorkQueue,使用的是生产者-消费者模式。CWorkQueue 创建的每个线程都是一个消费者,生产者是类成员 m_pWorkItemQueue。生产者资源由外界使用者通过 InsertWorkItem 成员函数注入,然后通过 ReleaseSemaphore 通知消费者(即线程)处理,消费者线程 ThreadFunc 自创建起始就一直在等待,等待生产者通知,接到有任务通知后,线程就执行任务,执行完毕后继续等待。
主线程通过两种机制来跟已建立的新线程通信,信号量 Semaphore 和 事件 Event,Semaphore 用于通知新线程执行任务,Event 用于通知新线程结果自己。
1、信号量 Semaphore
由上可知,生产者和消费者使用的通信机制是信号量 Semaphore。信号量 Semaphore 只有两种状态,触发和未触发,决定状态的是当前资源数量,数量大于0表示信号量处于触发,等于0表示资源已经耗尽故信号量处于末触发。影响当前资源数量的函数有几个,下面依次介绍。
首先 CWorkQueue 在 Create 函数中创建了 Semaphore 信号量,并存在成员变量 m_phSincObjectsArray[SEMAPHORE_INDEX] 中,信号量的初始计数为 0,最大计数为 2147483647L。
m_phSincObjectsArray[SEMAPHORE_INDEX] = CreateSemaphore(NULL,0,LONG_MAX,NULL); //创建Semaphore对象
每当用户调用 InsertWorkItem 将工作任务插入到队列(m_pWorkItemQueue->push(pWorkItem);)后,都要调用 ReleaseSemaphore,这会增加信号量的当前资源计数,该程序里试加 1。
if (!ReleaseSemaphore(m_phSincObjectsArray[SEMAPHORE_INDEX],1,NULL)) { assert(false); return false; }
在 CWorkQueue 在 Create 函数中还同时创建了线程,这些线程 ThreadFunc 自运行时就处于 WaitForMultipleObjects 状态。等待函数会检查信号量的当前资源计数,如果大于0(即信号量处于触发状态),减1后返回让调用线程继续执行。一个线程可以多次调用等待函数来减小信号量。线程中设置了无限循环 for(;;),因此当线程执行完当前处理后会继续等待信号量。
//等待两个事件 dwWaitResult = WaitForMultipleObjects(NUMBER_OF_SYNC_OBJ,pWorkQueue->m_phSincObjectsArray,FALSE,INFINITE);
2、事件 Event
在 Create 函数中创建信号量 Semaphore 的同时,还创建了事件, 并存在成员变量中,主要用于通知各线程结束执行。
m_phSincObjectsArray[ABORT_EVENT_INDEX] = CreateEvent(NULL,TRUE,FALSE,NULL); //创建event 事件对象,初始化时为无信号状态,使用手动重置为无信号状态
当用户调用 Destroy 函数,会调用 SetEvent 触发事件,然后 WaitForMultipleObjects 等待所有线程结束(注意到第三个参数为 true)。然后线程函数 ThreadFunc 探测到这个事件,会从 WaitForMultipleObjects(注意到第三个参数为 false) 返回,执行下面 ABORT_EVENT_INDEX 处理,结束线程。当所有的线程结束,接着 Destroy 的 WaitForMultipleObjects 返回,做些清理的工作即结束线程管理。
因为信号量和事件都是 HANDLE,所以该类中把这两个存在一个数组 m_phSincObjectsArray,通过枚举型变量来标识。
3、生产者 m_pWorkItemQueue
用户通过 InsertWorkItem 插入工作任务到队列。
bool CWorkQueue::InsertWorkItem(WorkItemBase* pWorkItem)
工作任务由要线程执行的函数,和 Abort 函数,Abort 函数用于最后调用 Destroy 突然终止线程时,执行剩余线程的清理工作。
class WorkItemBase { virtual void DoWork(void* pThreadContext) = 0; virtual void Abort () = 0; friend CWorkQueue; };
WorkItemBase 需要用户来继承,并实现两个虚函数。
class SpecificWorkItem : public WorkItemBase { void DoWork(void* pThreadContext); void Abort(); //memeber variables needed here }; void SpecificWorkItem::DoWork(void* pThreadContext) { //Notify Start //proccessing done here //Notify Finish //free all that was occupied } void SpecificWorkItem::Abort() { //Notify aborted //free all that was occupied }
这样在用户执行 InsertWorkItem 插入任务到队列后,通过 ReleaseSemaphore 激发信号量 Semaphore,线程 ThreadFunc 会在 WaitForMultipleObjects 中探测到 Semaphore 的激发状态,然后获得队列中的任务,并删除队列中该任务防止其它线程重复执行,然后执行用户的任务 pWorkItem->DoWork(pThreadData),此处 pThreadData 我没用到,为 NULL 。
4、消费者 ThreadFunc
消费者线程 ThreadFunc 在 Create 中创建,返回在线程句柄存储在类成员数组 m_phThreads 中,供 Destroy 函数等待线程结束时使用。
DWORD dwThreadId; PTHREAD_CONTEXT pThreadsContext ; //创建所有的线程 for(i = 0 ; i < nNumberOfThreads ; i++ ) { //初始化每个线程的上下文,用于传递给线程函数 pThreadsContext = new THREAD_CONTEXT; pThreadsContext->pWorkQueue = this; //传递当前对象的指针,使新建线程可以访问到主线程中的生产者资源 pThreadsContext->pThreadData = ThreadData == NULL? NULL : ThreadData[i]; //创建线程 m_phThreads[i] = CreateThread(NULL, 0, CWorkQueue::ThreadFunc, pThreadsContext, 0, //为0表示线程创建之后立即就可以进行调度 &dwThreadId); }
5、由消费者线程去访问生产者资源可以看出,同一个进程的线程可共享内存。用户可以通过 SpecificWorkItem 构造函数把数据和函数传进成员变量和成员函数,通过 InsertWorkItem 把 SpecificWorkItem 传进 CWorkQueue 的 m_pWorkItemQueue,即生产者任务队列,然后消费者线程会获得通知,并访问生产者提取任务,执行函数和数据。
参考资料:
1、http://www.codeproject.com/Articles/3607/Work-Queue