zoukankan      html  css  js  c++  java
  • 利用信号量实现线程同步

    本篇使用信号量机制实现对全局资源的正确使用,包括以下两点:

    • 各个子线程对全局资源的互斥使用
    • 主线程对子线程的同步

    信号量

    简单的说,信号量内核对象,也是多线程同步的一种机制,它可以对资源访问进行计数,包括最大资源计数和当前资源计数,是两个32位的值;另外,计数是以原子访问的方式进行,由操作系统维护;

    • 最大资源计数,表示可以控件的最大资源数量
    • 当前资源计数,表示当前可用资源的数量

    信号量的规则:

    • 如果当前资源计数器大于0,那么信号量处于触发状态
    • 如果当前资源计数器等于0,那么信号量处于未触发状态
    • 系统绝对不会让当前资源计数器变为负数
    • 当前资源计数器决定不会大于最大资源计数

    信号量机制:

    以一个停车场的运作为例。假设停车场只有三个车位,一开始三个车位都是空的。这时如果同时来了五辆车,看门人允许其中三辆直接进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,打开车拦,放入外面的一辆进去,如果又离开两辆,则又可以放入两辆,如此往复。

    在这个停车场系统中,车位是公共资源,每辆车好比一个线程,看门人起的就是信号量的作用,当当前资源计数大于0,信号量处于触发状态,线程编程可调度;

    相关API

    • 创建信号量
    HANDLE WINAPI CreateSemaphore(
        LPSECURITY_ATTRIBUTES lpSemaphoreAttributes,//more设置为NULL
        LONG lInitialCount,   //当前可用资源的数量
        LONG lMaximumCount,   //最大资源数量
        LPCTSTR lpName        //信号量名字,可以设置为NULL
    );
    • 释放信号量
    BOOL WINAPI ReleaseSemaphore(
        HANDLE hSemaphore,      //信号量句柄
        LONG lReleaseCount,     //该函数返回时,当前可用资源的数增加lReleaseCount个
        LPLONG lpPreviousCount  
    );
    • 打开已经存在的信号量
    HANDLE WINAPI OpenSemaphore(
        DWORD dwDesiredAccess,
        BOOL bInheritHandle,
        LPCTSTR lpName //打开信号量的名字
    );
    • wait函数
    DWORD WINAPI WaitForSingleObject(
        HANDLE hHandle,      //当等待成功时,当前可用信号量递减
        DWORD dwMilliseconds //等待时间
    );
    

    线程同步举例

    线程同步包含两层含义:

    1. 对全局资源互斥访问
    2. 线程间的有序、协同执行(比如:线程A执行完后,再进行线程B的操作)

    例子1 互斥访问

    #include "stdafx.h"
    #include <iostream>
    #include <afxmt.h>
    using namespace std;
    
    int g_nIndex = 0;
    const int nMaxCnt = 20;
    
    #define MAX_SEM_COUNT 1
    #define THREADCOUNT 12
    
    HANDLE ghSemaphore;
    
    DWORD WINAPI ThreadProcBySEM( LPVOID );
    
    int _tmain(int argc, _TCHAR* argv[])
    {
        HANDLE aThread[THREADCOUNT];
        int i;
    
        // Create a semaphore with initial and max counts of MAX_SEM_COUNT
        // 备注:实现多个子线程资源互斥范围,最大信号量需要设置为1
        ghSemaphore = CreateSemaphore( 
            NULL,   // default security attributes
            1,      // initial count
            1,      // maximum count
            NULL);  // unnamed semaphore
        if (ghSemaphore == NULL) 
        {
            printf("CreateSemaphore error: %d
    ", GetLastError());
            return 0;
        }
    
        // Create worker threads
        for( i=0; i < THREADCOUNT; i++ )
        {
            aThread[i] = CreateThread( 
                NULL,       // default security attributes
                0,          // default stack size
                (LPTHREAD_START_ROUTINE) ThreadProcBySEM, 
                NULL,       // no thread function arguments
                0,          // default creation flags
                NULL); // receive thread identifier
    
            if( aThread[i] == NULL )
            {
                printf("CreateThread error: %d
    ", GetLastError());
                return 0;
            }
        }
    
        //Wait for all threads to terminate
        WaitForMultipleObjects(THREADCOUNT, aThread, TRUE, INFINITE);
    
        //Close thread and semaphore handles
        for( i=0; i < THREADCOUNT; i++ )
            CloseHandle(aThread[i]);
    
        CloseHandle(ghSemaphore);
    
        return 0;
    }
    
    
    DWORD WINAPI ThreadProcBySEM(LPVOID lpParam)
    {
        DWORD dwWaitResult;
        BOOL bContinue = TRUE;
        while(bContinue)
        {
            //Try to enter the semaphore gate.
            dwWaitResult = WaitForSingleObject(ghSemaphore,INFINITE);
            if (WAIT_OBJECT_0 == dwWaitResult)
            {
                if (g_nIndex++ < nMaxCnt)
                {
                    //Simulate thread spending time on task
                    Sleep(5);
                    printf("Thread %d: Index = %d
    ", GetCurrentThreadId(), g_nIndex);
                }
                else
                {
                    bContinue = FALSE;
                }
    
                // Relase the semaphore when task is finished
                if (!ReleaseSemaphore( 
                    ghSemaphore,  // handle to semaphore
                    1,            // increase count by one
                    NULL))       // not interested in previous count
                {
                    printf("ReleaseSemaphore error: %d
    ", GetLastError());
                }
            }
            else
            {
                break;
            }
        }
    
        return TRUE;
    }

    运行结果:

    这里写图片描述

    例子2 线程同步

    下面我们将线程创建时的顺序编号,也打印出来,实现方式是将变量i传递给线程,以实现打印;

    错误代码:

    我们将编号i的地址作为CreateThread入参,传递给线程函数ThreadProcBySEM;主要修改如下:

     // Create worker threads
     for( i=0; i < THREADCOUNT; i++ )
     {
         aThread[i] = CreateThread( 
             NULL,       // default security attributes
             0,          // default stack size
             (LPTHREAD_START_ROUTINE) ThreadProcBySEM, 
             &i,         //线程编号地址
             0,          // default creation flags
             NULL);      // receive thread identifier
    
         if( aThread[i] == NULL )
         {
             printf("CreateThread error: %d
    ", GetLastError());
             return 0;
         }
     }
     //线程函数代码:
    DWORD WINAPI ThreadProcBySEM(LPVOID lpParam)
    {
        DWORD dwWaitResult;
        BOOL bContinue = TRUE;
        if (NULL == lpParam)
        {
            return FALSE;
        }
        //获取编号,线程创建有时间开销,当函数执行到,赋值语句时
        //lpParam所指向的是变量i,但是主线程已经对i进行++,不再是创建时的值了
        int nThreadNum = *(int*)lpParam;
    
        if (WAIT_OBJECT_0 == dwWaitResult)
        {
            if (g_nIndex++ < nMaxCnt)
            {
                //Simulate thread spending time on task
                Sleep(5);
                //打印线程编号
                printf("NO %d = Thread %d: Index = %d
    ", nThreadNum, GetCurrentThreadId(), g_nIndex);
            }
        } 
    }

    运行结果:

    从下图中可以看出,不同的线程句柄,却有相同的线程编号,属于异常线程;

    这里写图片描述

    原因分析:

    线程创建到线程函数执行存在时间开销,线程函数不会第一时间开始执行,而此时主线程还处于非阻塞状态,主线程仍然可以对变量i不断进行++操作,导致当前线程进行访问时,其内容已经不再是当前那个值了;

    正确做法:

    1.引入关键代码段同步对象,用于各个子线间,访问全局资源g_nIndex,
    2.在创建新线程前,需要将当前线程的编号保存到,临时变量;
    3.将信号量对象用于主线程和子线程间的同步;

    全部代码如下:

    #include "stdafx.h"
    //#include <windows.h>
    #include <iostream>
    #include <afxmt.h>
    using namespace std;
    
    #define MAX_SEM_COUNT 1
    #define THREADCOUNT 12
    
    int g_nIndex = 0;
    const int nMaxCnt = 20;
    
    HANDLE ghSemaphore;
    CRITICAL_SECTION g_csLockA;
    
    DWORD WINAPI ThreadProcBySEM( LPVOID );
    
    int _tmain(int argc, _TCHAR* argv[])
    {
        HANDLE aThread[THREADCOUNT];
        int i;
    
        // Create a semaphore with initial and max counts of MAX_SEM_COUNT
        ghSemaphore = CreateSemaphore( 
            NULL,   // default security attributes
            0,      // initial count,modify:当前处于非触发状态
            1,      // maximum count
            NULL);  // unnamed semaphore
        if (ghSemaphore == NULL) 
        {
            printf("CreateSemaphore error: %d
    ", GetLastError());
            return 0;
        }
    
        //初始化关键代码段对象,用于访问全局资源的互斥
        InitializeCriticalSection(&g_csLockA);
    
        // Create worker threads
        for( i=0; i < THREADCOUNT; i++ )
        {
    
            aThread[i] = CreateThread( 
                NULL,       // default security attributes
                0,          // default stack size
                (LPTHREAD_START_ROUTINE) ThreadProcBySEM, 
                &i,       // no thread function arguments
                0,          // default creation flags
                NULL); // receive thread identifier
    
            if( aThread[i] == NULL )
            {
                printf("CreateThread error: %d
    ", GetLastError());
                return 0;
            }
            //modify 创建新线程前,需要将编号保存起来,才创建新线程
            //用于同步主线程和子线程
            WaitForSingleObject(ghSemaphore,INFINITE);
        }
    
        //Wait for all threads to terminate
        WaitForMultipleObjects(THREADCOUNT, aThread, TRUE, INFINITE);
    
        //Close thread and semaphore handles
        for( i=0; i < THREADCOUNT; i++ )
            CloseHandle(aThread[i]);
    
        CloseHandle(ghSemaphore);
    
        DeleteCriticalSection(&g_csLockA);
    
        return 0;
    }
    
    
    DWORD WINAPI ThreadProcBySEM(LPVOID lpParam)
    {
        DWORD dwWaitResult;
        BOOL bContinue = TRUE;
        if (NULL == lpParam)
        {
            return FALSE;
        }
        int nThreadNum = *(int*)lpParam;
    
        // Relase the semaphore 
        if (!ReleaseSemaphore( 
            ghSemaphore,  // handle to semaphore
            1,            // increase count by one
            NULL))       // not interested in previous count
        {
            printf("ReleaseSemaphore error: %d
    ", GetLastError());
        }
    
        while(bContinue)
        {
            Sleep(10);//modify:当前线程处于休眠,给其他线程执行机会
    
            //Try to enter the cs gate. //modify:保护资源的唯一性访问
            EnterCriticalSection(&g_csLockA);
            if (g_nIndex++ < nMaxCnt)
            {
                printf("NO %02d = Thread %d: Index = %d
    ", nThreadNum, GetCurrentThreadId(), g_nIndex);
            }
            else
            {
                bContinue = FALSE;
            }
    
            LeaveCriticalSection(&g_csLockA);
        }
    
        return TRUE;
    }

    运行结果:

    这里写图片描述

    从运行结果看出,线程句柄和编号可以一一对应,并且资源访问也正常;

  • 相关阅读:
    从Kratos设计看Go微服务工程实践
    京东到家安全测试实践
    浅谈 Protobuf 编码 原创 gsonli 腾讯技术工程 2021-07-14
    API Design Guide
    The power of two choices in randomized load balancing
    NGINX and the "Power of Two Choices" Load-Balancing Algorithm
    SRE 崩溃
    DDoS木马
    String.fromCharCode(88,83,83) 方法返回由指定的 UTF-16 代码单元序列创建的字符串
    汇编语言的AX,BX,CX,DX,分别表示什么
  • 原文地址:https://www.cnblogs.com/jinxiang1224/p/8468285.html
Copyright © 2011-2022 走看看