zoukankan      html  css  js  c++  java
  • 21消息队列

    消息与消息队列

    IPC  (Inter process communication)

    广义:所有可以用于进程间通信的对象和方法

    狭义:特指消息队列,信号量,共享内存

    消息队列    应用于进程间少量数据的顺序共享

    信号量        应用于进程间互斥

    共享内存    应用与进程间大量数据的随机共享访问

    命令行查询IPC对象

    ipcs [...]

    -q       查询  消息队列

    -s       查询  信号量

    -m      查询  共享内存

    -a       查询所有

    无       查询所有

    命令行删除 IPC对象

    ipcrm [...]

    -q   msqid      删除消息队列

    -m  shmid      删除共享内存

    -s   semid      删除信号量

    创建消息队列的步骤:

    消息队列使用(函数):

    1: ftok        使用某个文件做关键字创建 key  

    2: msgget  使用key创建消息队列 msqid

    3: msgsnd  往消息队列中写入消息

    4: msgrcv   从消息队列中读取消息

    5: msgctl     删除消息队列

    创建 IPC

    <sys/types>

    <sys/ipc.h>

    key_t  ftok(const char *pathname, int id)

    注意:由文件名 和 id 唯一决定一个key,文件名和 id都一样的话,获得的key必然一样,否则,key必然不一样,通过创建一个key,实现 两个进程通信的桥梁关键字。

    该文件也要必须存在,可空白。

    创建消息队列

    <sys/types.h>

    <sys/ipc.h>

    <sys/msg.h>

    int  msgget(key_t key, int msgflg)  

         返回 msqid

    创建消息队列

    IPC_PRIVATE     创建key = 0的消息队列,可以同时存在多个

    IPC_EXCL | IPC_CREAT   确保这个消息队列是新创建的,不与已有的消息队列冲突

    往消息队列发送消息

    int msgsnd(int msqid,  const void *msg, size_t szLen, int msgflg);

    参数解析:

    msg 指向 msgbuf 结构(自定义)

    struct msgbuf {

        long type;            //大于0的数

        char text[N];         //文本大小由 szLen 指定

    }

    msgflag:

         0                       队列满则阻塞

        IPC_NOWAIT    队列满则返回错误 (EAGAIN)

    从消息队列中读取消息

    size_t msgrcv(int msqid, void *msg, size_t sz, long msgtype,  int msgflg)

    参数:

    msg 指向 msg_info 数据结构, 同msgsnd

    msgflg 取值 ( MSG_XXX 需要定义宏: _GNU_SOURCE)

        0                     :默认值,没有消息则阻塞

        IPC_NOWAIT: 没有这个类型的消息,立即返回 (ENOMSG)

        MSG_EXCEPT: 返回队列中第一个不是某类型的消息

    注意:使用MSG_EXCEPT必须加上宏:#define _GNU_SOURCE,并且该宏必须在<sys/msg.h>前面定义。

    MSG_NOERROR:   消息内容大于请求长度,丢弃多余部分

    往消息队列发送消息

    msgsnd 函数调用成功会修改 msqid_ds

    msg_lspid           设置为对应进程PID

    msg_qnum          加 1

    msg_stime          设置为当前时间

    从队列中读取消息

    msgrcv 调用成功后,修改 msqid_ds 结构体

    msg_lrpid:          设置为进程PID

    msg_qnum:        减1

    msg_rtime:         设置为当前是时间

    例子:

    typedef struct tagMsg

    {

          long type;

          char szBuf[1024];

    }MSG_S;

    void testMsgRS()

    {

          int ch;

          fprintf(stderr,r“select  r/w:”);

          scanf(“%c”,&ch);

          if(ch!=’w’&&ch!=’r’)

          {

               return ;

    }

    //创建key

          key_t key=ftok("123",321);

          if(key==-1)

          {

               perror("fail ftok");

               return ;

          }

          printf("key=%d ",key);

          //创建消息队列

          int msqid=msgget(key,IPC_CREAT|0666);

          if(msqid==-1)

          {

               perror("fail msgget!");

               return ;

          }

          printf("msqid=%d ",msqid);

         

          MSG_S msg;

          //read message

          if(rs=='r')

          {

               while(1)

               {

                     memset(&msg,0,sizeof(msg.szBuf));

                     fprintf(stderr,"type:");

                     scanf("%ld",&(msg.type));

                     //read(0,&(msg.type),sizeof(long));

                     int nRet=msgrcv(msqid,&msg,sizeof(msg.szBuf),msg.type,  IPC_NOWAIT);

                     if(nRet<0)

                     {

                          perror("fail msgsnd");

                          break;

                     }

                     printf("receive:%s ",msg.szBuf);

               }

          }

          //send message

          else if(rs=='s')

          {

               fprintf(stderr,"example:1 ABCD! ");

               while(1)

               {

                     memset(&msg,0,sizeof(msg.szBuf));

                     fprintf(stderr,"send:");

                     scanf("%ld%s",&(msg.type),msg.szBuf);

                     //read(0,&(msg.type),msg.szBuf,sizeof(MSG_S));

                     int nRet=msgsnd(msqid,&msg,strlen(msg.szBuf),0);

                     if(nRet<0)

                     {

                          perror("fail msgsnd");

                          break;

                     }

                    

               }

          }

    }

    消息队列设置

    int  msgctl(int msqid,   int cmd,  struct msqid_ds *buf)

    参数:

    cmd 参数

    IPC_STAT      读取内核中 msqid_ds 数据到 buf

    IPC_SET        设置 buf 中数据到 msqid_ds

    IPC_RMID      移除消息队列,读写消息队列进程返回 EIDRM

    IPC_INFO       返回系统级别的消息队列限制,保存到buf,这里 buf 指向 msginfo 结构数据

    MSG_INFO     返回 msginfo 消息,同时获取资源消耗情况:

                            msgpool    系统中存在的消息队列数

                            msgmap    系统中所有消息队列的消息数

                            msgtql        系统中所有消息队列占用的字节数

    MSG_STAT     返回 msginfo 消息

                            msqid 参数使用内核的消息队列信息

    参数三结构体:

    struct msqid_ds {

        struct ipc_perm msg_perm;      //消息队列权限等信息

        time_t                msg_stime;      //msgsnd 最后调用时间     

        time_t                msg_rtime;      //msgrcv 最后调用时间

        time_t                msg_ctime;      //消息队列最后改变的时间

        unsigned long   _msg_cbytes;  //消息队列当前消息总长度

        msgqnum_t        msg_qnum;     //消息队列中消息个数

        msglen_t            msg_qbytes;   //队列能存消息的最大长度

        pid_t                   msg_lspid;       //最后调用 msgsnd 的进程号

        pid_t                  msg_lrpid;        //最后调用 msgrcv 的进程

    }

    struct ipc_perm结构体

    struct ipc_perm{

        key_t  __key;           //msgget 的key参数

        uid_t   uid                //消息队列所有者 euid

        gid_t   gid;               //消息队列所有者 egid

        uid_t   cuid;             //消息队列创建者 euid

        gid_t   cgid;             //消息队列创建者 egid

        unsigned short mode;  //访问权限

        unsigned short __seq;   //序列号

    }

    MSG_INFO     返回 msginfo 消息,结构体:

    struct msginfo {

        int msgpool;       //系统中存在的消息队列数

        int msgmap;       //系统中所有消息队列的消息数

        int msgmax;       //单条消息中最大长度

        int msgmnb;       //消息队列最大可写入字节数

        int msgmni;        //消息队列可容纳消息条数

        int msgssz;        //消息段大小,未使用

        int msgtql;         //系统中所有消息队列占用的字节数

    }

    内核中维护的消息队列数据格式(链表)

    struct msg {

        struct msg *msg_next;          //下一个消息节点

        long            msg_type;          //消息类型

        ushort         msg_ts;              //消息长度

        short           msg_spot;          //消息内容(指针)

    }

    例子:

    #define _GNU_SOURCE 

    #include <stdio.h>

    #include <sys/msg.h>

    #include <string.h>

    #include <stdlib.h>

    #include <sys/ipc.h>

    #include <unistd.h>

    typedef struct tagMsg

    {

          long type;

          char szBuf[1024];

    }MSG_S;

    void testMsgRS(char rs)

    {

          key_t key=ftok("123",321);

          if(key==-1)

          {

               perror("fail ftok");

               return ;

          }

          printf("key=%d ",key);

          int msqid=msgget(key,IPC_CREAT|0666);

          if(msqid==-1)

          {

               perror("fail msgget!");

               return ;

          }

          printf("msqid=%d ",msqid);

         

          MSG_S msg;

          //read message

          if(rs=='r')

          {

               while(1)

               {

                     memset(&msg,0,sizeof(msg.szBuf));

                     fprintf(stderr,"type:");

                     scanf("%ld",&(msg.type));

                     //read(0,&(msg.type),sizeof(long));

                     int nRet=msgrcv(msqid,&msg,sizeof(msg.szBuf),msg.type,  IPC_NOWAIT);

                     if(nRet<0)

                     {

                          perror("fail msgsnd");

                          break;

                     }

                     printf("receive:%s ",msg.szBuf);

               }

          }

          //send message

          else if(rs=='s')

          {

               fprintf(stderr,"example:1 ABCD! ");

               while(1)

               {

                     memset(&msg,0,sizeof(msg.szBuf));

                     fprintf(stderr,"send:");

                     scanf("%ld%s",&(msg.type),msg.szBuf);

                     //read(0,&(msg.type),msg.szBuf,sizeof(MSG_S));

                     int nRet=msgsnd(msqid,&msg,strlen(msg.szBuf),0);

                     if(nRet<0)

                     {

                          perror("fail msgsnd");

                          break;

                     }

               }

          }

    }

    //列举msqid_ds的信息

    void printfInfo(struct msqid_ds *pstMsg)

    {

          printf("--------msg_perm------ ");

          printf("msgget key:%#o ",pstMsg->msg_perm.__key);

          printf("all user euid:%d ",pstMsg->msg_perm.uid);

          printf("all user egid:%d ",pstMsg->msg_perm.gid);

          printf("creater euid:%d ",pstMsg->msg_perm.cuid);

          printf("creater egid:%d ",pstMsg->msg_perm.cgid);

          printf("R+W mode:%#o ",pstMsg->msg_perm.mode);  

          printf("queue numer:%d ",pstMsg->msg_perm.__seq);

          printf(" ");

          printf("msgsnd time:%d ",(int)pstMsg->msg_stime);

          printf("msgrcv time:%d ",(int)pstMsg->msg_rtime);

          printf("change time:%d ",(int)pstMsg->msg_ctime);

          printf("message len:%ld ",pstMsg->msg_cbytes);

          printf("message number:%d ",(int)pstMsg->msg_qnum);

          printf("queue max len:%d ",(int)pstMsg->msg_qbytes);

          printf("msgsnd pid:%d ",(int)pstMsg->msg_lspid);

          printf("msgrcv pid:%d ",(int)pstMsg->msg_lrpid);

          printf(" ");

    }

    //提示

    void ListSel()

    {

          printf(" stat:output message queue infomation ");

          printf(" set:set message queue mode ");

          printf(" exit:delete message queue ");

          printf(" ");

    }

    //消息队列设置

    void testMsgctl()

    {

          key_t key=ftok("123",321);

          if(key==-1)

          {

               perror("fail ftok");

               return ;

          }

          printf("key=%d ",key);

          int msqid=msgget(key,IPC_CREAT|0666);

          if(msqid==-1)

          {

               perror("fail msgget!");

               return ;

          }

          printf("msqid=%d ",msqid);

          ListSel();

          struct msqid_ds stMsg;

         

          char szCmd[128];

          //对消息队列进行操作。显示,修改

          while(1)

          {

               fprintf(stderr,"-->");

               scanf("%s",szCmd);

               //显示

               if(!strcmp(szCmd,"stat"))

               {

                     msgctl(msqid, IPC_STAT,  &stMsg);

                     printfInfo(&stMsg);

               }

               //修改权限

               else if(!strcmp(szCmd,"set"))

               {

                     msgctl(msqid, IPC_STAT,  &stMsg);

                     int mode;              

                     printf("now mode:%d ",stMsg.msg_perm.mode);

                     fprintf(stderr,"input new mode:");

                     scanf("%o",&mode);

                    

    if(mode<0||mode>0777)

                     {

                          fprintf(stderr,"mode is invaid ");

                          continue;

                     }

                     //修改操作

                     stMsg.msg_perm.mode=mode;

                     //修改后更新到消息队列

    int nRet=msgctl(msqid, IPC_SET, &stMsg);

                     if(nRet)

                     {

                          perror("failed IPC_SET");

                          continue;

                     }

                     else

                     {

                          printf("set mode sucess ");

                     }

               }

               //退出

               else if(!strcmp(szCmd,"exit"))

               {

                     break;

               }

          }

         

          //删除消息队列

          fprintf(stderr,"sure to delete this message[y/n]:");

          scanf("%s",szCmd);

         

          if(!strcmp(szCmd,"y"))

          {

               //删除操作

               int nRet=msgctl(msqid, IPC_RMID, &stMsg);

               if(nRet)

               {

                     perror("failed IPC_RMID");

               }

               else

               {

                     printf("delete message queue[%d]sucess! ",msqid);

               }

          }

    }

         

    int main(int argc,char** argv)

    {

          /*//默认参数传入

          if((argc!=2)||(strcmp(argv[1],"s")&&strcmp(argv[1],"r")))

          {

               printf("select [s/r/c]:%s ",argv[1]);

               printf(" s:send/receive message ");

               printf(" r:receive message ");

               printf(" c:control message queue ");    

               return 0;    

          }

         

          if(argv[1][0]=='s'||argv[1][0]=='r')

          {

               testMsgRS(argv[1][0]);

          }

          else

          {

               testMsgctl();

          }*/

         

          printf(" s:send/receive message ");

          printf(" r:receive message ");

          printf(" c:control message queue ");    

          printf("select your choice [s/r/c]:");

          char ch;

          scanf("%c",&ch);

          //读、写队列消息

          if(ch=='s'||ch=='r')

          {

               testMsgRS(ch);

          }

          //修改信息队列权限

          else

          {

               testMsgctl();

          }

          return 0;

    }

  • 相关阅读:
    自动化单元测试
    Exadata是什么?
    Exadata的独门武器卸载(Offloading)
    Exadata中最有用的功能存储索引
    面向对象分析与设计(第3版)
    代码质量(权威精选植根于开发实践的最佳读物)
    温昱谈程序员向架构师转型的规律
    sql语句大全
    一个弹出层的代码
    ASP.NET 2.0 实现伪静态网页方法 (转载 ————续)
  • 原文地址:https://www.cnblogs.com/gd-luojialin/p/9216017.html
Copyright © 2011-2022 走看看