一个简单的Linux多线程例子 带你洞悉互斥量 信号量 条件变量编程
希望此文能给初学多线程编程的朋友带来帮助,也希望牛人多多指出错误。
另外感谢以下链接的作者给予,给我的学习带来了很大帮助
http://blog.csdn.net/locape/article/details/6040383
http://www.cnblogs.com/liuweijian/archive/2009/12/30/1635888.html
一、什么是多线程?
当我自己提出这个问题的时候,我还是很老实的拿着操作系统的书,按着上面的话敲下“为了减少进程切换和创建开销,提高执行效率和节省资源,我们引入了线程的概念,与进程相比较,线程是CPU调度的一个基本单位。”
形象点的举个例子说:一个WEB服务器可以同时接收来自不同用户的网页访问请求,显然服务器处理这些网页请求都是通过并发进行的否则将会造成用户等待时间 长或者响应效率低等问题。如果在服务器中使用进程的办法来处理来自不同网页访问请求的话,我们可以用创建父进程以及多个子进程的方法,然而这样会花费很大 的系统开销和占用较多的资源,因此这样会较大的限制了访问服务器的用户数量。
使用多线程的理由之一是和进程相比,它是一种非常"节俭"的多任务操作方式。我们知道,在Linux系统下,启动一个新的 进程必须分配给它独立的地址空间,建立众多的数据表来维护它的代码段、堆栈段和数据段,这是一种"昂贵"的多任务工作方式。而运行于一个进程中的多个线 程,它们彼此之间使用相同的地址空间,共享大部分数据,启动一个线程所花费的空间远远小于启动一个进程所花费的空间
使用多线程的理由之二是线程间方便的通信机制。对不同进程来说,它们具有独立的数据空间,要进行数据的传递只能通过通信的 方式进行,这种方式不仅费时,而且很不方便。线程则不然,由于同一进程下的线程之间共享数据空间,所以一个线程的数据可以直接为其它线程所用,这不仅快 捷,而且方便。当然,数据的共享也带来其他一些问题,有的变量不能同时被两个线程所修改
二、互斥锁
正如上面所说的,如果两个线程同时对一块内存进行读写或者对向同一个文件写数据,那么结果是难以设想的。正因为如此,引入了对象互斥锁的概念,来保证共享数据操作的完整性。每个对象都对应于一个可称为" 互斥锁" 的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象。
例如int *a int *b 分别指向两块内存,上面的值分别初始化为(200, 100) 线程A执行这样的一个操作:将*a的值减少50,*b的值增加50.
线程B执行:打印出(a 跟 b 指向的内存的值的和)。
如果串行运行:A: *a -= 50; *b += 50; B: printf("%d ", *a + *b);
如果并发执行,则有可能会出现一下调度:*a -= 50; printf("%d ", *a + *b); *b += 50;
因此我们可以引入互斥量,在对共享数据读写时进行锁操作,实现对内存的访问以互斥的形式进行。
#include <stdio.h> #include <unistd.h> #include <pthread.h> int a = 200; int b = 100; pthread_mutex_t lock; void * ThreadA( void *) { pthread_mutex_lock(&lock); //锁 a -= 50; sleep(5); //执行到一半 使用sleep 放弃cpu调度 b += 50; pthread_mutex_unlock(&lock); } void * ThreadB( void *) { sleep(1); //放弃CPU调度 目的先让A线程运行。 pthread_mutex_lock(&lock); printf ( "%d
" , a + b); pthread_mutex_unlock(&lock); } int main() { pthread_t tida, tidb; pthread_mutex_init(&lock, NULL); pthread_create(&tida, NULL, ThreadA, NULL); pthread_create(&tidb, NULL, ThreadB, NULL); pthread_join(tida, NULL); pthread_join(tidb, NULL); return 1; } |
以上输出为300 去掉锁操作 输出为250。
三、信号量
作用域 | 上锁时 | |
信号量 | 进程间或线程间(linux仅线程间) | 只要信号量的value大于0,其他线程就可以sem_wait成功,成功后信号量的value减一。若value值不大于0,则sem_wait阻塞,直到sem_post释放后value值加一 |
互斥锁 | 线程间 | 只要被锁住,其他任何线程都不可以访问被保护的资源 成功后否则就阻塞 |
信号量本质上是一个非负的整数计数器,它被用来控制对公共资源的访问。当公共资源增加时,调用函数sem_post()增加信号量。只有当信号量值大于0时,才能使用公共资源,使用后,函数sem_wait()减少信号量。
信号量不一定是锁定某一个资源,而是流程上的概念,比如:有A,B两个线程,B线程要等A线程完成某一任务以后再进行自己下面的步骤,这个任务并不一定是锁定某一资源,还可以是进行一些计算或者数据处理之类。
可能这样说大家对信号量的概念还是很模糊,举个例子,现在有个图书馆,其能容纳100人,现在有两个线程A、B,A线程执行:往图书管理进入一个人,B 线程:从图书馆出来一个人。那么为了使得线程A在图书馆满人的时候进入等待,而不是继续往图书馆里进人,使得B线程在图书馆没人的时候等待人进入,我们可 以引入信号量:IN OUT 分别初始化为100和0。
那么A则可以被表示为A:P(IN) //剩余容量减少一 如果容量为0 则等待 B:P(OUT) //人数减少1 如果人数为0则阻塞等待
......... //登记进入图书馆的人信息 ............. //记录 离开图书馆的人信息 (随便一系列操作。)
V(OUT)//增加信号量OUT 表示人数+1 V(IN) //增加图书馆剩余容量+1
通过这样我们就实现了线程的同步。
sem_init 用语初始化一个信号量其原型:int sem_init(sem_t *sem, int pshared, unsigned int value);
sem_init() 初始化一个定位在 sem 的匿名信号量。value 参数指定信号量的初始值。 pshared 参数指明信号量是由进程内线程共享,还是由进程之间共享。如果 pshared 的值为 0,那么信号量将被进程内的线程共享,并且应该放置在所有线程都可见的地址上
上面的P V操作相当于sem_wait () sempost()
sem_wait函数也是一个原子操作,它的作用是从信号量的值减去一个“1”,但它永远会先等待该信号量为一个非零值才开始做减法。也就是说,如果你对一个值为2的信号量调用sem_wait(),线程将 会继续执行,这信号量的值将减到1。如果对一个值为0的信号量调用sem_wait(),这个函数就 会地等待直到有其它线程增加了这个值使它不再是0为止。如果有两个线程都在sem_wait()中等待同一个信号量变成非零值,那么当它被第三个线程增加 一个“1”时,等待线程中只有一个能够对信号量做减法并继续执行,另一个还将处于等待状态。
sem_post的作用很简单,就是使信号量增加1
三、条件变量
条件变量通过允许线程阻塞和 等待另一个线程发送信号的方法弥补了互斥锁的不足,它常和互斥锁一起使用。使用时,条件变量被用来阻塞一个线程,当条件不满足时,线程往往解开相应的互斥 锁并等待条件发生变化。一旦其它的某个线程改变了条件变量,它将通知相应的条件变量唤醒一个或多个正被此条件变量阻塞的线程。这些线程将重新锁定互斥锁并 重新测试条件是否满足。一般说来,条件变量被用来进行线承间的同步。
假设有共享的资源sum,与之相关联的mutex 是lock_s.假设每个线程对sum的操作很简单的,与sum的状态无关,比如只是sum++.那么只用mutex足够了.程序员只要确保每个线程操作 前,取得lock,然后sum++,再unlock即可.每个线程的代码将像这样
add() { pthread_mutex_lock(lock_s); sum++; pthread_mutex_unlock(lock_s); } |
如果操作比较复杂,假设线程t0,t1,t2的操作是sum++,而线程 t3则是在sum到达100的时候,打印出一条信息,并对sum清零. 这种情况下,如果只用mutex, 则t3需要一个循环,每个循环里先取得lock_s,然后检查sum的状态,如果sum>=100,则打印并清零,然后unlock.如果sum& amp; amp; amp; amp; lt;100,则unlock,并sleep()本线程合适的一段时间.
这个时候,t0,t1,t2的代码不变,t3的代码如下:
print() { while { pthread_mutex_lock(lock_s); if (sum>=100) { printf (“sum reach 100!”); pthread_mutex_unlock(lock_s); } else { pthread_mutex_unlock(lock_s); my_thread_sleep(100); return ; } } } |
这种办法有两个问题
1) sum在大多数情况下不会到达100,那么对t3的代码来说,大多数情况下,走的是else分支,只是lock和unlock,然后sleep().这浪费了CPU处理时间.
2) 为了节省CPU处理时间,t3会在探测到sum没到达100的时候sleep()一段时间.这样却又带来另外一个问题,亦即t3响应速度下降.可能在sum到达200的时候,t3才醒过来.
3) 这样,程序员在设置sleep()时间的时候陷入两难境地,设置得太短了节省不了资源,太长了又降低响应速度.真是难办啊!
这个时候,condition variable内裤外穿,从天而降,拯救了焦头烂额的你. (抄袭的哈哈~)
你首先定义一个condition variable.
pthread_cond_t cond_sum_ready=PTHREAD_COND_INITIALIZER;
t0,t1,t2的代码只要后面加两行,像这样
add() { pthread_mutex_lock(lock_s); sum++; pthread_mutex_unlock(lock_s); if (sum>=100) pthread_cond_signal(&cond_sum_ready); } <br><br> |
T3线程的print<br><br>print { pthread_mutex_lock(lock_s); while (sum<100) pthread_cond_wait(&cond_sum_ready, &lock_s); printf (“sum is over 100!”); sum=0; pthread_mutex_unlock(lock_s); return ; } <br><br> |
注意两点:
1) 在thread_cond_wait()之前,必须先lock相关联的mutex,
因为假如目标条件未满足,pthread_cond_wait()实际上会unlock该mutex,
然后block,在目标条件满足后再重新lock该mutex, 然后返回.
2) 为什么是while(sum<100),而不是if(sum<100)
?这是因为在pthread_cond_signal()和pthread_cond_wait()返回之间,有时间差,假设在这个时间差内,还有另外一
个线程t4又把sum减少到100以下了,那么t3在pthread_cond_wait()返回之后,显然应该再检查一遍sum的大小.
这就是用 while的用意
四、实战:
下面是一个例子:
//一个多线程例子:三个线程 0 1 往水池中加水。 线程2放水。 如果水池满 则阻塞进水线程 等待 #include <unistd.h> #include <pthread.h> #include <stdio.h> #include <errno.h> #include<semaphore.h> pthread_mutex_t Poll_Work; //互斥锁 sem_t Poll_IN; //水池容量信号量 sem_t Poll_OUT; //当前水量信号量 void * thread0( void *param) //0 1线程往水池中加水。 { while (1){ int rv = 0; for ( int i = 0; i < 5; ++ i) while ((rv = sem_wait(&Poll_IN)) != 0 && ( errno == EINTR)) //水量增加5 循环使得容量信号量减少5 ; pthread_mutex_lock(&Poll_Work); //加锁 线程互斥 (*( int *)param) += 5; printf ( "Thread0: %d
" , *( int *)param); for ( int i = 0; i < 5; ++ i) sem_post( &Poll_OUT); //增加当前水量信号量 pthread_mutex_unlock(&Poll_Work); //解锁。 sleep(1); } return NULL; } void * thread1( void *param) { while (1){ int rv = 0; for ( int i = 0; i < 4; ++ i) while ((rv = sem_wait(&Poll_IN)) != 0 && ( errno == EINTR) ) ; pthread_mutex_lock(&Poll_Work); (*( int *)param) += 4; printf ( "Thread1: %d
" , *( int *)param); for ( int i = 0; i < 4; ++ i) sem_post( &Poll_OUT); pthread_mutex_unlock(&Poll_Work); sleep(1); } return NULL; } void * thread2( void *param) //此线程用于将水从池中倒出。。 { while (1){ int rv = 0; for ( int i = 0; i < 3; ++ i) while ((rv = sem_wait(&Poll_OUT)) != 0 && ( errno == EINTR)) //减少当前水量 printf ( "xx" ); pthread_mutex_lock(&Poll_Work); (*( int *)param) -= 3; printf ( "Thread2: %d
" , *( int *)param); for ( int i = 0; i < 3; ++ i) sem_post(&Poll_IN); //水池容量信号量增加 pthread_mutex_unlock(&Poll_Work); sleep(1); } return NULL; } int main() { int sum = 0; //水深 满为100米 初始化 池里没有水 int i; pthread_mutex_init(&Poll_Work, NULL); sem_init(&Poll_IN, 0, 100); sem_init(&Poll_OUT, 0, sum); pthread_t ths[4]; pthread_create(&ths[0], NULL, thread0, ( void *)&sum); pthread_create(&ths[1], NULL, thread1, ( void *)&sum); pthread_create(&ths[2], NULL, thread2, ( void *)&sum); for (i = 0; i < 3; ++ i){ pthread_join(ths[i], NULL); } } |