zoukankan      html  css  js  c++  java
  • Linux进程通信之System V消息队列

    System V消息队列是Open Group定义的XSI,不属于POSIX标准。System V IPC的历史相对很早,在上个世70年代后期有贝尔实验室的分支机构开发,80年代加入System V的系统内核中,后来商用UNIX系统基本都加入了System V IPC的功能。

    System V消息队列相对于POSIX消息队列的区别主要是:

    • POSIX消息队列的读操作总是返回消息队列中优先级最高的最早消息,而对于System V消息队列可以返回任意指定优先级(通过消息类型)的消息。
    • 当向一个空消息队列中写入一个消息时,POSIX消息队列允许产生一个信号或启动一个线程,System V消息队列不提供类似的机制。

    系统内核都会为每一个System V消息队列维护一个信息结构,在Linux 2.6.18中的定义如下:

    <bits/msq.h>
    
    struct msqid_ds
    {
      struct ipc_perm msg_perm;        /*IPC对象的属性信息和访问权限 */
      __time_t msg_stime;              /* time of last msgsnd command */
      __time_t msg_rtime;              /* time of last msgrcv command */
      __time_t msg_ctime;              /* time of last change */
      unsigned long int __msg_cbytes;  /* 当前队列中消息的字节数 */
      msgqnum_t msg_qnum;              /* 当前队列中消息的个数 */
      msglen_t msg_qbytes;             /* 队列允许存放的最大字节数 */
      __pid_t msg_lspid;               /* pid of last msgsnd() */
      __pid_t msg_lrpid;               /* pid of last msgrcv() */
    
    //下面是保留字段
    #if __WORDSIZE == 32
      unsigned long int __unused1;
      unsigned long int __unused2;
      unsigned long int __unused3;
    #endif
      unsigned long int __unused4;
      unsigned long int __unused5;
    };

    消息队列的结构可能的设计如下:


    1 System V消息队列的创建和打开

    System V消息队列的创建和使用会使用下面的函数接口:

    #include <sys/msg.h>
    int msgget(key_t key, int oflg);
                    //成功返回非负消息队列描述符,失败返回-1

    key:消息队列的键,用来创建一个消息队列。System IPC都有一个key,作为IPC的外部标识符,创建成功后返回的描述符作为IPC的内部标识符使用。key的主要目的就是使不同进程在同一IPC汇合。key具体说可以有三种方式生成:

    • 不同的进程约定好的一个值;
    • 通过相同的路径名和项目ID,调用ftok()函数,生成一个键;
    • 还可以设置为IPC_PRIVATE,这样就会创建一个新的,唯一的IPC对象;然后将返回的描述符通过某种方式传递给其他进程;

    oflg:指定创建或打开消息队列的标志和读写权限(ipc_perm中的mode成员)。我们知道System V IPC定义了自己的操作标志和权限设置标志,而且都是通过该参数传递,这和open函数存在差别,open函数第三个参数mode用于传递文件的权限标志。System V IPC的操作标志包含:IPC_CREATIPC_EXCL,权限设置标志如下图:


    下面是创建消息队列的测试代码:

    #include <iostream>
    #include <cstring>
    #include <errno.h>
    
    #include <unistd.h>
    #include <fcntl.h>
    #include <sys/msg.h>
    
    using namespace std;
    
    #define  PATH_NAME "/tmp/anonymQueue"
    
    int main(int argc, char **argv)
    {
        key_t key;
        int fd;
    
        if ((fd = open(PATH_NAME, O_CREAT, 0666)) < 0)
        {
            cout<<"open file "<<PATH_NAME<<"failed.";
            cout<<strerror(errno)<<endl;
            return -1;
        }
        close(fd);
    
        key = ftok(PATH_NAME, 0);
        int msgID;
    
        if ((msgID = msgget(key, IPC_CREAT | 0666)) < 0)
        {
            cout<<"open message queue failed...";
            cout<<strerror(errno)<<endl;
            return -1;
        }
    
        cout<<"key:0x"<<hex<<key<<endl;
        cout<<"descriptor id:"<<dec<<msgID<<endl;
    }

    Linux 2.6.18下运行结果:

    key:0x8015
    descriptor id:917511

    实现System V IPC的任何系统都提供两个特殊的程序ipcsipcrmipcs输出IPC的各种信息,ipcrm则用于删除各种System V IPC。由于System V IPC不属于POSIX标准,所以这两个命令也未被标准化。下面是通过ipcs命令来查看刚刚创建的消息队列。

    [root@idcserver program]# ipcs -q -i 917511
    Message Queue msqid=917511
    uid=0   gid=0   cuid=0  cgid=0  mode=0666
    cbytes=0        qbytes=65536    qnum=0  lspid=0 lrpid=0
    send_time=Not set                   
    rcv_time=Not set                   
    change_time=Wed Aug  7 16:39:46 2013  

    2 System V消息队列的使用

    System V消息队列的写入消息使用下面的函数接口:

    #include <sys/msg.h>
    int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
                            //成功返回0,失败返回-1

    msqid:消息队列的描述符;

    msgp:指向存放消息的缓冲区,该缓冲区中包含消息类型消息体两部分内容。该缓冲区的结构是由用户定义的,在<sys/msg.h>中有关于该缓冲区结构定义的参版考模

    struct msgbuf
    {
        long int mtype;             /* type of received/sent message */
        char mtext[1];              /* text of the message */
    };

    缓冲区的开头是一个long型的消息类型,该消息类型必须是一个非负数。紧跟在消息类型后面的是消息体部分(如果消息长度大于0),参考模版中定义的mtext只是说明消息体,该部分可以自定义长度。我们自己的应用都会定义特定的消息结构。

    msgsz:缓冲区中消息体部分的长度;

    msgflg:设置操作标志。可以为0IPC_NOWAIT用于在消息队列中没有可用的空间时,调用线程采用何种操作方式

    标志为IPC_NOWAIT,表示msgsnd操作以非阻塞的方式进行,在消息队列中没有可用的空间时,msgsnd操作会立刻返回。并指定EAGAIN错误;

    标志为0,表示msgsnd操作以阻塞的方式进行,这种情况下在消息队列中没有可用的空间时调用线程会被阻塞,直到下面的情况发生:

    • 等到有存放消息的空间;
    • 消息队列从系统中删除,这种情况下回返回一个EIDRM错误;
    • 调用线程被某个捕捉到的信号中断,这种情况下返回一个EINTR错误;

    System V消息队列的读取消息使用下面的函数接口:

    #include <sys/msg.h>
    ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);
                          //成功返回接收到的消息的消息体的字节数,失败返回-1

    msqid:消息队列的描述符;

    msgp:指向待存放消息的缓冲区,该缓冲区中将会存放接收到的消息的消息类型消息体两部分内容。该缓冲区的结构是由用户定义的,和msgsnd相对应。

    msgsz:缓冲区中能存放消息体部分的最大长度,这也是该函数能返回的最大数据量;该字段的大小应该是sizeof(msg buffer) - sizeof(long)

    msgtyp:希望从消息队列中获取的消息类型。

    • msgtyp0,返回消息队列中的第一个消息;
    • msgtyp > 0,返回该消息类型的第一个消息;
    • msgtyp < 0,返回小于或等于msgtyp绝对值的消息中类型最小的第一个消息;

    msgflg:设置操作标志。可以为0IPC_NOWAITMSG_NOERROR用于在消息队列中没有可用的指定消息时,调用线程采用何种操作方式

    标志为IPC_NOWAIT,表示msgrcv操作以非阻塞的方式进行,在消息队列中没有可用的指定消息时,msgrcv操作会立刻返回,并设定errnoENOMSG

    标志为0,表示msgrcv操作是阻塞操作,直到下面的情况发生:

    • 消息队列中有一个所请求的消息类型可以获取;
    • 消息队列从系统中删除,这种情况下回返回一个EIDRM错误;
    • 调用线程被某个捕捉到的信号中断,这种情况下返回一个EINTR错误;

    标志为MSG_NOERROR,表示接收到的消息的消息体的长度大于msgsz长度时,msgrcv采取的操作。如果设置了该标志msgrcv在这种情况下回截断数据部分,而不返回错误,否则返回一个E2BIG错误。

    下面是关于消息队列读写的测试代码:

    #include <iostream>
    #include <cstring>
    #include <errno.h>
    
    #include <unistd.h>
    #include <fcntl.h>
    #include <sys/msg.h>
    
    using namespace std;
    
    #define  PATH_NAME "/tmp/anonymQueue"
    
    key_t CreateKey(const char *pathName)
    {
        int fd;
    
        if ((fd = open(PATH_NAME, O_CREAT, 0666)) < 0)
        {
            cout<<"open file "<<PATH_NAME<<"failed.";
            cout<<strerror(errno)<<endl;
            return -1;
        }
    
        close(fd);
    
        return ftok(PATH_NAME, 0);
    }
    
    int main(int argc, char **argv)
    {
        key_t key;
        key = CreateKey(PATH_NAME);
    
        int msgID;
        if ((msgID = msgget(key, IPC_CREAT | 0666)) < 0)
        {
            cout<<"open message queue failed...";
            cout<<strerror(errno)<<endl;
    
            return -1;
        }
    
        if (fork() == 0)
        {
            char msg[] = "cute yuki...";
            char *msgBuf = new char[sizeof(long) + sizeof(msg)];
            long mtype = 1;
    
            memmove(msgBuf, &mtype, sizeof(mtype));
            memmove(msgBuf + sizeof(mtype), msg, sizeof(msg));
    
            for (int i = 1; i <= 5; ++i)
            {
                if (msgsnd(msgID, msgBuf, sizeof(msg), 0) < 0)
                {
                    cout<<"send message "<<i<<"failed...";
                    cout<<strerror(errno)<<endl;
                    continue;
                }
    
                cout<<"child: send message "<<i<<" success..."<<endl;
                sleep(1);
            }
    
            exit(0);
        }
    
        char msgBuf[256];
        long mtype;
        int recvLen;
    
        for (int i = 1; i <= 5; ++i)
        {
            recvLen = msgrcv(msgID, msgBuf, 256 - sizeof(long), 0, 0);
            if (recvLen < 0)
            {
                cout<<"receive message failed...";
                cout<<strerror(errno)<<endl;
                continue;
            }
    
            memmove(&mtype, msgBuf, sizeof(long));
    
            cout<<"parent receive a message:"<<endl;
            cout<<"message type:"<<mtype<<endl;
            cout<<"message body:"<<msgBuf + sizeof(long)<<endl;
        }
        return 0;
    }

    Linux 2.6.18下的执行结果为:

    child: send message 1 success...
    parent receive a message:
    message type:1
    message body:cute yuki...
    child: send message 2 success...
    parent receive a message:
    message type:1
    message body:cute yuki...
    child: send message 3 success...
    parent receive a message:
    message type:1
    message body:cute yuki...
    child: send message 4 success...
    parent receive a message:
    message type:1
    message body:cute yuki...
    child: send message 5 success...
    parent receive a message:
    message type:1
    message body:cute yuki...

    3 System V消息队列的控制操作

    System V消息队列的删除,属性的设置和获取等控制操作要使用下面的函数接口:

    #include <sys/msg.h>
    int msgctl(int msqid, int cmd, struct msqid_ds *buf);
                            //成功返回0,失败返回-1

    msqid:消息队列的描述符;

    cmd:控制操作的命令,SUS标准提供以下三个命令:

    • IPC_RMID,删除一个消息队列。执行该命令系统会立刻把该消息队列从内核中删除,该消息队列中的所有消息将会被丢弃。这和已经讨论过的POSIX消息队列有很大差别POSIX消息队列通过调用mq_unlink来从内核中删除一个消息队列,但消息队列的真正析构会在最后一个mq_close结束后发生。
    •  IPC_SET,根据buf的所指的值来设置消息队列msqid_ds结构中的msg_perm.uidmsg_perm.gidmsg_perm.modemsg_qbytes四个成员。
    • IPC_STAT,通过buf返回当前消息队列的msqid_ds结构。
    • Linux下还有例如IPC_INFOMSG_INFO等命令,具体可以参考Linux手册;

    buf:指向msqid_ds结构的指针;

    下面是测试代码:

    #include <iostream>
    #include <cstring>
    #include <errno.h>
    
    #include <unistd.h>
    #include <fcntl.h>
    #include <sys/msg.h>
    
    using namespace std;
    
    #define  PATH_NAME "/tmp/anonymQueue"
    
    key_t CreateKey(const char *pathName)
    {
        int fd;
        if ((fd = open(PATH_NAME, O_CREAT, 0666)) < 0)
        {
            cout<<"open file "<<PATH_NAME<<"failed.";
            cout<<strerror(errno)<<endl;
            return -1;
        }
    
        close(fd);
    
        return ftok(PATH_NAME, 0);
    }
    
    int main(int argc, char **argv)
    {
        key_t key;
        key = CreateKey(PATH_NAME);
    
        int msgID;
        if ((msgID = msgget(key, IPC_CREAT | 0666)) < 0)
        {
            cout<<"open message queue failed...";
            cout<<strerror(errno)<<endl;
            return -1;
        }
    
        msqid_ds msgInfo;
        msgctl(msgID, IPC_STAT, &msgInfo);
    
        cout<<"msg_qbytes:"<<msgInfo.msg_qbytes<<endl;
        cout<<"msg_qnum:"<<msgInfo.msg_qnum<<endl;
        cout<<"msg_cbytes:"<<msgInfo.msg_cbytes<<endl;
        return 0;
    }

    Linux 2.6.18下的执行结果为:

    msg_qbytes:65536
    msg_qnum:2
    msg_cbytes:26

    关于消息队列中允许存放最大的字节数可以通过IPC_SET命令进行修改,该修改只能针对本消息队列生效。如下测试代码:

    int main(int argc, char **argv)
    {
        key_t key;
        key = CreateKey(PATH_NAME);
    
        int msgID;
        if ((msgID = msgget(key, IPC_CREAT | 0666)) < 0)
        {
            cout<<"open message queue failed...";
            cout<<strerror(errno)<<endl;
            return -1;
        }
    
        msqid_ds msgInfo;
        msgctl(msgID, IPC_STAT, &msgInfo);
    
        cout<<"msg_qbytes:"<<msgInfo.msg_qbytes<<endl;
        cout<<"msg_qnum:"<<msgInfo.msg_qnum<<endl;
        cout<<"msg_cbytes:"<<msgInfo.msg_cbytes<<endl;
    
        msgInfo.msg_qbytes = 6553600;
        if (msgctl(msgID, IPC_SET, &msgInfo) < 0)
        {
            cout<<"set message queue failed...";
            cout<<strerror(errno)<<endl;
            return -1;
        }
    
        msgctl(msgID, IPC_STAT, &msgInfo);
        cout<<"msg_qbytes:"<<msgInfo.msg_qbytes<<endl;
        cout<<"msg_qnum:"<<msgInfo.msg_qnum<<endl;
        cout<<"msg_cbytes:"<<msgInfo.msg_cbytes<<endl;
        return 0;
    }

    Linux 2.6.18下的执行结果为:

    msg_qbytes:65536
    msg_qnum:0
    msg_cbytes:0
    msg_qbytes:6553600
    msg_qnum:0
    msg_cbytes:0

    4 System V消息队列的内核限制

    System V IPC,系统往往会存在一些限制,对于消息队列,在Linux2.6.18中,系统内核存在以下限制:

    [root@idcserver program]# sysctl -a |grep msg
    kernel.msgmnb = 65536 //一个消息队列上允许的最大字节数
    kernel.msgmni = 16 //系统范围内允许存在的最大消息队列数
    kernel.msgmax = 65536     //每个消息的最大字节数

    对于System V消息队列一般内核还有一个限制:系统范围内的最大消息数,在Linux下这个限制由msgmnb*msgmni决定。

    上面已经说过可以通过IPC_SET来设置使用中的消息队列的最大字节数。但是要在系统范围内对内核限制进行修改,在Linux下面可以通过修改/etc/sysctl.conf内核参数配置文件,然后配合sysctl命令来对内核参数进行设置。例如下面示例:

    [root@idcserver program]#echo "kernel.msgmnb = 6553600" >>/etc/sysctl.conf
    [root@idcserver program]#echo "kernel.msgmni = 100" >>/etc/sysctl.conf
    [root@idcserver program]#echo "kernel.msgmax = 6553600" >>/etc/sysctl.conf
    [root@idcserver program]#sysctl -p
    [root@idcserver program]# sysctl -a |grep msg
    kernel.msgmnb = 6553600
    kernel.msgmni = 100
    kernel.msgmax = 6553600

    Aug 8, 2013 AM 08:54 @lab<T...T>
  • 相关阅读:
    LeetCode 515. 在每个树行中找最大值(Find Largest Value in Each Tree Row)
    LeetCode 114. 二叉树展开为链表(Flatten Binary Tree to Linked List)
    LeetCode 199. 二叉树的右视图(Binary Tree Right Side View)
    LeetCode 1022. 从根到叶的二进制数之和(Sum of Root To Leaf Binary Numbers)
    LeetCode 897. 递增顺序查找树(Increasing Order Search Tree)
    LeetCode 617. 合并二叉树(Merge Two Binary Trees)
    LeetCode 206. 反转链表(Reverse Linked List) 16
    LeetCode 104. 二叉树的最大深度(Maximum Depth of Binary Tree)
    LeetCode 110. 平衡二叉树(Balanced Binary Tree) 15
    LeetCode 108. 将有序数组转换为二叉搜索树(Convert Sorted Array to Binary Search Tree) 14
  • 原文地址:https://www.cnblogs.com/pangblog/p/3246577.html
Copyright © 2011-2022 走看看