本章主要介绍一些进程间通信的方式,如管道、消息队列、信号量和共享存储等。
管道
一般来说,管道是半双工的(即数据只能在一个方向上流动),并且只能在具有公共祖先的两个进程之间使用。通常,父进程创建管道后会接着调用fork
,从而利用管道在父子进程之间通信。
之后,父子进程可以分别关闭管道的读/写端,以利用管道在父子进程中传递信息。例如,如果想要创建从父进程到子进程的管道,则可以关闭父进程的读端和子进程的写端。
由于管道半双工的特性,想要在父子进程间双向传递信息需要建立2个管道。
#include <unistd.h>
// Returns: 0 if OK, −1 on error
int pipe(int fd[2]);
利用pipe
函数可以创建管道,fd
参数返回两个文件描述符,fd[0]
为读而打开,fd[1]
为写而打开。fd[1]
的输出是fd[0]
的输入。
在上面的例子中,父进程关闭fd[0]
,子进程关闭fd[1]
,那么最后的示意图如下:
注意:
- 当读一个写端被关闭的管道,在所有数据被读取后,read返回0
- 当写一个读端被关闭的管道,会产生
SIGPIPE
信号。如果忽略该信号或从信号处理程序返回,则write返回-1,且设置errno为EPIPE
- 写入不超过
PIPE_ BUF
字节的操作是原子的,如果写入数据的大小超过该值,在多个进程同时写一个管道时,所写的数据可能交叉
连接到另一个进程
管道的通常用法是创建一个连接到另一个进程的管道,然后读取其输出或者向其输入端发送数据。可以使用popen
和pclose
实现这一功能。这两个函数实现的操作是:创建一个管道,fork一个子进程,关闭未使用的管道,执行shell运行命令,然后等待命令终止。
#include <stdio.h>
// Returns: file pointer if OK, NULL on error
FILE *popen(const char *cmdstring, const char *type);
// Returns: termination status of cmdstring, or −1 on error
int pclose(FILE *fp);
popen
先执行fork,然后调用exec执行cmdstring
,并且返回一个标准I/O文件指针,如果type
是"r",则文件指针连接到cmdstring
的标准输出,如果是"w"则连接到标准输入,如下图所示:
cmdstring
会以sh -c cmdstring
的方式执行。
pclose
函数关闭标准I/O流,等待命令终止,然后返回shell的终止状态。(注意不要使用fclose函数,它不会等待子进程结束)
协同进程
UNIX系统过滤程序从标准输入读取数据,向标准输出写数据。几个过滤程序通常在shell管道中线性连接。当一个过滤程序既产生某个过滤程序的输入,又读取该过滤程序的输出时,它就变成了协同进程(coprocess)。
要实现协同进程,需要创建两个管道,分别作为协同进程的标准输入和输出,示意图如下:
子进程的参考代码如下:
close(fd1[1]);
close(fd2[0]);
if (fd1[0] != STDIN_FILENO) {
if (dup2(fd1[0], STDIN_FILENO) != STDIN_FILENO)
err_sys("dup2 error to stdin");
close(fd1[0]);
}
if (fd2[1] != STDOUT_FILENO) {
if (dup2(fd2[1], STDOUT_FILENO) != STDOUT_FILENO)
err_sys("dup2 error to stdout");
close(fd2[1]);
}
if (execl("./add2", "add2", (char *)0) < 0)
err_sys("execl error");
dup2
函数用于复制指定的文件描述符,它将两个管道描述符分别连接到标准输入和输出。
注意:
在协同进程中如果需要使用标准I/O(如fgets),则要额外注意其缓冲机制。对于管道,其默认是全缓冲的,可以通过调用fflush
或者设置缓冲模式(setvbuf/setbuf)来解决。
FIFO
FIFO也被称为命名管道,它使得不相关的进程间也能交换数据。
FIFO也是一种文件类型,创建FIFO与创建文件类似,需要指定其路径。
#include <sys/stat.h>
// Both return: 0 if OK, −1 on error
int mkfifo(const char *path, mode_t mode);
int mkfifoat(int dirfd, const char *path, mode_t mode);
mode
参数指明FIFO的文件权限,与open函数中的mode相同。
mkfifoat
函数的path
参数有如下几种情况:
- 如果指定为绝对路径,则会忽略
dirfd
参数,行为与mkfifo
类似 - 如果指定为相对路径,则该路径与
dirfd
打开的目录有关 - 如果指定为相对路径,且
dirfd
有参数AT_FDCWD,那么路径以当前目录开始
创建完成后,就可以使用open打开FIFO。
在打开时如果没有设置非阻塞标志O_NONBLOCK,那么如果以只读方式打开(O_RDONLY),进程会被阻塞直到其他进程为写而打开这个FIFO,同理,只写方式(O_ WRONL )打开也会阻塞。
但是,不应该使用O_RDWR的方式来绕过这种阻塞行为,而应该使用非阻塞标志。使用读写方式打开FIFO,会导致读取数据时永远看不到文件结束,因为至少会有一个写描述符是打开着的。
实例
可以使用FIFO进行客户进程与服务器进程之间的通信。每个客户进程可以将自己的请求写到一个公共的FIFO文件中(请求长度需要小于PIPE_BUF以避免客户进程之间的数据交叉),服务器进程针对每个客户进程创建FIFO,用于向客户进程发送数据。客户进程的FIFO的路径名可以使用客户进程的PID号作为基础,如/tmp/servv1.PID,这样客户进程就直到该从哪个FIFO读取服务器进程返回的数据了。
XSI IPC
这一部分主要包含3种IPC方式:消息队列、信号量和共享存储器。
每个IPC对象与键(key)相关联,以使得多个进程可以通过它进行联系。在创建IPC结构时,必须指定一个键。而在系统内部,则使用标识符引用IPC结构。
关于键的创建方式,主要有如下几种:
-
指定为IPC_PRIVATE,这会创建一个新的IPC结构,可以将返回的标识符存入文件供其他进程使用,也可直接给fork后的子进程使用
-
在公共头文件中定义一个键,然后由一个进程(通常是服务器进程)根据这个键来创建新的IPC结构。但是这种方式可能会与已经存在的键冲突,需要进程删除原有的IPC结构再重新创建。
-
使用
ftok
函数,将路径名和某个数字(0-255)变换为一个键。#include <sys/ipc.h> // Returns: key if OK, (key_t)−1 on error key_t ftok(const char *path, int id);
path
参数必须引用的是现有的文件,id
参数只使用其低8位。
另外,在创建IPC结构时还需要指定其权限,与文件权限类似,但是不存在执行权限。
注意:
IPC_PRIVATE
只能用于创建新的IPC结构,而不能用来引用一个现有的IPC结构。- 如果希望确保新创建的IPC结构没有引用具有同一标识符的现有IPC结构,则可以在flag中同时指定
IPC_CREAT
和IPC_EXCL
。这样,如果已经存在则会返回EEXIST。
消息队列
消息队列是消息的链接表,存储在内核中,由消息队列标识符标识。以下简称队列。
相关的数据结构很少用到,再次不再列出,后面的信号量和共享存储同理。需要的话可以到对应的头文件中查看。
示例代码参考https://gitee.com/maxiaowei/Linux/blob/master/apue/ch15/ipc_msg.c
msgget
用于创建或打开一个队列。
#include <sys/msg.h>
// Returns: message queue ID if OK, −1 on error
int msgget(key_t key, int flag);
key
参数可以是通过ftok函数生成的,也可以是IPC_PRIVATE。flag
用于设定读写权限,如果是新建该IPC结构则可以添加IPC_CREAT
。
// Returns: 0 if OK, −1 on error
int msgsnd(int msqid, const void *ptr, size_t nbytes, int flag);
msgsnd
将新消息添加到队列尾端。
msqid
是get函数返回的队列ID,nbytes
是消息数据的长度。
ptr
指向一个结构,其包含一个正的消息类型,和消息数据(nbytes
为0则无消息数据),可以定义其结构如下:
struct msgbuf {
long mtype; /* message type, must be > 0 */
char mtext[1]; /* message data, of length nbytes */
};
flag
可以指定为IPC_NOWAIT
,当消息队列满时(或达到系统限制),会立即出错返回EAGAIN。否则,进程会一直阻塞直到:有空间容纳消息;队列被删除(返回EIDRM);或捕捉到信号并从处理程序返回(返回EINTR)。
// Returns: size of data portion of message if OK, −1 on error
ssize_t msgrcv(int msqid, void *ptr, size_t nbytes, long type, int flag);
msgrcv
用于从队列中取出消息,可以指定获取某些类型的数据,而不是必须按照先进先出的次序。
ptr
指向的结构与snd函数一样,而nbytes
则指定了消息长度,如果返回的消息长度>nbytes,而flag
中设置了MSG_NOERROR,则消息被截断。如果没有设置则出错返回E2BIG,而消息仍然留在队列中。
type
可以指定想要获取的消息:
- type==0:返回队列中的第一个消息
- type>0:返回消息类型为type的第一个消息
- type<0:返回消息类型≤type绝对值的消息,如果有若干个满足则取类型最小的。
flag
参数同样可以指定为非阻塞。
// Returns: 0 if OK, −1 on error
int msgctl(int msqid, int cmd, struct msqid_ds *buf );
msgctl
函数对队列执行多种操作。
cmd
参数指定队列需要执行的操作:
- IPC_STAT:获取队列的msqid_ds结构信息,存放于buf指向的结构中
- IPC_SET:将msg_perm.uid,msg_perm.gid,msg_perm.mode和msg_qbytes通过buf复制到队列的msqid_ds结构中。该命令只能由超级用户或者有效用户ID等于msg_perm.cuid或msg_perm.uid的用户执行。
- IPC_RMID:删除队列及其中的数据。也只能由上述的两类用户执行。
这3条命令也适用与信号量(semctl)和共享存储(shmctl)。
信号量
信号量是一个计数器,用于为多个进程提供对共享数据对象的访问。
示例代码:https://gitee.com/maxiaowei/Linux/blob/master/apue/ch15/ipc_sem.c
XSI信号量需要定义为一个或多个信号量的合集,因此在创建的时候需要指明信号量的个数,在使用的时候也要指明用的是哪个信号量。
#include <sys/sem.h>
// Returns: semaphore ID if OK, −1 on error//
int semget(key_t key, int nsems, int flag);
semget
用于创建或打开一个信号量合集。相关参数的与上一节的队列相似,多出来的nsems
用于指定该集合中的信号量数。如果是创建新集合,则需要指定数量;如果是引用现有的集合,则将其设置为0。
int semctl(int semid, int semnum, int cmd, ... /* union semun arg */ );
semctl
包含多种信号量操作。
第4个参数arg
由cmd
的实际值来决定是否使用,注意该参数并不是指针。如果需要使用该参数,其类型需要自己定义,一般定义为如下形式:
union semun {
int val; /* for SETVAL */
struct semid_ds *buf; /* for IPC_STAT and IPC_SET */
unsigned short *array; /* for GETALL and SETALL */
};
参数semnum
用于指定信号量集合中的某个成员,该值在0 ~ nsmes-1之间。
cmd
由如下10个可选项:
- IPC_STAT,IPC_SET,IPC_RMID:与队列类似
- GETVAL,SETVAL:返回/设置(通过arg.val)
semnum
指定的成员的信号量值(semval) - GETPID,GETNCNT,GETZCNT:返回指定成员的sempid,semncnt,semzcnt
- GETALL,SETALL:取/设置所有的信号量值(通过arg.array)
除GETALL以外所有的GET命令都由函数的返回值返回,其他命令则是成功返回0,失败返回-1并设置errno。
// Returns: 0 if OK, −1 on error
int semop(int semid, struct sembuf semoparray[], size_t nops);
semop
函数自动执行信号量集合上的操作数组。
nops
是数组semoparray
的元素个数。
semoparray
是一个信号量操作数组,其中存放每个信号量的操作,其结构如下:
struct sembuf {
unsigned short sem_num; /* member # in set (0, 1, ..., nsems-1) */
short sem_op; /* operation (negative, 0, or positive) */
short sem_flg; /* IPC_NOWAIT, SEM_UNDO */
};
sem_flg
的SEM_UNDO标志标识当进程终止时,该操作修改的信号量值会被恢复,即重新设置为调用该操作之前的数值。
sem_op
可以指定如下3种值:
- 正值,表示进程释放的占用的资源数,
sem_op
值会加到对应的信号量的值上。 - 0,表示进程希望等待该信号量值变为0。IPC_NOWAIT标志可以控制进程是否阻塞,相关的出错返回信息可以查阅手册,此处省略。
- 负值,表示进程想要获取的资源数。如果信号量值≥
sem_op
的绝对值(满足需求),则会从当前的信号量值上减去对应的值,否则由IPC_NOWAIT标志决定进程是否阻塞。
semop
函数具有原子性,即要么执行数组中所有的操作,要么什么也不做。
共享存储
共享存储允许两个或多个进程共享一个给定的存储区。但是,需要注意存储区访问的同步问题,当进程在写入数据时其他进程不应该去读取这些数据。一般使用信号量来解决这一同步问题。
相比与通过文件映射的方式来共享存储区的方式,XSI共享存储没有相关的文件,它共享的是内存的匿名段。
示例代码:https://gitee.com/maxiaowei/Linux/blob/master/apue/ch15/ipc_shm.c
#include <sys/shm.h>
// Returns: shared memory ID if OK, −1 on error
int shmget(key_t key, size_t size, int flag);
shmget
函数用于创建或引用一个共享存储段,在创建时size
指定段的大小(单位是字节),若要引用一个现存的段,则应该设置为0。实现一般将大小向上取整为系统页长的整数倍,若指定的size
不是整数倍,则余下的空间是不可使用的。
// Returns: 0 if OK, −1 on error
int shmctl(int shmid, int cmd, struct shmid_ds *buf );
shmctl
函数对共享存储段执行多种操作。主要有IPC_STAT,IPC_SET和IPC_RMID,相关解释可以参考消息队列部分。
另外,Linux中还额外提供额外的命令支持,可以参考手册shmctl(2) 。
// Returns: pointer to shared memory segment if OK, −1 on error
void *shmat(int shmid, const void *addr, int flag);
shmat
用于将共享存储段连接到进程的地址空间。具体连接到地址空间的什么位置由2、3两个参数决定。
- addr=0,则连接到内核选择的第一个可用地址上。(推荐)
- addr≠0,且
flag
没有指定SHM_RND,那么连接到addr
指定的地址。 - addr≠0,且指定了SHM_RND,那么系统会按照公式(addr-(addr % SHMLBA))决定连接地址。该公式作用是将地址向下取最近的SHMLBA的倍数,而常数SHMLBA表示“低边界地址倍数”。
flag
还可以指定SHM_RDONLY以只读方式连接共享段。
// Returns: 0 if OK, −1 on error
int shmdt(const void *addr);
shmdt
用于分离共享存储段。这一操作不会删除系统中共享存储段的标识符及其数据结构。想要删除对应的数据结构,需要调用shmctl
的IPC_RMID命令。
POSIX信号量
POSIX信号量与XSI信号量最大的不同就是没有信号量集的概念,一次只能操作一个信号量。还有就是在删除信号量时,正在使用XSI信号量的操作会失败;而POSIX信号量的操作会正常执行,直到该信号量的最有一个引用被释放。
POSIX信号量有两种形式:命名的和未命名的。两者的差异在于创建和销毁的形式上,使用的方式是一样的。未命名的信号量只存在于内存中,因此想要使用这些信号量的进程需要有对应的访问权限,如同一进程中的线程,或者是不同进程中映射相同的内存内容到自己的地址空间的线程。而命名信号量可以被任何直到它们名字的进程访问。
示例代码:https://gitee.com/maxiaowei/Linux/blob/master/apue/ch15/ipc_psem.c
创建与销毁
命名信号量
给信号量命名需要遵守一定的规则:
- 名字的第一个字符应该是
/
。因为一般POSIX信号量的实现要使用文件系统。 - 名字不应该包含其他斜杠。
- 名字长度是实现定义的,不应长于_POSIX_NAME_MAX。
#include <semaphore.h>
// Returns: Pointer to semaphore if OK, SEM_FAILED on error
sem_t *sem_open(const char *name, int oflag, ... /* mode_t mode,
unsigned int value */ );
sem_open
用于创建一个新的信号量或使用一个现有的信号量。
当想要使用一个现有的信号量时,只需指定其名字,并将oflag
设为0。
当oflag
包含O_CREAT标志时,如果信号量不存在则会创建新的,如果存在则会被使用,但不会重新初始化。指定此标志时,还需要提供后面的2个参数。mode
指定访问权限,这与打开文件的权限相同;value
指定信号量的初值。
如果oflag
同时指定了O_EXCL标志,则在创建信号量时,如果信号量已经存在就会出错。
// Both return: 0 if OK, −1 on error
int sem_close(sem_t *sem);
int sem_unlink(const char *name);
sem_close
用于关闭一个信号量,释放相关资源。进程退出时如果没有调用该函数,系统也会自动关闭打开的信号量。POSIX信号量没有UNDO机制,所以信号量的值不会受到影响。
sem_unlink
用于销毁信号量,删除信号量的名字。如果没有打开的信号量引用,信号量会被立即销毁,否则会延迟到最后一个打开的引用关闭。
未命名信号量
这种形式的信号量主要用于单个进程。
// Both return: 0 if OK, −1 on error
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_destroy(sem_t *sem);
sem_init
用于创建一个未命名信号量。
-
value
指定其初值。 -
pshared
值为0时,信号量仅在进程的线程之间共享;不为0则表明会在进程之间共享。
sem_destroy
用于销毁未命名信号量。销毁之后不能使用任何带有sem的信号量函数,除非通过sem_init重新初始化它。
信号量操作
与XSI信号量不同,POSIX信号量一次操作只能+1或者-1。
#include <time.h>
// All return: 0 if OK, −1 on error
int sem_trywait(sem_t *sem);
int sem_wait(sem_t *sem);
int sem_timedwait(sem_t *restrict sem,
const struct timespec *restrict tsptr);
这3个函数实现信号量的-1操作。
当信号量计数为0时,使用sem_wait
函数会阻塞,直到成功使信号量-1或者被信号中断;而sem_trywait
会返回-1且设置errno为EAGAIN。
使用sem_timedwait
可以设定等待时间,超时后会返回-1且设置errno为ETIMEOUT。
// Returns: 0 if OK, −1 on error
int sem_post(sem_t *sem);
调用sem_post
会使信号量计数+1。如果有进程被改信号量阻塞,那么进程会被唤醒。
// Returns: 0 if OK, −1 on error
int sem_getvalue(sem_t *restrict sem, int *restrict valp);
sem_getvalue
函数用于获取信号量值,该数值存储在valp
指向的地址处。注意函数返回的数值有可能是过时的。