zoukankan      html  css  js  c++  java
  • 《Linux应用进程间通信(三) — 消息队列》

    1.消息队列

      消息队列:提供一种从一个进程向另一个进程发送一个数据块的方法。与FIFO相比,消息队列的优势在于,它独立于发送和接收进程而存在。 

      1.链表式结构组织,存放于内核。

      2.通过队列标识来引用。

      3.通过一个数据类型来索引指定的数据。

      每个队列都有一个msqid_ds结构与其关联“

    struct msqid_ds{
        struct ipc_perm    msg_perm;
        msgqnum_t          msg_qnum;
        msglen_t           msg_qbytes;
        pid_t              msg_lspid;
        pid_t              msg_lrpid;
        ...
    }

    2.msgget函数

    #include <sys/msg.h>
    
    int msgget(key_t key, int msgflg);
    第一个参数key:每一个IPC对象与一个key对应。也可以由函数ftok生成。
    第二个参数msgflg:函数的行为(0666|IPC_CREAT表示用户具有读写权限)
              IPC_CREAT值,若没有该队列,则创建一个并返回新标识符;若已存在,则返回原标识符。
              IPC_EXCL值,若没有该队列,则返回-1;若已存在,则返回0。
    返回值: 成功:非负队列ID; 失败:-1;

       作用 :创建或打开一个现有队列。

      可通过ipcs -q(只查看消息队列的状态):查看系统的IPC状态

    3.msgsnd函数

    #include<sys/msg.h>
    
    int msgsnd(int msqid, const void *msg_ptr, size_t msg_sz, int msgflg);
    
    第一个参数msqid是由msgget函数返回的消息队列标识符;
    第二个参数msg_ptr是一个指向准备发送消息的指针,消息必须像刚才说的那样以一个长整型成员变量开始;
    第三个参数msg_sz是msg_ptr指向的消息的长度。这个长度不能包括长整型消息类型成员变量的长度;
    第四个参数msgfig:
      0:当消息队列满时,msgsnd将会阻塞,直到消息能写进消息队列。
      IPC_NOWAIT:当消息队列已满的时候,msgsnd函数不等待立即返回。
      IPC_NOERROR:若发送的消息大于size字节,则把该消息截断,截断部分将被丢弃,且不通知发送进程
                 返回值: 成功:0; 失败:-1;

      作用:把消息添加到消息队列中。

      消息的结构受两方面的约束。首先,它的长度必须小于系统规定的上限;其次,它必须以一个长整型成员变量开始。接收函数将用这个成员变量来确定消息的类型。当使用消息时,最好把消息结构定义为下面这样:

      struct my_message{

        long int message_type;   /* 必须大于0,消息类型 */

        char mtext[512];    //假如我们要发送512个字节的数据

      }

      由于在消息的接收中要用到message_type,所以你不能忽略它。你必须在声明自己的数据结构时包含它,并且最好将它初始化为一个已知值。

      msgsnd()解除阻塞的条件有以下三个条件:

    ①    不满足消息队列满或个数满两个条件,即消息队列中有容纳该消息的空间。

    ②    msqid代表的消息队列被删除。

    ③    调用msgsnd函数的进程被信号中断。

    4.msgrcv函数

    #include<sys/msg.h>
    
    int msgrcv(int msqid, void *msg_ptr, size_t msg_sz, long int msgtype, int msgflg);
    第一个参数msqid是由msgget函数返回的消息队列标识符;
    第二个参数msg_ptr是一个指向准备接收消息的指针,消息必须像前面msgsnd函数中介绍的那样以一个长整型成员变量开始。
    第三个参数msg_sz是msg_ptr指向的消息的长度,它不包括长整型消息类型成员变量的长度。
    第四个参数msgtype是一个长整型,它可以实现一种简单形式的接收优先级。如果msgtype的值为0,就获取队列中的第一个可用消息。如果它的值大于零,将获取具有相同消息类型的第一个消息。如果它的值小于零,将获取消息类型等于或小于msgtype的绝对值的第一个消息。
    这个函数看起来好像很复杂,但实际应用很简单。如果只想按照消息发送的顺序来接收它们,就把msgtype设置为0。如果只想获取某一特定类型的消息,就把msgtype设置为相应的类型值。如果想接收类型等于或小于n的消息,就把msgtype设置为-n。
    第五个参数msgflg:
      0: 阻塞式接收消息,没有该类型的消息msgrcv函数一直阻塞等待。
      IPC_NOWAIT:如果没有返回条件的消息调用立即返回,此时错误码为ENOMSG。
      IPC_EXCEPT:与msgtype配合使用返回队列中第一个类型不为msgtype的消息。
      IPC_NOERROR:如果队列中满足条件的消息内容大于所请求的size字节,则把该消息截断,截断部

    成功:实际读取到的消息数据长度。 出错:-1,错误原因存于error中。

      作用:从队列中取用消息。

      其中对第四个参数举例说明:消息队列中要求通过一个数据类型来索引指定的数据。也就是在msgsng发送数据前会对long int message_type变量进行复制。通过这个变量来标识数据。而且这个变量必须要大于0.

      那么在msgrcv中,对第四个参数msgtype进行选值时。

      如果等于0,则按照队列的顺序去取,先到先取的原则。

      如果大于0,假如msgtype=1。就会取找队列中message_type等于1的队列的排在第一个的消息。

      如果小于0,假如msgtype=-2。就会-2取绝对值,也就是等于2。再取队列中message_type等于或者小于2的排在第一个的消息。

      msgrcv()解除阻塞的条件有以下三个:

    ①    消息队列中有了满足条件的消息。

    ②    msqid代表的消息队列被删除。

    ③    调用msgrcv()的进程被信号中断。

        

    5.msgctl函数

    #include<sys/msg.h>
    
    int msgctl(int msqid, int command, struct msqid_ds *buf);
    第一个参数msqid是由msgget返回的消息队列标识符;
    第二个参数command是将要采取的动作,它可以取3个值。
    IPC_STAT  取出队列的msqid_ds结构,并将它存放在buf指向的结构中。   
    IPC_SET   如果进程有足够的权限,将buf指向的结构复制到与这个队列相关的msqid_ds结构中。
    IPC_RMID  从系统中删除该消息队列以及仍在该队列中的所有数据。这种删除立即生效。仍在使用这一消息队列的其他进程在它们下一次试图对此队列操作,将得到EIDRM错误。
    
    返回值: 成功:0; 失败:-1

    6.实例

    send.c

    /*send.c*/  
    #include <stdio.h>   
    #include <sys/types.h>   
    #include <sys/ipc.h>   
    #include <sys/msg.h>   
    #include <errno.h>   
    
    #define MSGKEY 1024   
      
    struct msgstru  
    {  
       long msgtype;  
       char msgtext[2048];   
    };  
      
    main()  
    {  
      struct msgstru msgs;  
      int msg_type;  
      char str[256];  
      int ret_value;  
      int msqid;  
      
      msqid=msgget(MSGKEY,IPC_EXCL);  /*检查消息队列是否存在*/  
      if(msqid < 0){  
        msqid = msgget(MSGKEY,IPC_CREAT|0666);/*创建消息队列*/  
        if(msqid <0){  
        printf("failed to create msq | errno=%d [%s]
    ",errno,strerror(errno));  
        exit(-1);  
        }  
      }   
       
      while (1){  
        printf("input message type(end:0):");  
        scanf("%d",&msg_type);  
        if (msg_type == 0)                   //为什么msg_type=0要直接break?因为规定msg_type必须要大于0
           break;  
        printf("input message to be sent:");  
        scanf ("%s",str);  
        msgs.msgtype = msg_type;  
        strcpy(msgs.msgtext, str);  
        /* 发送消息队列 */  
        ret_value = msgsnd(msqid,&msgs,sizeof(struct msgstru),IPC_NOWAIT);     //疑问:sizeof(struct msgsrtu)包括了长整型的长度,不是说不包括吗
        if ( ret_value < 0 ) {  
           printf("msgsnd() write msg failed,errno=%d[%s]
    ",errno,strerror(errno));  
           exit(-1);  
        }  
      }  
      msgctl(msqid,IPC_RMID,0); //删除消息队列   
    }

    receive.c

    /*receive.c */  
    #include <stdio.h>   
    #include <sys/types.h>   
    #include <sys/ipc.h>   
    #include <sys/msg.h>   
    #include <errno.h>   
      
    #define MSGKEY 1024   
      
    struct msgstru  
    {  
       long msgtype;  
       char msgtext[2048];  
    };  
      
    /*子进程,监听消息队列*/  
    void childproc()
    { struct msgstru msgs; int msgid,ret_value; char str[512]; while(1){ msgid = msgget(MSGKEY,IPC_EXCL );/*检查消息队列是否存在 */ if(msgid < 0){ printf("msq not existed! errno=%d [%s] ",errno,strerror(errno)); sleep(2); continue; } /*接收消息队列*/ ret_value = msgrcv(msgid,&msgs,sizeof(struct msgstru),0,0); printf("text=[%s] pid=[%d] ",msgs.msgtext,getpid()); } return; } void main() { int i,cpid; /* create 5 child process */ for (i=0;i<5;i++){ cpid = fork(); if (cpid < 0) printf("fork failed "); else if (cpid ==0) /*child process*/ childproc(); } }

      注意,消息队列是由发送端也就是写端创建的。(为什么要由写端创建?)

      其中msgrcv和msgsng中sizeof(struct msgstru)是错误的,应该是sizeof(struct msgstru) - sizeof(long)。

    7.msgctl.c实例

    msgctl.c源代码如下:

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>
    #include <sys/ipc.h>
    #include <sys/msg.h>
    #include <error.h>
    
    #define TEXT_SIZE  512
    struct msgbuf
    {
        long mtype;
        char mtext[TEXT_SIZE];
    };
    
    int main(int argc, char **argv)
    {
        int msqid ;
        struct msqid_ds info ;
        struct msgbuf buf ;
        struct msgbuf buf1 ;
        int flag ;
        int sendlength, recvlength ;
     
        msqid = msgget( IPC_PRIVATE, 0666 ) ;
        if ( msqid < 0 )
        {
            perror("get ipc_id error") ;
            return -1 ;
        }
     
        buf.mtype = 1 ;
        strcpy(buf.mtext, "happy new year!") ;
        sendlength = sizeof(struct msgbuf) - sizeof(long) ;
        flag = msgsnd( msqid, &buf, sendlength , 0 ) ;
        if ( flag < 0 )
        {
            perror("send message error") ;
            return -1 ;
        }
        buf.mtype = 3 ;
        strcpy(buf.mtext, "good bye!") ;
        sendlength = sizeof(struct msgbuf) - sizeof(long) ;
        flag = msgsnd( msqid, &buf, sendlength , 0 ) ;
        if ( flag < 0 )
        {
            perror("send message error") ;
            return -1 ;
        }
     
        flag = msgctl( msqid, IPC_STAT, &info ) ;
        if ( flag < 0 )
        {
            perror("get message status error") ;
            return -1 ;
        }
        printf("uid:%d, gid = %d, cuid = %d, cgid= %d
    " ,
               info.msg_perm.uid,  info.msg_perm.gid,  info.msg_perm.cuid,  info.msg_perm.cgid  ) ;
        printf("read-write:%03o, cbytes = %lu, qnum = %lu, qbytes= %lu
    " ,
               info.msg_perm.mode&0777, info.msg_cbytes, info.msg_qnum, info.msg_qbytes ) ;
        system("ipcs -q") ;
        recvlength = sizeof(struct msgbuf) - sizeof(long) ;
        memset(&buf1, 0x00, sizeof(struct msgbuf)) ;
     
        flag = msgrcv( msqid, &buf1, recvlength ,3,0 ) ;
        if ( flag < 0 )
        {
            perror("recv message error") ;
            return -1 ;
        }
        printf("type=%ld, message=%s
    ", buf1.mtype, buf1.mtext) ;
     
        flag = msgctl( msqid, IPC_RMID,NULL) ;
        if ( flag < 0 )
        {
            perror("rm message queue error") ;
            return -1 ;
        }
        system("ipcs -q") ;
     
       return 0 ;
    }

    编译 gcc msgctl.c –o msgctl。

    执行 ./msg,执行结果如下:

    uid:1008, gid = 1003, cuid = 1008, cgid= 1003

    read-write:666, cbytes = 1024, qnum = 2, qbytes= 163840

    ------ Message Queues --------

    key                 msqid      owner      perms      used-bytes   messages   

    0x00000000   65536      zjkf          666          1024            2      

    type=3, message=good bye!

    ------ Message Queues --------

    key        msqid      owner      perms      used-bytes   messages

    8.两进程通过消息队列收发消息

    (1)发送消息队列程序

    msgsnd.c源代码如下:

    #include <stdio.h>
    #include <string.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <sys/types.h>
    #include <sys/ipc.h>
    #include <sys/msg.h>
    #include <time.h>
    
    #define TEXT_SIZE  512
    struct msgbuf
    {
        long mtype ;
        int  status ; 
        char time[20] ;
        char mtext[TEXT_SIZE] ;
    }  ;
    char  *getxtsj()
    {  
        time_t  tv ;
        struct  tm   *tmp ;
        static  char  buf[20] ;
        tv = time( 0 ) ;
        tmp = localtime(&tv) ;
        sprintf(buf,"%02d:%02d:%02d",tmp->tm_hour , tmp->tm_min,tmp->tm_sec);
        return   buf ;
    }
    
    int main(int argc, char **argv)
    {
        int msqid ;
        struct msqid_ds info ;
        struct msgbuf buf ;
        struct msgbuf buf1 ;
        int flag ;
        int sendlength, recvlength ;
        int key ;
     
        key = ftok("msg.txt", 0x01 ) ;
        if ( key < 0 )
        {
            perror("ftok key error") ;
            return -1 ;
        }
     
        msqid = msgget( key, 0600|IPC_CREAT ) ;
        if ( msqid < 0 )
        {
            perror("create message queue error") ;
            return -1 ;
        }
     
        buf.mtype = 1 ;
        buf.status = 9 ; 
        strcpy(buf.time, getxtsj()) ;
        strcpy(buf.mtext, "happy new year!") ;
        sendlength = sizeof(struct msgbuf) - sizeof(long) ;
        flag = msgsnd( msqid, &buf, sendlength , 0 ) ;
        if ( flag < 0 )
        {
            perror("send message error") ;
            return -1 ;
        }
        buf.mtype = 3 ;
        buf.status = 9 ; 
        strcpy(buf.time, getxtsj()) ;
        strcpy(buf.mtext, "good bye!") ;
        sendlength = sizeof(struct msgbuf) - sizeof(long) ;
        flag = msgsnd( msqid, &buf, sendlength , 0 ) ;
        if ( flag < 0 )
        {
            perror("send message error") ;
            return -1 ;
        }
        system("ipcs -q") ;
       return 0 ;
    }

    (2)接收消息队列程序

    msgrcv.c源代码如下:

    #include <stdio.h>
    #include <string.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <sys/types.h>
    #include <sys/ipc.h>
    #include <sys/msg.h>
    
    #define TEXT_SIZE  512
    struct msgbuf
    {
        long mtype ;
        int  status ; 
        char time[20] ;
        char mtext[TEXT_SIZE] ;
    };
    int main(int argc, char **argv)
    {
        int msqid ;
        struct msqid_ds info ;
        struct msgbuf buf1 ;
        int flag ;
        int  recvlength ;
        int key ;
        int mtype ;
     
        key = ftok("msg.txt", 0x01 ) ;
        if ( key < 0 )
        {
            perror("ftok key error") ;
            return -1 ;
        }
     
        msqid = msgget( key, 0 ) ;
        if ( msqid < 0 )
        {
            perror("get ipc_id error") ;
            return -1 ;
        }
     
        recvlength = sizeof(struct msgbuf) - sizeof(long) ;
        memset(&buf1, 0x00, sizeof(struct msgbuf)) ;
        mtype = 1 ;
        flag = msgrcv( msqid, &buf1, recvlength ,mtype,0 ) ;
        if ( flag < 0 )
        {
            perror("recv message error") ;
            return -1 ;
        }
        printf("type=%ld,time=%s, message=%s
    ", buf1.mtype, buf1.time,  buf1.mtext) ;
        system("ipcs -q") ;
        return 0 ;
    }

    (3)编译与执行程序

    ①    在当前目录下利用>msg.tmp建立空文件msg.tmp。

    ②    编译发送消息队列程序 gcc msgsnd.c -o  msgsnd。

    ③    执行./msgsnd,执行结果如下:

      ----- Message Queues --------

    key                  msqid        owner      perms      used-bytes   messages   

    0x0101436d    294912      zjkf          600          1072            2          

    ④    编译接收消息程序 gcc msgrcv.c -o msgrcv

    ⑤    执行./msgrcv,执行结果如下:

     type=1,time=03:23:16, message=happy new year!

    ------ Message Queues --------

    key                  msqid        owner      perms      used-bytes   messages   

    0x0101436d    294912     zjkf           600          536              1         

    ⑥    利用ipcrm -q 294912删除该消息队列。因为消息队列是随内核持续存在的,在程序中若不利用msgctl函数或在命令行用ipcrm命令显式地删除,该消息队列就一直存在于系统中。另外信号量和共享内存也是随内核持续存在的。

    更加详细参考:https://www.cnblogs.com/kratosBJ/p/9530097.html

    测试:两个进程都创建消息队列会发生什么?

  • 相关阅读:
    MySQL8 Keepalived+双主
    Last_SQL_Errno: 1050 Last_SQL_Error: Error 'Table 'events' already exists' on query. Default database: 'eygle'. Query: 'create table events
    [MY-011522] [Repl] Plugin group_replication reported: 'The member contains transactions not present in the group.
    MYSQL8 裸机搭主从
    MY-011292 MY-011300 MY-013597 MY-011300
    RHLE8 docker安装
    docker harbor x509: certificate signed by unknown authority action: push: unauthorized to access repository
    harbor配置https访问
    action: push: unauthorized to access repository
    unlock DDIC for HANADB
  • 原文地址:https://www.cnblogs.com/zhuangquan/p/13153988.html
Copyright © 2011-2022 走看看