概述
- 等待条件变量总是返回锁住的互斥量。
- 条件变量的作用是发送信号,而不是互斥。
- 与条件变量相关的共享数据是“谓词”,如队列满或队列空条件。
- 一个条件变量应该与一个谓词相关。如果一个条件变量与多个谓词相关,或者多个条件变量与一个谓词相关,有可能死锁。
主线程(Main Thread)
|
|
Thread A
|
Thread B
|
创建和销毁条件变量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; int pthread_cond_init (pthread_cond_t *cond, pthread_condattr_t *condattr); int pthread_cond_destroy(pthread_cond_t *cond);
- 永远不要copy一个条件变量,因为使用条件变量的备份是不可知的。不过,可以传递条件变量的指针以使不同函数和线程可以使用它来同步。
- 为了获得最好的结果,应该将条件变量与相关的谓词“链接”在一起。
静态初始化条件变量:
1 #include<pthread.h> 2 #include "errors.h" 3 4 typedef struct my_struct_tag { 5 pthread_mutex_t mutex; 6 pthread_cond_t cond; 7 int value; 8 } my_struct_t; 9 10 my_struct_t data = { 11 PTHREAD_MUTEX_INITIALIZER, 12 PTHREAD_COND_INITIALIZER, 13 0 14 }; 15 16 int main(int argc, char* argv[]) 17 { 18 return 0; 19 }
动态初始化条件变量:
1 #include<pthread.h> 2 #include "errors.h" 3 4 typedef struct my_struct_tag { 5 pthread_mutex_t mutex; 6 pthread_cond_t cond; 7 int value; 8 } my_struct_t; 9 10 int main(int argc, char* argv[]) 11 { 12 my_struct_t *data; 13 int status; 14 15 data = malloc(sizeof(my_struct_t)); 16 if ( data ==NULL ) { 17 errno_abort("malloc"); 18 } 19 20 /* 21 * init 22 */ 23 status = pthread_mutex_init(&data->mutex, NULL); 24 if ( status != 0 ) { 25 err_abort(status, "mutex init"); 26 } 27 status = pthread_cond_init(&data->cond); 28 if ( status != 0 ) { 29 err_abort(status, "cond init"); 30 } 31 32 /* 33 * destroy 34 */ 35 status = pthread_cond_destroy(&data->mutex); 36 if ( status != 0 ) { 37 err_abort(status, "destroy cond"); 38 } 39 40 status = pthread_mutex_destroy(&data->mutex); 41 if ( status != 0 ) { 42 err_abort(status, "destroy mutex"); 43 } 44 free(data); 45 return status; 46 }
等待条件变量
- 在阻塞线程之前,条件变量等待操作将解锁互斥量;而在重新返回线程之前,将再次锁住互斥量。
- 所有并发等待同一个条件变量的线程必须指定同一个相关互斥量。
- 任何条件变量在特定时刻只能与一个互斥量相关联,而互斥量则可以同时与多个条件变量关联。
- 在锁住相关的互斥量之后和在等待条件变量之前,测试谓词是很重要的。
int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex); int pthread_cond_timedwait(pthread_cond_t* cond, pthread_mutex_t* mutex, struct timespec* expiration)
1 #include<time.h> 2 #include<pthread.h> 3 #include "errors.h" 4 5 typedef struct my_struct_tag { 6 pthread_cond_t cond; 7 pthread_mutex_t mutex; 8 int value; 9 } my_struct_t; 10 11 my_struct_t data = { 12 PTHREAD_COND_INITIALIZER, 13 PTHREAD_MUTEX_INITIALIZER, 14 0 15 }; 16 17 int hibernation = 1; 18 19 void* wait_thread(void* arg) 20 { 21 int status; 22 sleep(hibernation); 23 status = pthread_mutex_lock(&data.mutex); 24 if (status != 0) 25 err_abort(status, "Lock mutex"); 26 data.value = 1; 27 status = pthread_cond_signal(&data.cond); 28 if (status != 0) 29 err_abort(status, "Signal condition"); 30 status = pthread_mutex_unlock(&data.mutex);; 31 if (status != 0) 32 err_abort(status, "Unlock mutex"); 33 return NULL; 34 } 35 36 int main(int argc, char* argv[]) 37 { 38 int status; 39 pthread_t wait_thread_id; 40 struct timespec timeout; 41 42 if (argc > 1) 43 hibernation = atoi(argv[1]); 44 45 status = pthread_create(&wait_thread_id, NULL, wait_thread, NULL); 46 if (status != 0) 47 err_abort(status, "Create wait thread"); 48 49 timeout.tv_sec = time(NULL) + 2; 50 timeout.tv_nsec = 0; 51 52 status = pthread_mutex_lock(&data.mutex); 53 if (status != 0) 54 err_abort(status, "Lock mutex"); 55 56 while(data.value == 0) { 57 status = pthread_cond_timedwait(&data.cond, &data.mutex, &timeout); 58 if (status == ETIMEDOUT) { 59 printf("Condition wait timed out. "); 60 break; 61 } 62 else if (status != 0) 63 err_abort(status, "wait on condition"); 64 } 65 if (data.value != 0) 66 printf("Condition was signaled. "); 67 status = pthread_mutex_unlock(&data.mutex); 68 if (status != 0) 69 err_abort(status, "Unlock mutex"); 70 return 0; 71 }
唤醒条件变量等待线程
int pthread_cond_signal(pthread_cond_t* cond); int pthread_cond_broadcast(pthread_cond_t* cond);
- pthread_cond_wait()阻塞调用线程直到指定的条件受信(signaled)。该函数应该在互斥量锁定时调用,当在等待时会自动解锁互斥量。在信号被发送,线程被激活后,互斥量会自动被锁定,当线程结束时,由程序员负责解锁互斥量。
- pthread_cond_signal()函数用于向其他等待在条件变量上的线程发送信号(激活其它线程)。应该在互斥量被锁定后调用。
- 若不止一个线程阻塞在条件变量上,则应用pthread_cond_broadcast()向其它所以线程发生信号。
- 在调用pthread_cond_wait()前调用pthread_cond_signal()会发生逻辑错误。
- 使用这些函数时适当的锁定和解锁相关的互斥量是非常重要的。如:
- 调用pthread_cond_wait()前锁定互斥量失败可能导致线程不会阻塞。
- 调用pthread_cond_signal()后解锁互斥量失败可能会不允许相应的pthread_cond_wait()函数结束(保存阻塞)。
Alarm最终版本
1 #include<pthread.h> 2 #include<time.h> 3 #include"errors.h" 4 5 typedef struct alarm_tag { 6 struct alarm_tag *link; 7 int seconds; 8 time_t time; 9 char message[64]; 10 } alarm_t; 11 12 pthread_mutex_t alarm_mutex = PTHREAD_MUTEX_INITIALIZER; 13 pthread_cond_t alarm_cond = PTHREAD_COND_INITIALIZER; 14 alarm_t* alarm_list = NULL; 15 time_t current_alarm = 0; 16 17 /* 18 * insert alarm entry on list, in order. 19 */ 20 void alarm_insert(alarm_t* alarm) 21 { 22 int status; 23 alarm_t **last, *next; 24 25 /* 26 * this routine requires that the caller have locker the alarm_mutex. 27 */ 28 last = &alarm_list; 29 next = *last; 30 next = *last; 31 while (next != NULL) { 32 if (next->time >= alarm->time) { 33 alarm->link = next; 34 *last = alarm; 35 break; 36 } 37 last = &next->link; 38 next = next->link; 39 } 40 if (next == NULL) { 41 *last = alarm; 42 alarm->link = NULL; 43 } 44 #ifdef DEBUG 45 printf("[list: "); 46 for (next = alarm_list; next != NULL; next = next->link) 47 printf("%d(%d)["%s"]", next->time, next->time - time(NULL), next->message); 48 printf("] "); 49 #endif 50 /* 51 * wake the alarm thread if it is not busy( 52 * that is, if current alarm is 0, signifying that it's waiting for work), 53 * or if the new alarm comes before the one on which the alarm thread is waiting. 54 */ 55 if (current_alarm == 0 || alarm->time < current_alarm) { 56 current_alarm = alarm->time; 57 status = pthread_cond_signal(&alarm_cond); 58 if (status != 0) 59 err_abort(status, "Signal cond"); 60 } 61 } 62 63 void *alarm_thread(void* arg) 64 { 65 alarm_t* alarm; 66 struct timespec cond_time; 67 time_t now; 68 int status, expired; 69 70 /* 71 * loop forever, processing commands. 72 * the alarm thread will be disintegrated when the process exits. 73 * lock the mutex at the start -- it while be unlocked during condi 74 * -tion waits, so the main thread can insert alarms. 75 */ 76 status = pthread_mutex_lock(&alarm_mutex); 77 if (status != 0) 78 err_abort(status, "Lock mutex"); 79 while (1) { 80 /* 81 * if the alarm list is enpty, wait until an alarm is added. 82 * setting current_alarm to 0 informs the insert routine that 83 * the trhead is not busy. 84 */ 85 current_alarm = 0; 86 while (alarm_list == NULL) { 87 status = pthread_cond_wait(&alarm_cond, &alarm_mutex); 88 if (status != 0) 89 err_abort(status, "wait on cond"); 90 } 91 alarm = alarm_list; 92 alarm_list = alarm->link; 93 now = time(NULL); 94 expired = 0; 95 if (alarm->time > now) { 96 #ifdef DEBUG 97 printf("[waiting: %d(%d)"%s"] ",alarm->time, 98 alarm->time - time(NULL), alarm->message); 99 #endif 100 cond_time.tv_sec = alarm->time; 101 cond_time.tv_nsec = 0; 102 current_alarm = alarm->time; 103 while (current_alarm == alarm->time) { 104 status = pthread_cond_timedwait( 105 &alarm_cond, &alarm_mutex, &cond_time); 106 if (status == ETIMEDOUT) { 107 expired = 1; 108 break; 109 } 110 if (status != 0) 111 err_abort(status, "cond timedwait"); 112 } 113 if (!expired) 114 alarm_insert(alarm); 115 } 116 else 117 expired = 1; 118 if (expired) { 119 printf("(%d) %s ", alarm->seconds, alarm->message); 120 free(alarm); 121 } 122 } 123 124 } 125 126 int main(int argc, char* argv[]) 127 { 128 int status; 129 char line[128]; 130 alarm_t *alarm; 131 pthread_t thread; 132 133 status = pthread_create( 134 &thread, NULL, alarm_thread, NULL); 135 if (status != 0) 136 err_abort(status, "create alarm thread"); 137 while (1) { 138 printf("Alarm> "); 139 if (fgets(line, sizeof(line), stdin) == NULL) exit(0); 140 if (strlen(line) <= 1) continue; 141 alarm = (alarm_t*)malloc(sizeof(alarm_t)); 142 if (alarm == NULL) 143 errno_abort("Allocate alarm"); 144 145 if (sscanf(line, "%d %64[^ ]", &alarm->seconds, alarm->message) < 2) { 146 fprintf(stderr, "Bad command "); 147 free(alarm); 148 } else { 149 status = pthread_mutex_lock(&alarm_mutex); 150 if (status != 0) 151 err_abort(status, "Lock mutex"); 152 alarm->time = time(NULL) + alarm->seconds; 153 154 alarm_insert(alarm); 155 status = pthread_mutex_unlock(&alarm_mutex); 156 if (status != 0) 157 err_abort(status, "Unlock mutex"); 158 } 159 } 160 }