zoukankan      html  css  js  c++  java
  • 转载~kxcfzyk:Linux C语言多线程库Pthread中条件变量的的正确用法逐步详解

            Linux C语言多线程库Pthread中条件变量的的正确用法逐步详解                         

     

    (本文的读者定位是了解Pthread常用多线程API和Pthread互斥锁,但是对条件变量完全不知道或者不完全了解的人群。如果您对这些都没什么概念,可能需要先了解一些基础知识)

    关于条件变量典型的实际应用,可以参考非常精简的Linux线程池实现(一)——使用互斥锁和条件变量,但如果对条件变量不熟悉最好先看完本文。

    Pthread库的条件变量机制的主要API有三个:

     

    • int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
    • int pthread_cond_broadcast(pthread_cond_t *cond);
    • int pthread_cond_signal(pthread_cond_t *cond);

    注:还有一个没说的API是pthread_cond_timewait,它跟pthread_cond_wait(作用见下面)的唯一不同就是可以指定一个等待的超时时间,这里不对它作额外讨论。

    它们和其它几个Pthread API一起用于处理一种特定情形的线程同步问题:

    1. 若干个线程在某个条件没满足时不能继续往下面走,于是纷纷调用pthread_cond_wait使自己在这个条件上陷入等待(休眠);
    2. 当条件满足以后,另外有个活跃着的线程调用pthread_cond_broadcast通知(唤醒)刚才那些等待在这个条件上的所有线程,让它们继续往下运行。

    这种情形是非常通用、非常基础的,很多更加具体的线程同步问题都是这种情形的扩展,比如说经典的消费者/生产者问题,读者/写者问题等等。明显“条件”是这种情形的核心,所以Pthread的这套线程同步机制叫做“条件变量”。可以看出条件变量机制跟Java的wait/notify机制非常类似。

    上面这种情形也可以用POSIX定义的另外一套线程/进程同步机制来实现——信号量(semaphore),而且信号量机制在实际场景中用起来比条件变量机制还简单一些,但是信号量的性能不如Pthread库中的条件变量。

    条件变量通过pthread_cond_t数据类型来声明,而且使用之前必须先要初始化:

    1. pthread_cond_t cond = PTHREAD_COND_INITIALIZER;  
    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

    上面这种是通过预定义的初始化宏来静态初始化,也可以用函数动态初始化:

    1. pthread_cond_t cond;  
    2. pthread_cond_init(&cond, NULL);  
    pthread_cond_t cond;
    pthread_cond_init(&cond, NULL);

    注意条件变量应该声明为全局可见的,因为条件变量会在多个线程(的函数)中被访问。条件变量最后不再使用了的时候应该销毁:

    1. pthread_cond_destroy(&cond);  
    pthread_cond_destroy(&cond);

    在初始化后到销毁前这段时间内就是条件变量的正常生命周期了,可以按需要对它调用pthread_cond_wait、pthread_cond_signal和pthread_cond_broadcast。

    pthread_cond_signal的作用跟pthread_cond_broadcast相似,但不同的是pthread_cond_signal会通知所有等待线程中的至少一个,让它(们)继续往下运行,而所有其它没被通知的等待线程则继续等待(休眠)。之所以pthread_cond_signal并不是严格地只唤醒一个等待线程,是因为在多处理器或多核系统中,可能无法实现只唤醒一个等待线程,就算能强行做到只唤醒一个等待线程,也会带来很大的性能损失,这对一个通用的基础线程同步API来说并不合适。

    但实际应用场景中我们通常希望每调用一次pthread_cond_signal就唤醒一个等待线程,比如说下面这种情况:

    某个线程专门负责从网络接收数据包,其它若干线程专门负责处理数据包。当没有任何数据包时,处理线程全部调用pthread_cond_wait陷入等待。当一个数据包到达时,接收线程调用phtread_cond_signal唤醒一个处理线程,处理线程拿走这个数据包去处理。当又一个数据包到达时,接收线程再次调用pthread_cond_signal唤醒一个线程……

    这个问题对信号量机制来说很容易,因为信号量中的sem_post函数只会唤醒一个等待的进程或线程。虽然pthread_cond_signal本身不保证只唤醒一个等待线程,但是POSIX标准在定义这套API时考虑过了这个问题,它留了一个“后门”,让我们在应用程序中可以通过额外的代码来解决这个问题。

    先不考虑那个所谓的“后门”,一个粗略看上去可行的解决办法是,除了条件变量以外,再额外设置一个全局的普通计数变量表示允许唤醒多少个等待线程:

    1. int global_count=0;  
    int global_count=0;
    那么当通知线程需要调用pthread_cond_signal唤醒别的等待线程之前,应该先增加全局变量的计数,表示允许唤醒的线程数目又增加了一个:
    1. global_count++;  
    2. pthread_cond_signal(&cond);  
    global_count++;
    pthread_cond_signal(&cond);

    pthread_cond_signal一调用,那些调用pthread_cond_wait等待在cond的线程可能会有好几个都唤醒了,索性假设全部都被唤醒了。但其实我们只想让其中一个继续往下走,其它的不应该往下走,那么其它那些等待线程就都应该再次调用pthread_cond_wait继续等待(这里明显该有个循环)。

    下面的问题就是,怎么决定哪一个等待线程继续走呢?可以这样,当大家都被唤醒的时候,大家都判断一下global_count是不是大于0,也就是当前允不允许唤醒线程。如果某个等待线程检测到的global_count是大于0的,就赶紧把global_count减掉一个,然后自己往下走。这时候global_count少了一个,可能就是0了,表示不允许再唤醒线程,其它几个等待线程发现这一状况以后就不往下走,再次调用pthread_cond_wait继续等待:

    1. while(global_count<=0) {  
    2.     pthread_cond_wait(&cond, ...);  
    3. }  
    4. global_count--;  
    while(global_count<=0) {
    	pthread_cond_wait(&cond, ...);
    }
    global_count--;
    到现在为止问题基本解决了,但是引出了一个新的问题:多个线程同时访问global_count变量会造成竞态条件。问题看上去很容易解决,使用互斥锁保护就好了。

    对于通知线程线程:

    1. pthread_mutex_lock(&mutex);  
    2. global_count++;  
    3. pthread_cond_signal(&cond);  
    4. pthread_mutex_unlock(&mutex);  
    pthread_mutex_lock(&mutex);
    global_count++;
    pthread_cond_signal(&cond);
    pthread_mutex_unlock(&mutex);
    对于等待线程:
    1. pthread_mutex_lock(&mutex);  
    2. while(global_count<=0) {  
    3.     pthread_cond_wait(&cond, ...);  
    4. }  
    5. global_count--;  
    6. pthread_mutex_unlock(&mutex);  
    pthread_mutex_lock(&mutex);
    while(global_count<=0) {
    	pthread_cond_wait(&cond, ...);
    }
    global_count--;
    pthread_mutex_unlock(&mutex);
    新的问题又出来了:等待线程调用pthread_cond_wait陷入等待时,还占有着mutex互斥锁,下次通知线程想要唤醒线程时就无法获取mutex互斥锁了,于是出现了死锁。所以在调用pthread_cond_wait将当前线程陷入等待之前,我们应该解开mutex互斥锁,当线程被唤醒,从pthread_cond_wait函数返回时,我们应该重新获取mutex互斥锁。

    比如像这样:

    1. pthread_mutex_lock(&mutex);  
    2. while(global_count<=0) {  
    3.     pthread_mutex_unlock(&mutex);  
    4.     pthread_cond_wait(&cond, ...);  
    5.     pthread_mutex_lock(&mutex);  
    6. }  
    7. global_count--;  
    8. pthread_mutex_unlock(&mutex);  
    pthread_mutex_lock(&mutex);
    while(global_count<=0) {
    	pthread_mutex_unlock(&mutex);
    	pthread_cond_wait(&cond, ...);
    	pthread_mutex_lock(&mutex);
    }
    global_count--;
    pthread_mutex_unlock(&mutex);
    这段代码还是有问题的,在pthread_cond_wait函数调用的前后当前线程都有一段看上去“很短”的不拥有mutex互斥锁的真空期,但是对于CPU来说这段真空期并不算太短。

    假设某个等待线程检测到global_count==0,于是解开mutex互斥锁,进入真空期,即将调用pthread_cond_wait。就在这时候,通知线程增加了一下global_count的计数值然后调用了pthread_cond_signal。接下来,刚才那个等待线程调用pthread_cond_wait陷入等待,由于pthread_cond_wait的调用发生在pthread_cond_signal之后,所以pthread_cond_wait并不会返回。如果程序里的等待线程就这一个,这个通知就丢失了。 问题到了这里似乎没路可走了,但是别忘了还有个“后门”没用上,那就是前面一直没提的pthread_cond_wait的第二个参数了。看看本文最开始列出的函数声明,第二个参数赫然是mutex!这下猜也能猜到这第二个参数是干嘛的了,明显就是专门帮我们解开mutex锁啊,然后在pthread_cond_wait返回之前再自动获取mutex锁。这里顺道澄清一下,和条件变量关联的mutex,不是像网上部分人说的那样是用来保护条件变量的,条件变量在实现的时候是能够做到线程安全的,因为它内部还有一个自己的互斥锁。

    所以正确的做法是:

    1. pthread_mutex_lock(&mutex);  
    2. while(global_count<=0) {  
    3.     pthread_cond_wait(&cond, &mutex);  
    4. }  
    5. global_count--;  
    6. pthread_mutex_unlock(&mutex);  
    pthread_mutex_lock(&mutex);
    while(global_count<=0) {
    	pthread_cond_wait(&cond, &mutex);
    }
    global_count--;
    pthread_mutex_unlock(&mutex);

    到这里问题是不是完全解决了?很遗憾还差一点。pthread_cond_wait是线程撤销点(cancellation points)之一,这意味着当某个线程因为调用pthread_cond_wait而陷入休眠等待时,别的线程可以通过这个线程的ID调用pthread_cancel让这个线程强制从pthread_cond_wait返回并开始执行一些清理工作,最后结束退出。

    问题就出在pthread_cond_wait返回上,上面标红的地方已经强调了,pthread_cond_wait返回之前会先自动获取mutex,也就是说返回以后已经占有了mutex互斥锁。这种情况下线程直接退出会导致互斥锁一直被占用,其它线程就无法获取这个互斥锁了,再次出现死锁。 这个问题有两种解决办法,第一个办法是在线程退出前的清理工作中加入解开互斥锁的代码,这个并不难办到,因为POSIX定义了两个API:

    1. void pthread_cleanup_pop(int execute);  
    2. void pthread_cleanup_push(void (*routine)(void*), void *arg);  
    void pthread_cleanup_pop(int execute);
    void pthread_cleanup_push(void (*routine)(void*), void *arg);

    pthread_cleanup_push用于向一个特殊栈压入一个函数指针,当线程退出时,这个特殊栈中的所有函数会被一个个从栈顶弹出并执行(return退出的情况除外)。phtread_cleanup_pop用于从这个特殊栈的栈顶手动弹出函数指针,execute参数非0时,弹出的函数会被自动执行。

    需要注意的是,POSIX标准允许这两个API被实现为带未闭合花括号的宏,所以这两个API一定(最好)要配套使用:它们必须一前一后(push在前),而且在同一个函数的同一个嵌套层次内。

    比如说这两个API的实现有可能会是类似于这样:

    1. #define pthread_cleanup_pop(execute)        XXXX { XXXX  
    2. #define pthread_cleanup_push(routine, arg)  XXXX } XXXX  
    #define pthread_cleanup_pop(execute)		XXXX { XXXX
    #define pthread_cleanup_push(routine, arg)	XXXX } XXXX
    所以这就是为什么它们的调用要求如此奇怪了。 有了这两个API,想要解决刚才的问题,首先要定义一个清理回调函数:
    1. void mutex_clean(void *mutex) {  
    2.     pthread_mutex_unlock((pthread_mutex_t*)mutex);  
    3. }  
    void mutex_clean(void *mutex) {
    	pthread_mutex_unlock((pthread_mutex_t*)mutex);
    }
    然后在等待线程里调用那两个API:
    1. pthread_mutex_lock(&mutex);  
    2. pthread_cleanup_push(mutex_clean, &mutex);  
    3. while(global_count<=0) {  
    4.     pthread_cond_wait(&cond, &mutex);  
    5. }  
    6. global_count--;  
    7. pthread_cleanup_pop(0);  
    8. pthread_mutex_unlock(&mutex);  
    pthread_mutex_lock(&mutex);
    pthread_cleanup_push(mutex_clean, &mutex);
    while(global_count<=0) {
    	pthread_cond_wait(&cond, &mutex);
    }
    global_count--;
    pthread_cleanup_pop(0);
    pthread_mutex_unlock(&mutex);
    或者一个稍微简洁一些的写法:
    1. pthread_mutex_lock(&mutex);  
    2. pthread_cleanup_push(mutex_clean, &mutex);  
    3. while(global_count<=0) {  
    4.     pthread_cond_wait(&cond, &mutex);  
    5. }  
    6. global_count--;  
    7. pthread_cleanup_pop(1);  
    pthread_mutex_lock(&mutex);
    pthread_cleanup_push(mutex_clean, &mutex);
    while(global_count<=0) {
    	pthread_cond_wait(&cond, &mutex);
    }
    global_count--;
    pthread_cleanup_pop(1);
    另外一种解决方案比这个麻烦一些,那就是设置mutex互斥锁的robust属性值为PTHREAD_MUTEX_ROBUST。

    对于robust互斥锁,当持有它的线程没解锁就退出以后,别的线程再去调用pthread_mutex_lock,函数会回一个EOWNERDEAD错误,线程检测到这这个错误后可以调用pthread_mutex_consistent使robust互斥锁恢复一致性,紧接着就可以调用phtread_mutex_unlock解锁了(尽管这个锁并不是当前这个线程加持的)。解锁完毕就可以重新调用pthread_mutex_lock了。 采用这种用方案的时候,首先要声明一个全局可见的mutex属性变量:

    1. pthread_mutexattr_t mutexattr;  
    pthread_mutexattr_t mutexattr;
    然后初始化并设置属性值:
    1. pthread_mutexattr_init(&mutexattr);  
    2. pthread_mutexattr_setrobust(&mutexaddtr, PTHREAD_MUTEX_ROBUST);  
    pthread_mutexattr_init(&mutexattr);
    pthread_mutexattr_setrobust(&mutexaddtr, PTHREAD_MUTEX_ROBUST);
    有了mutex属性,接下来就是在初始化mutex的地方作修改了,通常我们对mutex的初始化都是pthread_mutex_init(&mutex, NULL),现在改成:
    1. pthread_mutex_init(&mutex, &mutexattr);  
    pthread_mutex_init(&mutex, &mutexattr);
    现在准备工作已经完毕,开始干正事了。对于通知线程:
    1. while(EOWNERDEAD==pthread_mutex_lock(&mutex)) {  
    2.     pthread_mutex_consistent(&mutex);  
    3.     pthread_mutex_unlock(&mutex);  
    4. }  
    5. global_count++;  
    6. pthread_cond_signal(&cond);  
    7. pthread_mutex_unlock(&mutex);  
    while(EOWNERDEAD==pthread_mutex_lock(&mutex)) {
    	pthread_mutex_consistent(&mutex);
    	pthread_mutex_unlock(&mutex);
    }
    global_count++;
    pthread_cond_signal(&cond);
    pthread_mutex_unlock(&mutex);
    对于等待线程:
    1. while(EOWNERDEAD==pthread_mutex_lock(&mutex)) {  
    2.     pthread_mutex_consistent(&mutex);  
    3.     pthread_mutex_unlock(&mutex);  
    4. }  
    5. while(global_count<=0) {  
    6.     pthread_cond_wait(&cond, &mutex);  
    7. }  
    8. global_count--;  
    9. pthread_mutex_unlock(&mutex);  
    while(EOWNERDEAD==pthread_mutex_lock(&mutex)) {
    	pthread_mutex_consistent(&mutex);
    	pthread_mutex_unlock(&mutex);
    }
    while(global_count<=0) {
    	pthread_cond_wait(&cond, &mutex);
    }
    global_count--;
    pthread_mutex_unlock(&mutex);
    到这里,所有的事情终于完成了!
  • 相关阅读:
    储存过程与触发器
    session 和 cookie
    (四)Jira工作流状态的属性
    (三)Jira scriptrunner插件
    (二)JIRA安装
    vs2015 调试无法访问网站的问题
    设计模式
    依赖倒置、反射、泛型、委托、AOP
    C# .NET
    持续集成
  • 原文地址:https://www.cnblogs.com/sanghai/p/4861416.html
Copyright © 2011-2022 走看看