消息队列: message queue
消息队列是Linux IPC中常用的一种通信方式。
消息队列是随内核的持续性,只要内核没有重新自举,Linux系统没有重启,都是一直存在的。
一 posix --- 消息队列
头文件 #include <mqueue.h>
link with -lrt
1. 创建消息队列
#include <fcntl.h> /* For O_* constants */ #include <sys/stat.h> /* For mode constants */ mqd_t mq_open(const char * name, int oflag); mqd_t mq_open(const char * name, int ofalg, mode_t mode, struct mq_attr * attr); /************************************************************************************************************************** (1) name: 消息队列名字, 以 '/' 开始,不能有其它的‘/’ (2) oflag:决定访问(mq_send/mq_receive)消息队列的方式, O_RDONLY O_WRONLY O_RDWR, 可以用 '|' 设定 O_CREAT O_EXCL O_NONBLOCK , O_CREAT: 如果消息队列不存在,就创建新的消息队列。此时需要参数mode 和 attr。 O_EXCL: 需要和O_CREAT连用,如果消息队列存在,返回失败,用于检测消息队列是否存在。 O_NONBLOCK: 决定向消息队列中写入或读取时采用非阻塞方式,比如消息队列已满,此时写入,立即返回失败;消息队列没有消息,此时读取,立即返回失败。 (3) mode UGO(用户组其他人)权限(读,写,可执行),八进制 0664 (4) attr设置为NULL,则为默认属性 struct mq_attr //至少四个成员 { long mq_flags // 0 或者 O_NONBLOCK, 默认为0 long mq_maxmsg //消息队列最多消息数 ubuntu16.04 默认10 long mq_msgsize //消息队列中,消息的最大字节数 ubuntun16.04 默认8192 (8KB) long mq_curmsgs //消息队列中,当前的消息数 }; (5) 返回值: 成功 返回消息队列描述符, 失败返回-1并设置errno *//***********************************************************************************************************************/
2. 发送消息/接收消息
int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio); int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio, const struct timespec *abs_timeout); ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,unsigned *msg_prio); ssize_t mq_timedreceive(mqd_t mqdes, char * msg_ptr, size_t msg_len, unsigned * msg_prio, const struct timespec * abs_timeout); /**************************************************************************************************************************** 1. mqdes 消息队列描述符
2. msg_ptr 发送消息/接收消息的缓冲区
3. msg_len 消息体的长度, mq_send中msg_len<=mq_msgsize, mq_receive中msg_len>=mq_msgsize.
4. msg_prio 消息的优先级,值越大,优先级越高,最大为MQ_PRIO_MAX(至少32), 同时一个进程可以打开的最大消息队列数目MQ_OPEN_MAX(至少8)
mq_receive返回优先级最高的最早消息. 不需要设置优先级,设为0 或 NULL
5. abs_timeout 绝对时间, mq_timedsend, mq_timedreceive限时发送,接收
6. mq_send 成功返回0, 失败返回-1, mq_receive 成功返回消息字节数,失败返回-1 *//*************************************************************************************************************************/
3. 关闭消息队列
/** 关闭描述符和消息队列的联系 成功返回0,失败返回-1 */ int mq_close(mqd_t mqdes);
4. 删除消息队列
/** 从内核中删除名为name的消息队列. 成功返回0,失败返回-1 */ /** 消息队列创建后只有通过调用该函数或者是内核自举才能进行删除。*/ /** 每个消息队列都有一个保存当前打开着描述符数的引用计数器 */ /** 消息队列的销毁会被推迟到所有的引用都被关闭时执行, mq_unlink不会阻塞 */ int mq_unlink(const char *name);
5. 获取/设置消息队列属性
/** 获取消息队列的属性, 成功返回0, 失败-1 */ int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat); /** 设置消息队列的属性(只有mq_flags有效,其它忽略), 成功返回0, 失败-1 */ int mq_setattr(mqd_t mqdes, const struct mq_attr * mqstat, struct mq_attr * omqstat); /** mqstat新属性, omqstat保存原来的属性 */
6. 通知进程可以接收一条消息
/** 在调用进程注册一个异步通知函数, 成功返回0,失败-1*/ int mq_notify(mqd_t mqdes, const struct sigevent *notification); /*********************************************************** 1. notification为NULL, 并且这个进程之前注册过了通知函数, 则会取消调用进程之前注册的通知 2. notification不为NULL,在调用进程注册一个异步通知函数.消息队列从空到非空,会通知 3. 任何时候, 一个消息队列只能被一个进程注册通知. 如果之前调用进程或者其他进程已经注册过了,那么随后调用本函数注册通知会返回失败.
只有在没有任何线程阻塞在该队列的mq_receive调用中的前提下,通知才会发出。*//********************************************************/
struct sigevent : 链接
二 System V --- 消息队列
头文件 #include <sys/msg.h>
1.创建消息队列
/** 创建消息队列, 成功返回标识符(非负整数, 可以是0), 失败返回-1 【备注:第一次调用该函数的返回值有可能是0,测试结果为没有错误信号errno】*/ int msgget(key_t key, int msgflg); /****************************************************** 1. key IPC键值, IPC_PRIVATE 或者 其它key值,
不建议使用IPC_PRIVATE,因为最大创建的消息队列数是有限制的,并且消息队列是以内核的持续性,使用IPC_PRIVATE每次都会创建新的消息队列,消耗OS资源. 2. msgflg IPC_CREAT 消息队列不存在就创建(可以和存取权限控制符连用,例如 0664),已存在就打开 IPC_EXCL 和IPC_CREAT连用,消息队列不存在就创建,有就返回一个错误
3. key_t ftok( const char * path, int id );创建一个IPC键值
path: 已存在文件的路径,可以用当前路径 "./"
id: 子序号, 低八位有效,即有效值为1~255,不能为0
path和id都一样,返回的key值也相同 *//***************************************************/
2.发送/接收消息
/*********************************************************** * 消息类型必须是长整型的,而且必须是结构体类型的第一个成员, * 消息类型下面是消息正文,正文可以有多个成员(正文成员可以是任意数据类型的)。 * 至于消息结构体类型叫什么名字,成员叫什么名字,自行定义,没有明文规定。 *//********************************************************/ typedef struct _msg { long mtype; // 消息类型 char mtext[100]; // 消息正文 //…… …… // 消息的正文可以有多个成员 }MSG;
/********************************************************** * 从消息队列中获取消息,获取成功后,该消息从消息队列中删除 * msqid:消息队列标识符 * msgp:存放消息 * msgsz:消息的长度(不包括消息类型的long) * msgtyp:消息类型
msgtyp = 0:返回队列中的第一个消息。
msgtyp > 0:返回队列中消息类型为 msgtyp 的消息(常用)。
msgtyp < 0:返回队列中消息类型值小于或等于 msgtyp 绝对值的消息,如果这种消息有若干个,则取类型值最小的消息。
注意:在获取某类型消息的时候,若队列中有多条此类型的消息,则获取最先添加的消息,即先进先出原则。 * msgflg:一般为0 表示如果没有获取消息后的行为, 以下三种行为可以用 | 一起使用 0:msgrcv() 调用阻塞直到接收消息成功为止. MSG_NOERROR: 若返回的消息字节数比 nbytes 字节数多,则消息就会截短到 nbytes 字节,且不通知消息发送进程。 IPC_NOWAIT: 调用进程会立即返回。若没有收到消息则立即返回 -1。 返回值:成功为读取的消息长度,失败-1 *//********************************************************/ ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg); /********************************************************** * 将消息添加到消息队列中 * msqid:消息队列标识符 * msgp:要发送的消息 * msgsz:消息的长度(不包括消息类型的long) * msgflg:一般为0, IPC_NOWAIT(2048 == 1000 0000 0000) if(msgflg&IPC_NOWAIT ==0) 一直阻塞到条件满足 if(msgflg&IPC_NOWAIT !=0) 消息没有立即发送则调用的进程立即返回 * 返回值:成功为0,失败-1 *//********************************************************/ int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
3.删除消息队列
/*********************************************************** 消息队列控制 * msqid:消息队列标识符 * cmd:IPC_RMID, IPC_STAT, IPC_SET IPC_RMID:删除由 msqid 指示的消息队列,将它从系统中删除并破坏相关数据结构。 IPC_STAT:将 msqid 相关的数据结构中各个元素的当前值存入到由 buf 指向的结构中。相当于,把消息队列的属性备份到 buf 里。 IPC_SET:将 msqid 相关的数据结构中的元素设置为由 buf 指向的结构中的对应值。相当于,消息队列原来的属性值清空,再由 buf 来替换。 * buf:存放或更改消息队列的属性 * 返回值:成功为0,失败-1 *//********************************************************/ int msgctl(int msqid, int cmd, struct msqid_ds *buf);