本章主要介绍线程的概念,创建和终止线程以及线程同步问题。
使用到的函数默认需要包含pthread.h头文件,且在使用gcc编译时,需要链接pthread库。
代码地址:https://gitee.com/maxiaowei/Linux/tree/master/apue
线程的创建与终止
创建线程
// Returns: 0 if OK, error number on failure
int pthread_create(pthread_t *restrict tidp,
const pthread_attr_t *restrict attr,
void *(*start_rtn)(void *), void *restrict arg);
新创建的线程的线程ID被设置成tidp
指向的内存单元;attr
参数定制线程的不同属性;start_rtn
函数是线程开始时执行的函数,其参数可以通过arg
进行传递。
注意:
新线程最好不要通过tidp
指向的内存空间获取自己的线程ID,因为如果新线程在主线程调用pthread_create
返回前就运行了,那么它看到的就是未经初始化的内容,很可能并不是正确的线程ID。可以使用pthread_self
函数获取自己的线程ID。
pthread_t pthread_self(void);
终止线程
任意线程调用exit
、_Exit
或_exit
会导致整个进程终止,可以通过以下3种方式,在不终止进程的前提下终止单个线程:
- 直接从启动实例中返回
- 被同一进程的其他线程取消
- 调用
pthread_exit
void pthread_exit(void *rval_ptr);
// Returns: 0 if OK, error number on failure
int pthread_join(pthread_t thread, void **rval_ptr);
调用pthread_join
的线程会一直阻塞,直到指定的线程终止。如果指定的线程直接返回或者是调用pthread_exit
终止,则可以通过rval_ptr
查看其返回值;如果线程是被取消的,则rval_ptr
被设置为PTHRERAD_CANCELED
。
取消线程
// Returns: 0 if OK, error number on failure
int pthread_cancel(pthread_t tid);
用来请求取消同一进程中的其他线程。被取消的线程的行为表现为如同调用了参数为PTHRERAD_CANCELED
的pthread_exit
函数。但是,线程可以选择忽略或者控制如何被取消。
线程清理处理程序
void pthread_cleanup_push(void (*rtn)(void *), void *arg);
void pthread_cleanup_pop(int execute);
清理函数rtn
只有在以下情况会执行:
- 调用
pthread_exit
- 响应取消请求
- 用非零
execute
参数调用pthread_cleanup_pop(为0时,清理函数不会被调用)
这两个函数需要成对使用。
分离线程
// Returns: 0 if OK, error number on failure
int pthread_detach(pthread_t tid);
默认情况下,线程的终止状态会保留,直到调用pthread_join
。如果线程被分离,则资源会在线程终止后被立即收回。
线程同步
互斥量mutex
// All return: 0 if OK, error number on failure
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
互斥变量为pthread_mutex_t
类型,如果使用静态分配方式,可以直接使用PTHREAD_MUTEX_INITIALIZER
进行初始化。对于动态分配的互斥量,在释放内存前需要调用pthread_mutex_destroy
。
带有超时的互斥锁
如果不希望线程在访问加锁的互斥量时无限等待,可以通过pthread_mutex_timedlock
指定等待的绝对时间。
#include <time.h>
// Returns: 0 if OK, error number on failure
int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex,
const struct timespec *restrict tsptr);
示例
#include <pthread.h>
#include <time.h>
#include "apue.h"
int main()
{
int err;
struct timespec tout;
struct tm *tmp;
char buf[64];
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
// 加锁
pthread_mutex_lock(&lock);
printf("mutex is locked.
");
clock_gettime(CLOCK_REALTIME, &tout);
tmp = localtime(&tout.tv_sec);
strftime(buf, sizeof(buf), "%r", tmp);
printf("current time is %s
", buf);
// 设置超时
tout.tv_sec += 10;
err = pthread_mutex_timedlock(&lock, &tout);
clock_gettime(CLOCK_REALTIME, &tout);
tmp = localtime(&tout.tv_sec);
strftime(buf, sizeof(buf), "%r", tmp);
printf("the time is now %s
", buf);
if(err == 0) {
printf("mutex locked.
");
} else {
printf("can't lock mutex:%s
",strerror(err));
}
return 0;
}
读写锁rwlock
读写锁有3中状态:不加锁、读模式加锁和写模式加锁。一次只有一个线程可以占有写模式的读写锁,但是多个线程可以同时占有读模式的读写锁。
读写锁适合对数据结构读的次数远大于写的情况。
// Both return: 0 if OK, error number on failure
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,
const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
读写锁在使用前必须初始化,在释放它们底层的内存前必须销毁。
// All return: 0 if OK, error number on failure
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); // 读模式锁定
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); // 写模式锁定
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
带有超时的读写锁
// Both return: 0 if OK, error number on failure
int pthread_rwlock_timedrdlock(pthread_rwlock_t *restrict rwlock,
const struct timespec *restrict tsptr);
int pthread_rwlock_timedwrlock(pthread_rwlock_t *restrict rwlock,
const struct timespec *restrict tsptr);
与互斥量类似。
条件变量cond
当线程等待的条件变量被满足后,该线程就会被唤醒。条件变量需要和互斥量配合使用,条件本身是由互斥量保护的。
在使用条件变量之前,必须对其进行初始化(有静态和动态2种方式)。
// All return: 0 if OK, error number on failure
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict tsptr);
int pthread_cond_signal(pthread_cond_t *cond); // 至少唤醒一个
int pthread_cond_broadcast(pthread_cond_t *cond); // 全部唤醒
pthread_cond_wait
操作主要执行如下操作步骤:
1.解锁互斥量mutex
2.阻塞调用线程,直至另一线程就条件变量cond发出信号
3.重新锁定mutex
因此,在使用pthread_cond_wait
函数之前,应该已经取得mutex锁。
另外,对pthread_cond_wait
的调用应该放在while循环中,因为从wait
函数返回时,并不能确定条件已经得到满足(其他线程先醒来、虚假唤醒等),需要重新对条件进行判断。
示例
仅摘录主要代码,完整代码见ch11/pthread_cond.c
// 消费者进程
void *process_msg(void *arg)
{
for (;;) {
pthread_mutex_lock(&qlock);
while (count <= 0) {
printf("%s wait msg
", tag);
pthread_cond_wait(&qready, &qlock);
}
count--;
pthread_mutex_unlock(&qlock);
/* 处理消息 */
// 放弃cpu,让另一个处理进场有机会得到数据
sleep(1);
}
return NULL;
}
// 生产者进程
int main(void)
{
for (;;) {
pthread_mutex_lock(&qlock);
count += 4;
pthread_mutex_unlock(&qlock);
// 测试两种唤醒方式
#if 1
pthread_cond_broadcast(&qready);
#else
pthread_cond_signal(&qready);
#endif
// 保证两个消费者进程都可以有时间处理数据
sleep(3);
}
return 0;
}
自旋锁spin
自旋锁与互斥量大体类似,主要的不同之处在于自旋锁在获取锁之前会一直忙等。因此,使用自旋锁应该保证持有锁的时间很短。
自旋锁和互斥量的接口类似:
// All return: 0 if OK, error number on failure
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
int pthread_spin_destroy(pthread_spinlock_t *lock);
int pthread_spin_lock(pthread_spinlock_t *lock);
int pthread_spin_trylock(pthread_spinlock_t *lock);
int pthread_spin_unlock(pthread_spinlock_t *lock);
pshared
表示进程共享(process-shared)属性,表明自旋锁的获取方式。它仅在支持线程进程共享同步(Thread Process-Shared Synchronization)的平台上有效,当设置为PTHREAD_PROCESS_SHARED
,则只要线程可以访问锁底层内存,即使是不同进程的线程都可以获得锁;而设置为PTHREAD_PROCESS_PRIVATE
后,只有初始化该锁的进程内部的线程可以访问它。
屏障barrier
屏障允许多个线程等待,直到所有合作线程满足某个点后,从该点继续执行。主线程可以将某个任务分解多个小任务交给不同的线程,等到所有线程工作完成后,主线程在此基础上继续执行。
如书中的例子,使用8个线程分解800万个数的排序工作,每个线程对其中的100万个数排序,最后由主线程将这些结果进行合并。
// Both return: 0 if OK, error number on failure
int pthread_barrier_init(pthread_barrier_t *restrict barrier,
const pthread_barrierattr_t *restrict attr,
unsigned int count);
int pthread_barrier_destroy(pthread_barrier_t *barrier);
初始化函数中的count
参数用于指定所有线程继续运行前,必须到达屏障的线程数。
// Returns: 0 or PTHREAD_BARRIER_SERIAL_THREAD if OK, error number on failure
int pthread_barrier_wait(pthread_barrier_t *barrier);
wait函数表明当前线程已完成工作,准备等待其他线程。当线程调用该函数后满足屏障计数,那么函数的返回值为PTHREAD_BARRIER_SERIAL_THREAD
,其余线程该函数返回值为0。这一特点使得可以很容易的将一个线程作为主线程,它可以工作在其他所有线程已完成的工作结果上。
示例
见ch11/pthread_barrier.c
#include <pthread.h>
#include "apue.h"
pthread_barrier_t pb;
pthread_t t1, t2;
void *th1(void *a)
{
printf("start t1
");
sleep(1);
// 最后一个完成的线程,返回值应该为-1
int r = pthread_barrier_wait(&pb);
printf("th1 r:%d
", r);
return NULL;
}
void *th2(void *a)
{
printf("start t2
");
int r = pthread_barrier_wait(&pb);
printf("th2 r:%d
", r);
return NULL;
}
int main()
{
int r;
pthread_barrier_init(&pb, NULL, 3);
pthread_create(&t1, NULL, th1, NULL);
pthread_create(&t2, NULL, th2, NULL);
r = pthread_barrier_wait(&pb);
printf("main r:%d
", r);
// 等待子进程结束
pthread_join(t1, NULL);
pthread_join(t2, NULL);
return 0;
}