zoukankan      html  css  js  c++  java
  • 第11章 Windows线程池(3)_私有的线程池

    11.3 私有的线程池

    11.3.1 创建和销毁私有的线程池

     (1)进程默认线程池

          当调用CreateThreadpoolwork、CreateThreadpoolTimer、CreateThreadpoolWait或CreateThreadpoolIo,并使传入参数PTP_CALLBACK_ENVIRON设为NULL时,那么所有的工作项将被添加到进程默认的线程池。一般这个默认的线程池能满足大多数应用程序的要求。其生命期与进程相同,在进程终止的时候,Windows负责线程池的清理和销毁工作。

    (2)创建私有线程池:PTP_POOL CreateThreadpool(PVOID reserved); //参数为NULL

    (3)销毁私有线程池: CloseThreadpool(PTP_POOL pThreadpool);

    11.3.2 定制私有线程池

    (1)设置线程池中线程的最大数量和最小数量

       BOOL SetThreadpoolThreadMinimum(PTP_POOL pThreadpool,DWORD cthrdMin);

       BOOL SetThreadpoolThreadMaximum(PTP_POOL pThreadpool,DWORD cthrdMax);

       【注意】默认线程池的最小数量为1,最大数量为500

    11.3.3 线程池的回调环境(callback environment)——TP_CALLBACK_ENVIRON结构体

     

    (1)初始化回调环境:InitializeThreadpoolEnvironment(PTP_CALLBACK_ENVIRON pcbe);

        【说明】初始化的作用是回调环境结构体中的将Version设为1,其余为0。

    (2)将回调环境关联到指定的线程池中

      SetThreadpoolCallbackpool(PTP_CALLBACK_ENVIRON pcbe,PTP_POOL pThreadpool);

      【注意】如果不调用该函数,则pbce为NULL。此时向线程池添加的工作项被会添加到进程默认的线程池中。否则被添加到pThreadpool指定的线程池中。

    (3)销毁回调环境:DestroyThreadpoolEnvironment(PTP_CALLBACK_ENVIRON pcbe)

    11.3.4 清理组——得体地销毁线程池

    (1)创建清理组:PTP_CLEANUP_GROUP CreateThreadpoolCleanupGroup();

    (2)将清理组与线程池关联:SetThreadpoolCallbackCleanupGroup函数

    参数

    描述

    PTP_CALLBACK_ENVIRON pcbe

    清理组与线程池关联是通过这个结构体关联起来的(注意,这个结构中内部PTP_POOL字段指向了我们创建的线程池)

    PTP_CLEANUP_GROUP ptpcg

    指向由CreateThreadpoolCleanupGroup创建的清理 组。

    PTP_CLEANUP_GROUP_CANCEL_CALLBACK pfng

    CleanupGroupCancelCallback回调函数。当调用清理组CloseThreadpoolCleanupGroupmembers函数,并为bCancelPendingCallbacks传入TRUE来清除清理组时,如果此时尚有未被处理的工作项时,则要调用这个回调函数,其原型为:

    VOID CALLBACK CleanupGroupCancelCallback(

    PVOID pvObjectContext,PVOID pvCleanupContext)

    其中pvObjectContext是通过CreateThreadpool*函数传入的pvContext,参数pvCleanupContext是由CloseThreadpoolCleanupGroupMembers的pvCleanupContext参数传入的。

    备注:

    ①每当调用CreateThreadpool*等函数(如CreateThreadpoolTimer)时,如果其最后的回调环境参数不为NULL,那么所创建的项会被添加到回调环境的清理组中,表示线程池中新添加了一项,需要潜在的清理

    ②如果调用CloseThreadpool*(如CloseThreadpoolTimer),那么等隐式将对应的项从清理组中删除

    (3)释放清理组成员:CloseThreadpoolCleanupGroupMembers(相当为清理组的每个工作项调用CloseThreadpool*函数)

    参数

    描述

    PTP_CLEANUP_GROUP ptpcg

    指向由CreateThreadpoolCleanupGroup创建的清理 组。

    BOOL bCancelPendingCallbacks

    是否取消线程池是剩余的工作项。

    ①如果bCancelPendingCallbacks指定为TRUE,则所有已提交但尚未处理工作项将被直接取消。如果之前在SetThreadpoolCallbackCleanupGroup的pfng指定了回调函数,那么对每一个被取消的项,回调函数pfng都要被调用一次

    ②如果bCancelPendingCallbacks为FALSE,则在函数返回之前,线程池会花时间来处理队列中剩余的工作项。(这时CleanupGroupCancelCallback函数是不会被调用的,因此pvCleanupContext可设为NULL)

    PVOID pvCleanupContext

    传给CleanupGroupCancelCallback回调函数的额外参数。如果bCancelPendingCallbacks为FALSE,则CleanupGroupCancelCallback不会被调用,所以该参数可以设为NULL)。

    备注:主控线程调用该函数时,该函数会一直等待,直到线程池中所有剩余的工作项都处理完毕才返回。(这点与WaitForThreadpool*函数(如WaitForThreadpoolWork)相似。

    (4)销毁清理组:ClosethreadpoolCleanupGroup将清理组本身所占内存释放!

     【PrivateBatch程序】私有线程池使用的演示

    /*************************************************************************
    Module:  PrivateBatch.cpp
    Notices: Copyright(c) 2008 Jeffrey Richter & Christophe Nasarre
    *************************************************************************/
    
    #include "../../CommonFiles/CmnHdr.h"
    #include "resource.h"
    #include <tchar.h>
    #include <strsafe.h>
    
    //////////////////////////////////////////////////////////////////////////
    //全局变量
    HWND                    g_hDlg = NULL;
    PTP_POOL                g_pThreadPool = NULL; //私有线程池
    TP_CALLBACK_ENVIRON        g_callbackEnv;
    PTP_CLEANUP_GROUP       g_pCleanupGroup = NULL;
    HANDLE                    g_hPrintfEvent = NULL;
    HANDLE                    g_hEmailEvent = NULL;
    
    
    //自定义的消息
    #define WM_APP_COMPLETED   (WM_APP + 123)
    
    //////////////////////////////////////////////////////////////////////////
    typedef struct _tagCallBackData{
        HANDLE finishEvent;
        TCHAR  szAction[MAX_PATH];
    }CALLBACK_DATA, *PCALLBACK_DATA;
    
    typedef struct _tagSYNCRO_DATA{ //同步数据结构
        DWORD  Count; //内核对象的数量
        HANDLE* Handles;//内核对象的句柄数组
    }SYNCHRO_DATA, *PSYNCHRO_DATA;
    
    //////////////////////////////////////////////////////////////////////////
    void AddMessage(PCTSTR pszFormat,...)
    {
        va_list argList;
        va_start(argList, pszFormat);
    
        TCHAR szMsg[20*1024];
        _vstprintf_s(szMsg, _countof(szMsg), pszFormat, argList);
        HWND hListBox = GetDlgItem(g_hDlg, IDC_LB_STATUS);
        ListBox_SetCurSel(hListBox, ListBox_AddString(hListBox, szMsg));
        
        va_end(argList);
    
    }
    
    //////////////////////////////////////////////////////////////////////////
    int lastWorkTime = 0; 
    //注意:原程序这里少于NTAPI调用约定,会出错!
    VOID  NTAPI SimpleHandler(PTP_CALLBACK_INSTANCE pInstance, PVOID Context){
    
        //确保有效的随机数
        int workTime = ((rand() % 4) + 1); //产生1-4之间的随机数
        while (workTime == lastWorkTime){
            workTime = ((rand() % 4) + 1);
        }
        lastWorkTime = workTime;
    
        AddMessage(TEXT("线程[%u] 执行 "%s" 将耗时大约%u秒。"), GetCurrentThreadId(),
                   ((PCTSTR)Context), workTime);
    
        //模拟其他工作
        Sleep(workTime * 1000);
    
        AddMessage(
            TEXT("线程[%u] 执行 "%s" 结束."),
            GetCurrentThreadId(), (PCTSTR)Context);
    
        delete[] ((PTSTR)Context);
    }
    
    //////////////////////////////////////////////////////////////////////////
    VOID NTAPI EndOfBatchCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID Context)
    {
        PSYNCHRO_DATA pData = (PSYNCHRO_DATA)Context;
    
        AddMessage(TEXT("线程[%u] 正在等待批处理完成..."), GetCurrentThreadId());
        DWORD BatchItemsCount = pData->Count;
        do{
            DWORD dwRet = WaitForMultipleObjects(pData->Count, pData->Handles,FALSE,1000);
            switch (dwRet)
            {
            case WAIT_TIMEOUT:
                break;
    
            default: //任何一种操作结束时
                BatchItemsCount--;
                break;
            }
        } while (BatchItemsCount > 0);
    
        //通知UI线程任务己经完成
        PostMessage(g_hDlg, WM_APP_COMPLETED, 0, (LPARAM)pData);
    }
    
    //////////////////////////////////////////////////////////////////////////
    VOID NTAPI ActionCallback(PTP_CALLBACK_INSTANCE pInstance, PVOID Context){
        PCALLBACK_DATA pData = (PCALLBACK_DATA)Context;
    
        //根据操作的名称(如"邮件"或"打印...")来计算一个伪时间
        DWORD  dwDuration = (DWORD)_tcslen(pData->szAction);
    
        AddMessage(TEXT("线程[%u] 执行 "%s" 将耗时大约%u秒"), 
                   GetCurrentThreadId(),pData->szAction,dwDuration);
        //模拟一些工作
        Sleep(dwDuration * 1000);
    
        AddMessage(TEXT("线程[%u] 执行 "%s" 完毕!"),
                   GetCurrentThreadId(), pData->szAction);
    
        //当回调函数终止时,请求线程池触发相应的同步对象(这里会触发相应的
        //g_hMailEvent或g_hPrintEvent对象
        SetEventWhenCallbackReturns(pInstance, pData->finishEvent);
        //SetEvent(pData->finishEvent);
        //清除Context参数指定的对象(外部传入的)
        delete (pData);
    }
    
    //////////////////////////////////////////////////////////////////////////
    void OnStartBatch(){
        if (g_hEmailEvent != NULL){
            AddMessage(TEXT("旧的批处理任务仍在运行中..."));
            return;
        }
    
        AddMessage(TEXT("线程[%u]----开始新的批处理任务----"), GetCurrentThreadId());
    
        //创建具名的同步事件对象
        g_hEmailEvent = CreateEvent(NULL, FALSE,FALSE,TEXT("EmailEvent")); //自动,无信号
        g_hPrintfEvent = CreateEvent(NULL, FALSE, FALSE, TEXT("PrintfEvent"));
    
        //定义线程池要处理的工作项
        //1.跟踪事件对象,因为每一步操作完成时,相应的对象会被触发
        PSYNCHRO_DATA pSyncData = new SYNCHRO_DATA();
        pSyncData->Count = 2;
        pSyncData->Handles = new HANDLE[2];
        pSyncData->Handles[0] = g_hEmailEvent;
        pSyncData->Handles[1] = g_hPrintfEvent;
        TrySubmitThreadpoolCallback((PTP_SIMPLE_CALLBACK)EndOfBatchCallback,
                                    pSyncData, &g_callbackEnv);
    
        //2.模拟发送邮件操作
        PCALLBACK_DATA pData = new CALLBACK_DATA();
        pData->finishEvent = g_hEmailEvent;
        StringCchCopy(pData->szAction, MAX_PATH, TEXT("发送邮件"));
        //_tcscpy_s(pData->szAction, MAX_PATH, TEXT("邮件"));
        TrySubmitThreadpoolCallback((PTP_SIMPLE_CALLBACK)ActionCallback,
                                    pData, &g_callbackEnv);
    
        //3.模拟打印操作
        pData = new CALLBACK_DATA();
        pData->finishEvent = g_hPrintfEvent;
        StringCchCopy(pData->szAction, MAX_PATH, TEXT("打印操作..."));
        //_tcscpy_s(pData->szAction, MAX_PATH, TEXT("打印中..."));
        TrySubmitThreadpoolCallback((PTP_SIMPLE_CALLBACK)ActionCallback,
                                    pData,&g_callbackEnv);
    }
    
    //////////////////////////////////////////////////////////////////////////
    //销毁线程池
    void OnDeletePool(){
        //注意:DestroyThreadpoolEnviroment函数并没做什么,
        //      要求我们自己释放线程池
        CloseThreadpoolCleanupGroupMembers(g_pCleanupGroup, FALSE, NULL);
    
        CloseThreadpoolCleanupGroup(g_pCleanupGroup);
    
        if (g_pThreadPool != NULL){
            CloseThreadpool(g_pThreadPool);
        }
    
        //销毁环境块,此版本的DestroyThreadpoolEnviroment并没做什么
        //也许下一个版本会提供,这里直接调用即可
        DestroyThreadpoolEnvironment(&g_callbackEnv);
        g_pThreadPool = NULL;
    }
    
    //创建线程池
    void OnCreatePool(){
        //有效性检验
        if (g_pThreadPool != NULL){
            OnDeletePool();
        }
    
        //创建私有线程池
        g_pThreadPool = CreateThreadpool(NULL);
        if (g_pThreadPool == NULL){
            MessageBox(NULL, TEXT("无法创建私有线程池!"), 
                       TEXT("初始化失败!"), MB_OK | MB_ICONERROR);
            return;
        }
    
        //定制线程池,最小线程数为2,最大为4.
        SetThreadpoolThreadMaximum(g_pThreadPool, 4);
        if (!SetThreadpoolThreadMinimum(g_pThreadPool,2)){
            MessageBox(NULL, TEXT("设置线程池最小线程数失败!"),
                       TEXT("初始化失败!"), MB_OK | MB_ICONERROR);
            return;
        }
    
    
        //重置回调环境块
        //注意:这是一个内联函数,调用以后各字段如下
        //    CallbackEnviron->Version =1;
        //    CallbackEnviron->Pool    = NULL;
        //    CallbackEnviron->CleanupGroup = NULL;
        //    CallbackEnviron->CleanupGroupCancelCallback = NULL;
        //    CallbackEnviron->RaceDll = NULL;
        //    CallbackEnviron->ActivationContext = NULL;
        //    CallbackEnviron->FinalizationCallback = NULL;
        //    CallbackEnviron->u.Flags = 0;
        //初始化回调环境块
        InitializeThreadpoolEnvironment(&g_callbackEnv);
        
        //将线程池与回调环境关联
        //注意:该调用会使得CallbackEnviro->Pool = Pool;
        SetThreadpoolCallbackPool(&g_callbackEnv, g_pThreadPool);
    
        //创建资源清理器
        g_pCleanupGroup = CreateThreadpoolCleanupGroup();
        //将清理器与环境块关联
        SetThreadpoolCallbackCleanupGroup(&g_callbackEnv, g_pCleanupGroup,NULL);
    }
    
    //运行
    void OnRun(){
        
        srand(GetTickCount());
    
        for (int current = 1; current <= 6;current++){
            PTSTR pRequest = new TCHAR[MAX_PATH];
            StringCchPrintf(pRequest, MAX_PATH, TEXT("请求%u"), current);
            if (TrySubmitThreadpoolCallback((PTP_SIMPLE_CALLBACK)SimpleHandler,
                pRequest,&g_callbackEnv)){
                AddMessage(TEXT("线程[%u] 请求%u 被提交"),GetCurrentThreadId(), current);
            } else {
                AddMessage(TEXT("线程[%u] 请求%u 不能被提交"),GetCurrentThreadId(), current);
            }
        }
    }
    
    //////////////////////////////////////////////////////////////////////////
    BOOL Dlg_OnInitDialog(HWND hwnd, HWND hwndFocus, LPARAM lParam){
        //保存对话框句柄
        g_hDlg = hwnd;
        return TRUE;
    }
    
    void Dlg_OnCommand(HWND hwnd, int id, HWND hwndCtrl, UINT codeNotity){
        switch (id)
        {
        case IDOK:
        case IDCANCEL:
            EndDialog(hwnd, id);
            break;
    
        case IDC_BTN_START_BATCH:
            OnStartBatch();
            break;
    
        case IDC_BTN_CREATE_POOL:
            OnCreatePool();
            break;
    
        case IDC_BTN_RUN:
            OnRun();
            break;
    
        case IDC_BTN_DELETE_POOL:
            OnDeletePool();
            break;
        }
    }
    
    //////////////////////////////////////////////////////////////////////////
    INT_PTR WINAPI Dlg_Proc(HWND hwnd, UINT uMsg, WPARAM wParam, LPARAM lParam){
        switch (uMsg){
            chHANDLE_DLGMSG(hwnd, WM_INITDIALOG, Dlg_OnInitDialog);
            chHANDLE_DLGMSG(hwnd, WM_COMMAND, Dlg_OnCommand);
    
        case WM_APP_COMPLETED:
            {
                AddMessage(TEXT("线程[%u] ----全部批处理任务完成!----"), GetCurrentThreadId());
    
                //别忘了清除同步对象
                PSYNCHRO_DATA pData = (PSYNCHRO_DATA)lParam;
                for (DWORD current = 0; current < pData->Count;current++){
                    if (!CloseHandle(pData->Handles[current])){
                        AddMessage(TEXT("线程[%u] 关闭句柄%u时出现错误[%u])"), 
                                   GetCurrentThreadId(),current,GetLastError());
                    }
                }
    
                g_hEmailEvent = NULL;
                g_hPrintfEvent = NULL;
                delete (pData);
            }
            break;
        }
    
        return FALSE;
    }
    //////////////////////////////////////////////////////////////////////////
    int APIENTRY _tWinMain(HINSTANCE hInstance, HINSTANCE hPrevInstance,  LPTSTR lpCmdLine, int nShowCmd){
        UNREFERENCED_PARAMETER(hPrevInstance);
    
        DialogBoxParam(hInstance, MAKEINTRESOURCE(IDD_MAIN), NULL, Dlg_Proc, _ttoi(lpCmdLine));
        
        return 0;
    }

    //resource.h

    //{{NO_DEPENDENCIES}}
    // Microsoft Visual C++ 生成的包含文件。
    // 供 11_PrivateBatch.rc 使用
    //
    #define IDD_MAIN                        101
    #define IDC_BTN_START_BATCH             1001
    #define IDC_BTN_RUN                     1002
    #define IDC_LB_STATUS                   1003
    #define IDC_BTN_CREATE_POOL             1004
    #define IDC_BTN_DELETE_POOL             1005
    // Next default values for new objects
    // 
    #ifdef APSTUDIO_INVOKED
    #ifndef APSTUDIO_READONLY_SYMBOLS
    #define _APS_NEXT_RESOURCE_VALUE        102
    #define _APS_NEXT_COMMAND_VALUE         40001
    #define _APS_NEXT_CONTROL_VALUE         1004
    #define _APS_NEXT_SYMED_VALUE           101
    #endif
    #endif

    //PrivateBatch.rc

    // Microsoft Visual C++ generated resource script.
    //
    #include "resource.h"
    
    #define APSTUDIO_READONLY_SYMBOLS
    /////////////////////////////////////////////////////////////////////////////
    //
    // Generated from the TEXTINCLUDE 2 resource.
    //
    #include "winres.h"
    
    /////////////////////////////////////////////////////////////////////////////
    #undef APSTUDIO_READONLY_SYMBOLS
    
    /////////////////////////////////////////////////////////////////////////////
    // 中文(简体,中国) resources
    
    #if !defined(AFX_RESOURCE_DLL) || defined(AFX_TARG_CHS)
    LANGUAGE LANG_CHINESE, SUBLANG_CHINESE_SIMPLIFIED
    
    #ifdef APSTUDIO_INVOKED
    /////////////////////////////////////////////////////////////////////////////
    //
    // TEXTINCLUDE
    //
    
    1 TEXTINCLUDE 
    BEGIN
        "resource.h"
    END
    
    2 TEXTINCLUDE 
    BEGIN
        "#include ""winres.h""
    "
        ""
    END
    
    3 TEXTINCLUDE 
    BEGIN
        "
    "
        ""
    END
    
    #endif    // APSTUDIO_INVOKED
    
    
    /////////////////////////////////////////////////////////////////////////////
    //
    // Dialog
    //
    
    IDD_MAIN DIALOGEX 0, 0, 294, 201
    STYLE DS_SETFONT | DS_MODALFRAME | DS_CENTER | WS_POPUP | WS_CAPTION | WS_SYSMENU
    CAPTION "使用私有线程池"
    FONT 10, "宋体", 400, 0, 0x86
    BEGIN
        PUSHBUTTON      "开始批处理",IDC_BTN_START_BATCH,90,7,59,14
        PUSHBUTTON      "运行",IDC_BTN_RUN,163,7,50,14
        LISTBOX         IDC_LB_STATUS,7,27,280,147,LBS_NOINTEGRALHEIGHT | NOT WS_BORDER | WS_VSCROLL | WS_HSCROLL | WS_TABSTOP,WS_EX_STATICEDGE
        DEFPUSHBUTTON   "退出",IDOK,237,180,50,14
        PUSHBUTTON      "创建线程池",IDC_BTN_CREATE_POOL,7,7,58,14
        PUSHBUTTON      "删除线程池",IDC_BTN_DELETE_POOL,229,7,58,14
    END
    
    
    /////////////////////////////////////////////////////////////////////////////
    //
    // DESIGNINFO
    //
    
    #ifdef APSTUDIO_INVOKED
    GUIDELINES DESIGNINFO
    BEGIN
        IDD_MAIN, DIALOG
        BEGIN
        END
    END
    #endif    // APSTUDIO_INVOKED
    
    #endif    // 中文(简体,中国) resources
    /////////////////////////////////////////////////////////////////////////////
    
    
    
    #ifndef APSTUDIO_INVOKED
    /////////////////////////////////////////////////////////////////////////////
    //
    // Generated from the TEXTINCLUDE 3 resource.
    //
    
    
    /////////////////////////////////////////////////////////////////////////////
    #endif    // not APSTUDIO_INVOKED
  • 相关阅读:
    BlockingQueue(阻塞队列)详解
    支付宝系统架构(内部架构图)
    微博的消息队列
    JVM源码分析之堆外内存完全解读
    滑动冲突的补充——Event的流程走向
    BaseFragment的定义—所有Fragment的父类
    BaseActivity的定义——作为所有Activity类的父类
    BGARefreshLayout-Android-master的简单使用
    分析BGARefreshLayout-master
    简便数据库——ORMLite框架
  • 原文地址:https://www.cnblogs.com/5iedu/p/4823362.html
Copyright © 2011-2022 走看看