▶ 学习回调函数的基本概念,并在CUDA的任务流中插入基于CPU的主机函数,作为回调函数使用。
▶ 源代码(合并了 3 个源文件,删掉了没有用到的部分)
// simpleCallback.cu
// Demonstrates inserting a host (CPU) function into a CUDA stream with
// cudaStreamAddCallback. Each of N_WORKLOAD CPU threads enqueues
// H2D copy -> incKernel -> D2H copy on its own stream, followed by a callback.
// When the stream reaches the callback, a new CPU thread (postprocess) verifies
// the results and releases the workload's resources. The main thread waits on
// a barrier until every workload has reported in.
#include <stdio.h>
#include <stdlib.h>
#include <windows.h>
#include <process.h>
#include <cuda_runtime.h>
#include "device_launch_parameters.h"
#include <helper_functions.h>
#include <helper_cuda.h>

#define N_WORKLOAD 8     // number of workloads (one CPU thread + one stream each)
#define BLOCK 512        // threads per block for incKernel
#define ELEMENT 100000   // ints processed per workload

// One-shot counting barrier: cutWaitForBarrier blocks until
// cutIncrementBarrier has been called releaseCount times.
struct CUTBarrier
{
    CRITICAL_SECTION criticalSection; // guards count
    HANDLE barrierEvent;              // manual-reset event, set once count reaches releaseCount
    int releaseCount;                 // arrivals required to release the waiters
    int count;                        // arrivals so far
};

CUTBarrier thread_barrier;

// Per-workload state shared by launch(), myStreamCallback() and postprocess().
struct heterogeneous_workload
{
    int id;                   // workload number
    int cudaDeviceID;         // device that executes this workload
    int *h_data;              // pinned host buffer of ELEMENT ints
    int *d_data;              // device buffer of ELEMENT ints
    cudaStream_t stream;      // private stream (one stream per workload)
    cudaError_t streamStatus; // status handed to the stream callback
    bool success;             // true if every element was verified correct
};

// Starts a thread running func(data). Returns the thread handle, or NULL on
// failure; the caller owns the handle and must CloseHandle it.
HANDLE cutStartThread(unsigned (WINAPI *func)(void *), void *data)
{
    // _beginthreadex instead of CreateThread: the worker threads call CRT
    // functions (printf), and its start-routine type matches func exactly,
    // so func no longer needs to be cast.
    return (HANDLE)_beginthreadex(NULL, 0, func, data, 0, NULL);
}

// Starts a thread whose handle is not needed and closes that handle at once
// (the thread keeps running). Returns false if the thread could not be created.
static bool cutStartDetachedThread(unsigned (WINAPI *func)(void *), void *data)
{
    HANDLE thread = cutStartThread(func, data);
    if (thread == NULL)
        return false;
    CloseHandle(thread);
    return true;
}

// Initializes *barrier in place so that releaseCount arrivals release it.
// A CRITICAL_SECTION must not be copied or moved once initialized, so the
// barrier is set up where it lives. Returns false if the event could not be created.
bool cutInitBarrier(CUTBarrier *barrier, int releaseCount)
{
    InitializeCriticalSection(&barrier->criticalSection);
    // Unnamed manual-reset event: private to this process, so a second running
    // instance cannot share (or pre-signal) it.
    barrier->barrierEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
    barrier->count = 0;
    barrier->releaseCount = releaseCount;
    return barrier->barrierEvent != NULL;
}

// Kept for compatibility with the original helper API. Prefer cutInitBarrier:
// returning by value copies an initialized CRITICAL_SECTION, which Win32 does not allow.
CUTBarrier cutCreateBarrier(int releaseCount)
{
    CUTBarrier barrier;
    cutInitBarrier(&barrier, releaseCount);
    return barrier;
}

// Records one arrival; the arrival that reaches releaseCount signals the event.
void cutIncrementBarrier(CUTBarrier *barrier)
{
    int myBarrierCount;
    EnterCriticalSection(&barrier->criticalSection);
    myBarrierCount = ++barrier->count;
    LeaveCriticalSection(&barrier->criticalSection);
    if (myBarrierCount >= barrier->releaseCount) // every expected arrival has happened
        SetEvent(barrier->barrierEvent);
}

// Blocks the calling thread until the barrier has been released.
void cutWaitForBarrier(CUTBarrier *barrier)
{
    WaitForSingleObject(barrier->barrierEvent, INFINITE);
}

// Releases the OS resources of a barrier that nobody is using any more.
void cutDestroyBarrier(CUTBarrier *barrier)
{
    DeleteCriticalSection(&barrier->criticalSection);
    if (barrier->barrierEvent != NULL)
        CloseHandle(barrier->barrierEvent);
}

// Adds 1 to each of the first N elements of data, one element per thread.
// Expects a 1-D launch with at least N threads in total; surplus threads do nothing.
__global__ void incKernel(int *data, int N)
{
    int idx = blockIdx.x * blockDim.x + threadIdx.x;

    if (idx < N)
        data[idx]++;
}

// Runs on a fresh CPU thread after the workload's stream has finished:
// verifies the results, frees the buffers and stream, and signals the barrier.
unsigned WINAPI postprocess(void *void_arg)
{
    heterogeneous_workload *workload = (heterogeneous_workload *)void_arg;
    checkCudaErrors(cudaSetDevice(workload->cudaDeviceID));

    // launch() filled h_data[i] = id + i and incKernel added 1, so every one of
    // the ELEMENT entries must equal id + i + 1. A stream error fails the workload.
    bool ok = (workload->streamStatus == cudaSuccess);
    for (int i = 0; ok && i < ELEMENT; ++i)
        ok = (workload->h_data[i] == workload->id + i + 1);
    workload->success = ok;

    checkCudaErrors(cudaFree(workload->d_data));
    checkCudaErrors(cudaFreeHost(workload->h_data));
    checkCudaErrors(cudaStreamDestroy(workload->stream));

    printf("Workload %d finished! ", workload->id);
    cutIncrementBarrier(&thread_barrier); // tell main this workload is done
    return 0;
}

// Stream callback (signature fixed by cudaStreamCallback_t). Callbacks must not
// call CUDA APIs, so it only records the stream status and hands the CUDA work
// (verification and frees) to a new CPU thread running postprocess.
void CUDART_CB myStreamCallback(cudaStream_t stream, cudaError_t status, void *data)
{
    heterogeneous_workload *workload = (heterogeneous_workload *)data;
    workload->streamStatus = status;

    if (!cutStartDetachedThread(postprocess, data))
    {
        // Without postprocess the barrier would never be released: count this
        // workload as a failed arrival instead of hanging main().
        fprintf(stderr, "Failed to start postprocess thread for workload %d\n", workload->id);
        workload->success = false;
        cutIncrementBarrier(&thread_barrier);
    }
}

// Runs on its own CPU thread per workload: allocates the buffers and stream,
// enqueues copy -> kernel -> copy -> callback without blocking, then returns.
unsigned WINAPI launch(void *void_arg)
{
    heterogeneous_workload *workload = (heterogeneous_workload *)void_arg;
    checkCudaErrors(cudaSetDevice(workload->cudaDeviceID));
    checkCudaErrors(cudaStreamCreate(&workload->stream));
    checkCudaErrors(cudaMalloc(&workload->d_data, ELEMENT * sizeof(int)));
    checkCudaErrors(cudaHostAlloc(&workload->h_data, ELEMENT * sizeof(int), cudaHostAllocPortable));
    for (int i = 0; i < ELEMENT; ++i)
        workload->h_data[i] = workload->id + i;

    // Each CPU thread drives its own stream, so the workloads can overlap and
    // none of these calls block the CPU thread.
    checkCudaErrors(cudaMemcpyAsync(workload->d_data, workload->h_data, ELEMENT * sizeof(int),
                                    cudaMemcpyHostToDevice, workload->stream));
    incKernel<<<(ELEMENT + BLOCK - 1) / BLOCK, BLOCK, 0, workload->stream>>>(workload->d_data, ELEMENT);
    // cudaGetLastError is per host thread, so launch errors must be checked here.
    getLastCudaError("Kernel execution failed");
    checkCudaErrors(cudaMemcpyAsync(workload->h_data, workload->d_data, ELEMENT * sizeof(int),
                                    cudaMemcpyDeviceToHost, workload->stream));

    // Enqueue the host callback; it runs after the copies and kernel above have completed.
    checkCudaErrors(cudaStreamAddCallback(workload->stream, myStreamCallback, workload, 0));
    return 0;
}

int main(int argc, char **argv)
{
    printf(" Start. \n");

    // calloc: every workload starts with success == false and null pointers.
    heterogeneous_workload *workloads =
        (heterogeneous_workload *)calloc(N_WORKLOAD, sizeof(heterogeneous_workload));
    if (workloads == NULL)
    {
        fprintf(stderr, "Failed to allocate workload table\n");
        return EXIT_FAILURE;
    }

    // One arrival per workload; main waits until all of them have reported in.
    if (!cutInitBarrier(&thread_barrier, N_WORKLOAD))
    {
        fprintf(stderr, "Failed to create barrier event\n");
        free(workloads);
        return EXIT_FAILURE;
    }

    for (int i = 0; i < N_WORKLOAD; ++i)
    {
        workloads[i].id = i;
        workloads[i].cudaDeviceID = 0; // all workloads run on device 0
        workloads[i].streamStatus = cudaSuccess;
        if (!cutStartDetachedThread(launch, &workloads[i]))
        {
            // Keep the barrier count consistent so main does not wait forever.
            fprintf(stderr, "Failed to start launch thread for workload %d\n", i);
            cutIncrementBarrier(&thread_barrier);
        }
    }

    cutWaitForBarrier(&thread_barrier);
    printf(" %d workloads all finished. ", N_WORKLOAD);

    int success = 1;
    for (int i = 0; i < N_WORKLOAD; ++i) // check correctness
        success &= workloads[i].success;
    printf(" %s ", success ? "Correct." : "Failure.");

    free(workloads);
    cutDestroyBarrier(&thread_barrier);
    getchar();
    // Conventional exit status: 0 on success, non-zero on failure.
    return success ? EXIT_SUCCESS : EXIT_FAILURE;
}
● 输出结果
Start. Work 2 finished! Work 3 finished! Work 7 finished! Work 1 finished! Work 5 finished! Work 4 finished! Work 6 finished! Work 0 finished! 8 workloads all finished. Correct.
▶ 涨姿势
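● 回调函数的使用:cuda_runtime_api.h 中先给出了可作为回调函数的主机函数类型 cudaStreamCallback_t,然后给出了注册函数 cudaStreamAddCallback 的声明。调用 cudaStreamAddCallback 需要提供:流、回调函数指针、传给回调函数的用户参数 userData,以及一个标志 flags。根据 CUDA Runtime API 文档,flags 是保留参数,目前必须为 0。回调函数总是在该流中此前提交的所有工作完成之后才执行,并且会阻塞该流中之后提交的工作,直到回调返回;回调的 status 参数是 cudaSuccess 或流上已经发生的错误。注意回调函数内部不能调用任何 CUDA API,所以本例在回调中只启动一个新的 CPU 线程,由该线程(postprocess)去检查结果并释放资源。官方文档还提示 cudaStreamAddCallback 将来可能被弃用,如果不需要在设备出错时也执行回调,可改用 CUDA 10.0 引入的 cudaLaunchHostFunc。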
// Excerpt of the CUDA runtime declarations this example relies on
// (cudaStreamCallback_t and cudaStreamAddCallback are declared in cuda_runtime_api.h).

// Calling-convention macros: __stdcall on Windows, empty on other platforms.
#if defined(_WIN32)
#define CUDART_CB __stdcall
#define CUDARTAPI __stdcall
#else
#define CUDART_CB
#define CUDARTAPI
#endif

// Type every stream callback must have:
//   stream   - the stream the callback was added to
//   status   - cudaSuccess, or the error already present on the stream
//   userData - the pointer that was passed to cudaStreamAddCallback
typedef void (CUDART_CB *cudaStreamCallback_t)(cudaStream_t stream, cudaError_t status, void *userData);

// Enqueues callback(stream, status, userData) on stream. The callback runs on the
// host after all previously enqueued work in the stream has completed, and later
// work in the stream waits until it returns. The callback must not call CUDA APIs.
// flags is reserved for future use and must be 0.
extern __host__ cudaError_t CUDARTAPI cudaStreamAddCallback(cudaStream_t stream, cudaStreamCallback_t callback, void *userData, unsigned int flags);
● 线程创建中用到的 Windows 类型定义(HANDLE 与 DWORD)
// Win32 types used by the thread helpers, as declared in the Windows SDK headers.

// winnt.h
typedef void *HANDLE;        // HANDLE is simply an opaque void*
// minwindef.h
typedef unsigned long DWORD; // DWORD is unsigned long (32 bits on Windows)