11.3 私有的线程池
11.3.1 创建和销毁私有的线程池
(1)进程默认线程池
当调用CreateThreadpoolwork、CreateThreadpoolTimer、CreateThreadpoolWait或CreateThreadpoolIo,并使传入参数PTP_CALLBACK_ENVIRON设为NULL时,那么所有的工作项将被添加到进程默认的线程池。一般这个默认的线程池能满足大多数应用程序的要求。其生命期与进程相同,在进程终止的时候,Windows负责线程池的清理和销毁工作。
(2)创建私有线程池:PTP_POOL CreateThreadpool(PVOID reserved); //参数为NULL
(3)销毁私有线程池: CloseThreadpool(PTP_POOL pThreadpool);
11.3.2 定制私有线程池
(1)设置线程池中线程的最大数量和最小数量
BOOL SetThreadpoolThreadMinimum(PTP_POOL pThreadpool,DWORD cthrdMin);
BOOL SetThreadpoolThreadMaximum(PTP_POOL pThreadpool,DWORD cthrdMax);
【注意】默认线程池的最小数量为1,最大数量为500
11.3.3 线程池的回调环境(callback environment)——TP_CALLBACK_ENVIRON结构体
(1)初始化回调环境:InitializeThreadpoolEnvironment(PTP_CALLBACK_ENVIRON pcbe);
【说明】初始化的作用是回调环境结构体中的将Version设为1,其余为0。
(2)将回调环境关联到指定的线程池中
SetThreadpoolCallbackpool(PTP_CALLBACK_ENVIRON pcbe,PTP_POOL pThreadpool);
【注意】如果不调用该函数,则pbce为NULL。此时向线程池添加的工作项被会添加到进程默认的线程池中。否则被添加到pThreadpool指定的线程池中。
(3)销毁回调环境:DestroyThreadpoolEnvironment(PTP_CALLBACK_ENVIRON pcbe)
11.3.4 清理组——得体地销毁线程池
(1)创建清理组:PTP_CLEANUP_GROUP CreateThreadpoolCleanupGroup();
(2)将清理组与线程池关联:SetThreadpoolCallbackCleanupGroup函数
参数 |
描述 |
PTP_CALLBACK_ENVIRON pcbe |
清理组与线程池关联是通过这个结构体关联起来的(注意,这个结构中内部PTP_POOL字段指向了我们创建的线程池) |
PTP_CLEANUP_GROUP ptpcg |
指向由CreateThreadpoolCleanupGroup创建的清理 组。 |
PTP_CLEANUP_GROUP_CANCEL_CALLBACK pfng |
CleanupGroupCancelCallback回调函数。当调用清理组CloseThreadpoolCleanupGroupmembers函数,并为bCancelPendingCallbacks传入TRUE来清除清理组时,如果此时尚有未被处理的工作项时,则要调用这个回调函数,其原型为: VOID CALLBACK CleanupGroupCancelCallback( PVOID pvObjectContext,PVOID pvCleanupContext) 其中pvObjectContext是通过CreateThreadpool*函数传入的pvContext,参数pvCleanupContext是由CloseThreadpoolCleanupGroupMembers的pvCleanupContext参数传入的。 |
备注: ①每当调用CreateThreadpool*等函数(如CreateThreadpoolTimer)时,如果其最后的回调环境参数不为NULL,那么所创建的项会被添加到回调环境的清理组中,表示线程池中新添加了一项,需要潜在的清理 。 ②如果调用CloseThreadpool*(如CloseThreadpoolTimer),那么等于隐式将对应的项从清理组中删除。 |
(3)释放清理组成员:CloseThreadpoolCleanupGroupMembers(相当为清理组的每个工作项调用CloseThreadpool*函数)
参数 |
描述 |
PTP_CLEANUP_GROUP ptpcg |
指向由CreateThreadpoolCleanupGroup创建的清理 组。 |
BOOL bCancelPendingCallbacks |
是否取消线程池是剩余的工作项。 ①如果bCancelPendingCallbacks指定为TRUE,则所有已提交但尚未处理工作项将被直接取消。如果之前在SetThreadpoolCallbackCleanupGroup的pfng指定了回调函数,那么对每一个被取消的项,回调函数pfng都要被调用一次。 ②如果bCancelPendingCallbacks为FALSE,则在函数返回之前,线程池会花时间来处理队列中剩余的工作项。(这时CleanupGroupCancelCallback函数是不会被调用的,因此pvCleanupContext可设为NULL) |
PVOID pvCleanupContext |
传给CleanupGroupCancelCallback回调函数的额外参数。如果bCancelPendingCallbacks为FALSE,则CleanupGroupCancelCallback不会被调用,所以该参数可以设为NULL)。 |
备注:主控线程调用该函数时,该函数会一直等待,直到线程池中所有剩余的工作项都处理完毕才返回。(这点与WaitForThreadpool*函数(如WaitForThreadpoolWork)相似。 |
(4)销毁清理组:ClosethreadpoolCleanupGroup将清理组本身所占内存释放!
【PrivateBatch程序】私有线程池使用的演示
/************************************************************************* Module: PrivateBatch.cpp Notices: Copyright(c) 2008 Jeffrey Richter & Christophe Nasarre *************************************************************************/ #include "../../CommonFiles/CmnHdr.h" #include "resource.h" #include <tchar.h> #include <strsafe.h> ////////////////////////////////////////////////////////////////////////// //全局变量 HWND g_hDlg = NULL; PTP_POOL g_pThreadPool = NULL; //私有线程池 TP_CALLBACK_ENVIRON g_callbackEnv; PTP_CLEANUP_GROUP g_pCleanupGroup = NULL; HANDLE g_hPrintfEvent = NULL; HANDLE g_hEmailEvent = NULL; //自定义的消息 #define WM_APP_COMPLETED (WM_APP + 123) ////////////////////////////////////////////////////////////////////////// typedef struct _tagCallBackData{ HANDLE finishEvent; TCHAR szAction[MAX_PATH]; }CALLBACK_DATA, *PCALLBACK_DATA; typedef struct _tagSYNCRO_DATA{ //同步数据结构 DWORD Count; //内核对象的数量 HANDLE* Handles;//内核对象的句柄数组 }SYNCHRO_DATA, *PSYNCHRO_DATA; ////////////////////////////////////////////////////////////////////////// void AddMessage(PCTSTR pszFormat,...) { va_list argList; va_start(argList, pszFormat); TCHAR szMsg[20*1024]; _vstprintf_s(szMsg, _countof(szMsg), pszFormat, argList); HWND hListBox = GetDlgItem(g_hDlg, IDC_LB_STATUS); ListBox_SetCurSel(hListBox, ListBox_AddString(hListBox, szMsg)); va_end(argList); } ////////////////////////////////////////////////////////////////////////// int lastWorkTime = 0; //注意:原程序这里少于NTAPI调用约定,会出错! VOID NTAPI SimpleHandler(PTP_CALLBACK_INSTANCE pInstance, PVOID Context){ //确保有效的随机数 int workTime = ((rand() % 4) + 1); //产生1-4之间的随机数 while (workTime == lastWorkTime){ workTime = ((rand() % 4) + 1); } lastWorkTime = workTime; AddMessage(TEXT("线程[%u] 执行 "%s" 将耗时大约%u秒。"), GetCurrentThreadId(), ((PCTSTR)Context), workTime); //模拟其他工作 Sleep(workTime * 1000); AddMessage( TEXT("线程[%u] 执行 "%s" 结束."), GetCurrentThreadId(), (PCTSTR)Context); delete[] ((PTSTR)Context); } ////////////////////////////////////////////////////////////////////////// VOID NTAPI EndOfBatchCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID Context) { PSYNCHRO_DATA pData = (PSYNCHRO_DATA)Context; AddMessage(TEXT("线程[%u] 正在等待批处理完成..."), GetCurrentThreadId()); DWORD BatchItemsCount = pData->Count; do{ DWORD dwRet = WaitForMultipleObjects(pData->Count, pData->Handles,FALSE,1000); switch (dwRet) { case WAIT_TIMEOUT: break; default: //任何一种操作结束时 BatchItemsCount--; break; } } while (BatchItemsCount > 0); //通知UI线程任务己经完成 PostMessage(g_hDlg, WM_APP_COMPLETED, 0, (LPARAM)pData); } ////////////////////////////////////////////////////////////////////////// VOID NTAPI ActionCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID Context){ PCALLBACK_DATA pData = (PCALLBACK_DATA)Context; //根据操作的名称(如"邮件"或"打印...")来计算一个伪时间 DWORD dwDuration = (DWORD)_tcslen(pData->szAction); AddMessage(TEXT("线程[%u] 执行 "%s" 将耗时大约%u秒"), GetCurrentThreadId(),pData->szAction,dwDuration); //模拟一些工作 Sleep(dwDuration * 1000); AddMessage(TEXT("线程[%u] 执行 "%s" 完毕!"), GetCurrentThreadId(), pData->szAction); //当回调函数终止时,请求线程池触发相应的同步对象(这里会触发相应的 //g_hMailEvent或g_hPrintEvent对象 SetEventWhenCallbackReturns(pInstance, pData->finishEvent); //SetEvent(pData->finishEvent); //清除Context参数指定的对象(外部传入的) delete (pData); } ////////////////////////////////////////////////////////////////////////// void OnStartBatch(){ if (g_hEmailEvent != NULL){ AddMessage(TEXT("旧的批处理任务仍在运行中...")); return; } AddMessage(TEXT("线程[%u]----开始新的批处理任务----"), GetCurrentThreadId()); //创建具名的同步事件对象 g_hEmailEvent = CreateEvent(NULL, FALSE,FALSE,TEXT("EmailEvent")); //自动,无信号 g_hPrintfEvent = CreateEvent(NULL, FALSE, FALSE, TEXT("PrintfEvent")); //定义线程池要处理的工作项 //1.跟踪事件对象,因为每一步操作完成时,相应的对象会被触发 PSYNCHRO_DATA pSyncData = new SYNCHRO_DATA(); pSyncData->Count = 2; pSyncData->Handles = new HANDLE[2]; pSyncData->Handles[0] = g_hEmailEvent; pSyncData->Handles[1] = g_hPrintfEvent; TrySubmitThreadpoolCallback((PTP_SIMPLE_CALLBACK)EndOfBatchCallback, pSyncData, &g_callbackEnv); //2.模拟发送邮件操作 PCALLBACK_DATA pData = new CALLBACK_DATA(); pData->finishEvent = g_hEmailEvent; StringCchCopy(pData->szAction, MAX_PATH, TEXT("发送邮件")); //_tcscpy_s(pData->szAction, MAX_PATH, TEXT("邮件")); TrySubmitThreadpoolCallback((PTP_SIMPLE_CALLBACK)ActionCallback, pData, &g_callbackEnv); //3.模拟打印操作 pData = new CALLBACK_DATA(); pData->finishEvent = g_hPrintfEvent; StringCchCopy(pData->szAction, MAX_PATH, TEXT("打印操作...")); //_tcscpy_s(pData->szAction, MAX_PATH, TEXT("打印中...")); TrySubmitThreadpoolCallback((PTP_SIMPLE_CALLBACK)ActionCallback, pData,&g_callbackEnv); } ////////////////////////////////////////////////////////////////////////// //销毁线程池 void OnDeletePool(){ //注意:DestroyThreadpoolEnviroment函数并没做什么, // 要求我们自己释放线程池 CloseThreadpoolCleanupGroupMembers(g_pCleanupGroup, FALSE, NULL); CloseThreadpoolCleanupGroup(g_pCleanupGroup); if (g_pThreadPool != NULL){ CloseThreadpool(g_pThreadPool); } //销毁环境块,此版本的DestroyThreadpoolEnviroment并没做什么 //也许下一个版本会提供,这里直接调用即可 DestroyThreadpoolEnvironment(&g_callbackEnv); g_pThreadPool = NULL; } //创建线程池 void OnCreatePool(){ //有效性检验 if (g_pThreadPool != NULL){ OnDeletePool(); } //创建私有线程池 g_pThreadPool = CreateThreadpool(NULL); if (g_pThreadPool == NULL){ MessageBox(NULL, TEXT("无法创建私有线程池!"), TEXT("初始化失败!"), MB_OK | MB_ICONERROR); return; } //定制线程池,最小线程数为2,最大为4. SetThreadpoolThreadMaximum(g_pThreadPool, 4); if (!SetThreadpoolThreadMinimum(g_pThreadPool,2)){ MessageBox(NULL, TEXT("设置线程池最小线程数失败!"), TEXT("初始化失败!"), MB_OK | MB_ICONERROR); return; } //重置回调环境块 //注意:这是一个内联函数,调用以后各字段如下 // CallbackEnviron->Version =1; // CallbackEnviron->Pool = NULL; // CallbackEnviron->CleanupGroup = NULL; // CallbackEnviron->CleanupGroupCancelCallback = NULL; // CallbackEnviron->RaceDll = NULL; // CallbackEnviron->ActivationContext = NULL; // CallbackEnviron->FinalizationCallback = NULL; // CallbackEnviron->u.Flags = 0; //初始化回调环境块 InitializeThreadpoolEnvironment(&g_callbackEnv); //将线程池与回调环境关联 //注意:该调用会使得CallbackEnviro->Pool = Pool; SetThreadpoolCallbackPool(&g_callbackEnv, g_pThreadPool); //创建资源清理器 g_pCleanupGroup = CreateThreadpoolCleanupGroup(); //将清理器与环境块关联 SetThreadpoolCallbackCleanupGroup(&g_callbackEnv, g_pCleanupGroup,NULL); } //运行 void OnRun(){ srand(GetTickCount()); for (int current = 1; current <= 6;current++){ PTSTR pRequest = new TCHAR[MAX_PATH]; StringCchPrintf(pRequest, MAX_PATH, TEXT("请求%u"), current); if (TrySubmitThreadpoolCallback((PTP_SIMPLE_CALLBACK)SimpleHandler, pRequest,&g_callbackEnv)){ AddMessage(TEXT("线程[%u] 请求%u 被提交"),GetCurrentThreadId(), current); } else { AddMessage(TEXT("线程[%u] 请求%u 不能被提交"),GetCurrentThreadId(), current); } } } ////////////////////////////////////////////////////////////////////////// BOOL Dlg_OnInitDialog(HWND hwnd, HWND hwndFocus, LPARAM lParam){ //保存对话框句柄 g_hDlg = hwnd; return TRUE; } void Dlg_OnCommand(HWND hwnd, int id, HWND hwndCtrl, UINT codeNotity){ switch (id) { case IDOK: case IDCANCEL: EndDialog(hwnd, id); break; case IDC_BTN_START_BATCH: OnStartBatch(); break; case IDC_BTN_CREATE_POOL: OnCreatePool(); break; case IDC_BTN_RUN: OnRun(); break; case IDC_BTN_DELETE_POOL: OnDeletePool(); break; } } ////////////////////////////////////////////////////////////////////////// INT_PTR WINAPI Dlg_Proc(HWND hwnd, UINT uMsg, WPARAM wParam, LPARAM lParam){ switch (uMsg){ chHANDLE_DLGMSG(hwnd, WM_INITDIALOG, Dlg_OnInitDialog); chHANDLE_DLGMSG(hwnd, WM_COMMAND, Dlg_OnCommand); case WM_APP_COMPLETED: { AddMessage(TEXT("线程[%u] ----全部批处理任务完成!----"), GetCurrentThreadId()); //别忘了清除同步对象 PSYNCHRO_DATA pData = (PSYNCHRO_DATA)lParam; for (DWORD current = 0; current < pData->Count;current++){ if (!CloseHandle(pData->Handles[current])){ AddMessage(TEXT("线程[%u] 关闭句柄%u时出现错误[%u])"), GetCurrentThreadId(),current,GetLastError()); } } g_hEmailEvent = NULL; g_hPrintfEvent = NULL; delete (pData); } break; } return FALSE; } ////////////////////////////////////////////////////////////////////////// int APIENTRY _tWinMain(HINSTANCE hInstance, HINSTANCE hPrevInstance, LPTSTR lpCmdLine, int nShowCmd){ UNREFERENCED_PARAMETER(hPrevInstance); DialogBoxParam(hInstance, MAKEINTRESOURCE(IDD_MAIN), NULL, Dlg_Proc, _ttoi(lpCmdLine)); return 0; }
//resource.h
//{{NO_DEPENDENCIES}} // Microsoft Visual C++ 生成的包含文件。 // 供 11_PrivateBatch.rc 使用 // #define IDD_MAIN 101 #define IDC_BTN_START_BATCH 1001 #define IDC_BTN_RUN 1002 #define IDC_LB_STATUS 1003 #define IDC_BTN_CREATE_POOL 1004 #define IDC_BTN_DELETE_POOL 1005 // Next default values for new objects // #ifdef APSTUDIO_INVOKED #ifndef APSTUDIO_READONLY_SYMBOLS #define _APS_NEXT_RESOURCE_VALUE 102 #define _APS_NEXT_COMMAND_VALUE 40001 #define _APS_NEXT_CONTROL_VALUE 1004 #define _APS_NEXT_SYMED_VALUE 101 #endif #endif
//PrivateBatch.rc
// Microsoft Visual C++ generated resource script. // #include "resource.h" #define APSTUDIO_READONLY_SYMBOLS ///////////////////////////////////////////////////////////////////////////// // // Generated from the TEXTINCLUDE 2 resource. // #include "winres.h" ///////////////////////////////////////////////////////////////////////////// #undef APSTUDIO_READONLY_SYMBOLS ///////////////////////////////////////////////////////////////////////////// // 中文(简体,中国) resources #if !defined(AFX_RESOURCE_DLL) || defined(AFX_TARG_CHS) LANGUAGE LANG_CHINESE, SUBLANG_CHINESE_SIMPLIFIED #ifdef APSTUDIO_INVOKED ///////////////////////////////////////////////////////////////////////////// // // TEXTINCLUDE // 1 TEXTINCLUDE BEGIN "resource.h " END 2 TEXTINCLUDE BEGIN "#include ""winres.h"" " "