互斥量
互斥量(Mutex)是“mutual exclusion”的缩写。互斥量是实现线程同步,和保护同时写共享数据的主要方法。
使用互斥量的典型顺序如下:
1. 创建和初始一个互斥量
2. 多个线程尝试去锁定该互斥量
3. 仅有一个线程可以成功锁定改互斥量
4. 锁定成功的线程做一些处理
5. 线程解锁该互斥量
6. 另外一个线程获得互斥量,重复上述过程
7. 最后销毁互斥量
创建和销毁互斥量
pthread_mutex_t_numtex = PTHREAD_MUTEX_INITIALIZER; int pthread_mutex_init(pthread_mutex_t *mutex, pthread_mutexattr_t *attr); int pthread_mutex_destory(pthread_mutex_t *mutex);
互斥量必须用类型pthread_mutex_t类型声明,在使用前必须初始化,这里有两种方法可以初始化互斥量:
- 声明时静态地,如:pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER;
- 动态地用pthread_mutex_init()函数,这种方法允许设定互斥量的属性对象attr。
互斥量初始化后是解锁的。
attr对象用于设置互斥量对象的属性,使用时必须声明为pthread_mutextattr_t类型,默认值可以是NULL。
Pthreads标准定义了三种可选的互斥量属性:
- 协议(Protocol): 指定了协议用于阻止互斥量的优先级改变
- 优先级上限(Prioceiling):指定互斥量的优先级上限
- 进程共享(Process-shared):指定进程共享互斥量
注意所有实现都提供了这三个可先的互斥量属性。
pthread_mutexattr_init()和pthread_mutexattr_destroy()函数分别用于创建和销毁互斥量属性对象。
pthread_mutex_destroy()应该用于释放不需要再使用的互斥量对象。
将互斥量和它要保护的数据明显的关联起来是个不错的选择。如下例:
1 #include<pthread.h> 2 #include "errors.h" 3 4 typedef struct my_struct_tag { 5 pthread_mutex_t mutex; 6 int value; 7 } my_struct_t; 8 9 int main(int argc, char* argv[]) 10 { 11 my_struct_t *data; 12 int status; 13 14 data = malloc(sizeof(my_struct_t)); 15 if( data == NULL ) 16 errno_abort("malloc"); 17 status = pthread_mutex_init(&data->mutex, NULL); 18 if( status != 0 ) 19 err_abort(status, "init mutex"); 20 status = pthread_mutex_destroy(&data->mutex); 21 if( status != 0 ) 22 err_abort(status, "destroy mutex"); 23 (void)free(data); 24 return status; 25 26 }
加锁和解锁互斥量
int pthread_mutex_lock(pthread_mutex_t *mutex); int phtread_mutex_trylock(pthread_mutex_t *mutex); //非阻塞加锁 int pthrad_mutex_unlock(pthread_mutex_t *mutex);
线程用pthread_mutex_lock()函数去锁定指定的mutex变量,若该mutex已经被另外一个线程锁定了,该调用将会阻塞线程直到mutex被解锁。
pthread_mutex_trylock()尝试着去锁定一个互斥量,然而,若互斥量已被锁定,程序会立刻返回并返回一个忙错误(EBUSY)值。该函数在优先级改变情况下阻止死锁是非常有用的。
线程可以用pthread_mutex_unlock()解锁自己占用的互斥量。在一个线程完成对保护数据的使用,而其它线程要获得互斥量在保护数据上工作时,可以调用该函数。
若有一下情形则会发生错误:
- 互斥量已经被解锁
- 互斥量被另一个线程占用
1 #include<pthread.h> 2 #include<time.h> 3 #include "errors.h" 4 5 typedef struct alarm_tag { 6 struct alarm_tag *link; 7 int seconds; 8 time_t time; 9 char message[64]; 10 } alarm_t; 11 12 pthread_mutex_t alarm_mutex = PTHREAD_MUTEX_INITIALIZER; 13 alarm_t *alarm_list = NULL; 14 15 void *alarm_thread(void *arg) 16 { 17 alarm_t *alarm; 18 int sleep_time; 19 time_t now; 20 int status; 21 22 while(1) 23 { 24 status = pthread_mutex_lock(&alarm_mutex); 25 if( status != 0 ) 26 err_abort(status, "Lock mutex"); 27 alarm = alarm_list; 28 if( alarm == NULL ) 29 sleep_time = 1; 30 else{ 31 alarm_list = alarm->link; 32 now = time(NULL); 33 if( alarm->time <= now) 34 sleep_time = 0; 35 else 36 sleep_time = alarm->time - now; 37 #ifdef DEBUG 38 printf("[waiting: %d(%d)"%s"] ", alarm->time, 39 sleep_time, alarm->message); 40 #endif 41 } 42 status = pthread_mutex_unlock(&alarm_mutex); 43 if(status != 0 ) 44 err_abort(status, "Unlock mutex"); 45 if( sleep_time > 0 ) 46 sleep(sleep_time); 47 else 48 sched_yield(); 49 if( alarm != NULL) 50 { 51 printf("(%d) %s ", alarm->seconds, alarm->message); 52 free(alarm); 53 } 54 55 56 } 57 } 58 59 int main(int argc, char *argv[]) 60 { 61 int status; 62 char line[128]; 63 alarm_t *alarm, **current, *next; 64 pthread_t thread; 65 66 status = pthread_create(&thread, NULL, alarm_thread, NULL); 67 if(status != 0) 68 err_abort(status, "Create alarm thread"); 69 while(1) 70 { 71 printf("Alarm> "); 72 if(fgets(line, sizeof(line), stdin) == NULL ) exit(0); 73 if(strlen(line) <= 1) continue; 74 alarm = (alarm_t*)malloc(sizeof(alarm_t)); 75 if(alarm == NULL) 76 errno_abort("malloc alarm"); 77 78 if(sscanf(line, "%d %64[^ ]", &alarm->seconds, alarm->message) < 2) 79 { 80 fprintf(stderr, "Bad command "); 81 free(alarm); 82 } 83 else 84 { 85 status = pthread_mutex_lock(&alarm_mutex); 86 if(status != 0) 87 err_abort(status, "mutex lock"); 88 alarm->time = time(NULL) + alarm->seconds; 89 90 current = &alarm_list; 91 next = *current; 92 while(next != NULL) 93 { 94 if(next->time >= alarm->time) 95 { 96 alarm->link = next; 97 *current = alarm; 98 break; 99 } 100 current = &next->link; 101 next = next->link; 102 } 103 if(next == NULL) 104 { 105 *current = alarm; 106 alarm->link = NULL; 107 } 108 #ifdef DEBUG 109 printf("[list:"); 110 for(next = alarm_list;next != NULL; next = next->link) 111 printf("%d(%d)["%s"] ", next->time, 112 next->time - time(NULL), next->message); 113 printf("] "); 114 #endif 115 status = pthread_mutex_unlock(&alarm_mutex); 116 if(status != 0) 117 err_abort(status, "Unlock mutex"); 118 } 119 } 120 }
在试锁和回退算法中,总是应该以相反的顺序解锁互斥量:
- 尝试加锁互斥量1;如果成功,再加锁互斥量2;如果成功,再加锁互斥量3。如果某一个互斥量加锁失败,则全部回退。
- 解锁互斥量3/2/1
按照相反顺序解锁,如果第二个线程需要加锁这三个互斥量,则会由于加锁互斥量1失败而回退;而如果先解锁1-2-3这样的顺序,可能会到加锁互斥量3时候才失败,回退代价更大。
1 #include<pthread.h> 2 #include "errors.h" 3 4 #define ITERATIONS 10 5 6 pthread_mutex_t mutex[3] = { 7 PTHREAD_MUTEX_INITIALIZER, 8 PTHREAD_MUTEX_INITIALIZER, 9 PTHREAD_MUTEX_INITIALIZER 10 }; 11 12 int backoff = 1; 13 int yield_flag = 0; 14 15 void * lock_forward(void* arg) 16 { 17 int i, iterate, backoffs, status; 18 for( iterate = 0; iterate < ITERATIONS; iterate++ ){ 19 backoffs = 0; 20 for( i=0; i< 3; i++ ){ 21 if( i == 0 ){ 22 status = pthread_mutex_lock(&mutex[i]); 23 if(status != 0) 24 err_abort(status, "First lock"); 25 }else{ 26 if(backoff) 27 status = pthread_mutex_trylock(&mutex[i]); 28 else 29 status = pthread_mutex_lock(&mutex[i]); 30 if( status == EBUSY) { 31 backoffs++; 32 DPRINTF(( 33 " [forward locker" 34 "backing off at %d] ", 35 i)); 36 for(; i>=0 ;i--) { 37 status = pthread_mutex_unlock(&mutex[i]); 38 if(status != 0) 39 err_abort(status, "Backoff"); 40 } 41 }else { 42 if( status != 0) 43 err_abort(status, "Lock mutex"); 44 DPRINTF((" forward locker got %d ", i)); 45 } 46 } 47 if(yield_flag){ 48 if(yield_flag > 0) 49 sched_yield(); 50 else 51 sleep(1); 52 } 53 } 54 printf("lock forward got all locks, %d backoffs ", backoffs); 55 pthread_mutex_unlock(&mutex[2]); 56 pthread_mutex_unlock(&mutex[1]); 57 pthread_mutex_unlock(&mutex[0]); 58 sched_yield(); 59 } 60 return NULL; 61 } 62 63 void *lock_backward(void *arg) 64 { 65 int i, iterate, backoffs; 66 int status; 67 68 for ( iterate = 0; iterate < ITERATIONS; iterate++ ) { 69 backoffs = 0; 70 for ( i = 2; i >= 0; i-- ) { 71 if (i == 2 ) { 72 status = pthread_mutex_lock (&mutex[i]); 73 if (status != 0) 74 err_abort(status, "First lock"); 75 } else { 76 if (backoff) 77 status = pthread_mutex_trylock(&mutex[i]); 78 else 79 status = pthread_mutex_lock(&mutex[i]); 80 if (status == EBUSY ) { 81 backoffs++; 82 DPRINTF(("[backward locker backing off at %d] ",i)); 83 for (; i < 3; i++) { 84 status = pthread_mutex_unlock(&mutex[i]); 85 if (status != 0) 86 err_abort(status, "Backoff"); 87 } 88 } else { 89 if (status != 0) 90 err_abort(status, "Lock mutex"); 91 DPRINTF(("backward locker got %d ", i)); 92 } 93 } 94 if (yield_flag) { 95 if (yield_flag > 0) 96 sched_yield(); 97 else 98 sleep(1); 99 } 100 } 101 printf("Lock backward got all locks, %d backoffs ", backoffs); 102 pthread_mutex_unlock(&mutex[0]); 103 pthread_mutex_unlock(&mutex[1]); 104 pthread_mutex_unlock(&mutex[2]); 105 sched_yield(); 106 } 107 return NULL; 108 } 109 110 int main(int argc, char* argv[]) 111 { 112 pthread_t forward, backward; 113 int status; 114 115 if (argc > 1) 116 backoff = atoi(argv[1]); 117 if (argc > 2) 118 yield_flag = atoi(argv[2]); 119 status = pthread_create(&forward, NULL, lock_forward, NULL); 120 if (status != 0) 121 err_abort(status, "Create forward"); 122 status = pthread_create(&backward, NULL, lock_backward, NULL); 123 if (status != 0) 124 err_abort(status, "Create backward"); 125 pthread_exit(NULL); 126 127 }