zoukankan      html  css  js  c++  java
  • Posix消息队列

      消息队列可以认为是一个消息链表,某个进程往一个消息队列中写入消息之前,不需要另外某个进程在该队列上等待消息的达到,这一点与管道和FIFO相反。Posix消息队列与System V消息队列的区别如下:
    1. 对Posix消息队列的读总是返回最高优先级的最早消息,对System V消息队列的读则可以返回任意指定优先级的消息。
    2. 当往一个空队列放置一个消息时,Posix消息队列允许产生一个信号或启动一个线程,System V消息队列则不提供类似的机制。

    Posix消息队列操作函数如下:

    #include    <mqueue.h>
    typedef int mqd_t;
    mqd_t mq_open(const char *name, int oflag, ... /* mode_t mode, struct mq_attr *attr */);
    返回: 成功时为消息队列描述字,出错时为-1。   
    功能: 创建一个新的消息队列或打开一个已存在的消息的队列。     

    #include    <mqueue.h>
    int mq_close(mqd_t mqdes);
    返回: 成功时为0,出错时为-1。
    功能: 关闭已打开的消息队列。

    #include    <mqueue.h>
    int mq_unlink(const char *name)
    返回: 成功时为0,出错时为-1
    功能: 从系统中删除消息队列。

    #include    <mqueue.h>
    int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
    int mq_setattr(mqd_t mqdes, const struct mq_attr *attr, struct mq_attr *attr);
    均返回:成功时为0, 出错时为-1

    每个消息队列有四个属性:
    struct mq_attr
    {
        long mq_flags;      /* message queue flag : 0, O_NONBLOCK */
        long mq_maxmsg;     /* max number of messages allowed on queue*/
        long mq_msgsize;    /* max size of a message (in bytes)*/
        long mq_curmsgs;    /* number of messages currently on queue */
    };

    每个消息均有一个优先级,它是一个小于MQ_PRIO_MAX的无符号整数
    #define MQ_PRIO_MAX 32768

    #include    <mqueue.h>
    int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);
    返回:成功时为0,出错为-1
    ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *priop);
    返回:成功时为消息中的字节数,出错为-1

    消息队列的限制:
    MQ_OPEN_MAX : 一个进程能够同时拥有的打开着消息队列的最大数目
    MQ_PRIO_MAX : 任意消息的最大优先级值加1

    #include    <mqueue.h>
    int mq_notify(mqd_t mqdes, const struct sigevent *notification);
    返回: 成功时为0,出错时为-1
    功能: 给指定队列建立或删除异步事件通知

    union sigval
    {
        int sival_int;      /* Integer value */
        void *sival_ptr;    /* pointer value */
    };

    struct sigevent
    {
        int     sigev_notify;   /* SIGEV_{ NONE, ISGNAL, THREAD} */
        int     sigev_signo;    /* signal number if SIGEV_SIGNAL */
        union sigval sigev_value;   /* passed to signal handler or thread */
        void    (*sigev_notify_function)(union sigval);
        pthread_attr_t *sigev_notify_attribute;
    }; 

    异步信号安全函数
    #include    <signal.h>
    int sigwait(const sigset_t *set, int *sig);

    Posxi实时信号
    信号可划分为两大小组:
    1. 其值在SIGRTMIN和SIGRTMAX之间(包括两者在内)的实时信号。
    2. 所有其他信号:SIGALRM, SIGINT, SIGKILL等等。

    void func(int signo, siginfo_t *info, void *context);

    typedef struct
    {
        int     si_signo;   /* same value as signo argument */
        int     si_code;    /* SI_{USER, QUEUE, TIMER, ASYNCIO, MESGQ}*/
        union sigval si_value;    /* integer or pointer value from sender */
    } siginfo_t;

    下面采用上面的函数,写程序进程测试。

    程序1:创建一个消息队列,其名字是作为命令行参数指定,消息队列创建成功后输出队列的属性。程序如下:

    View Code
     1 #include <stdio.h>
     2 #include <stdlib.h>
     3 #include <unistd.h>
     4 #include <mqueue.h>
     5 #include <fcntl.h>
     6 #include <errno.h>
     7 #include<sys/stat.h>
     8 #define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)
     9 
    10 int main(int argc,char *argv[])
    11 {
    12     int     c,flags;
    13     mqd_t   mqd;
    14     struct mq_attr  attr;
    15 
    16     flags = O_RDWR|O_CREAT;
    17     printf("create mqueue.\n");
    18     while((c = getopt(argc,argv,"e")) != -1)
    19     {
    20         switch(c)
    21         {
    22             case 'e':
    23                 flags |= O_EXCL;
    24                 break;
    25         }
    26     }
    27     if(optind != argc-1)
    28     {
    29         perror("usage: mqcreate [-e] <name>");
    30         exit(0);
    31     }
    32     if((mqd = mq_open(argv[optind],flags,FILE_MODE,NULL)) == -1)
    33     {
    34         perror("mq_open() error");
    35         exit(-1);
    36     }
    37     mq_getattr(mqd,&attr);
    38     printf("max #msgs = %ld,max #bytes/msg = %ld,#currently on queue = %ld\n",
    39            attr.mq_maxmsg,attr.mq_msgsize,attr.mq_curmsgs);
    40     mq_close(mqd);
    41     exit(0);
    42 }

    本测试是用的Linux Ubuntu系统,编译程序的时候需要添加-lrt连接,如果不加会提示如下错误信息

    因此正确编译的方式如下:

    程序编译完成后,如果直接运行程序则提示mq_open失败,提示mq_open permission denied。解决办法是:

    mkdir /dev/mqueue
    mount -t mqueue none /dev/mqueue

    然后再运行即可看到创建的消息队列。程序结果如下所示:

     程序2:练习mq_send和mq_receive函数,调用mqsend程序向消息队列中写入消息,调用mqreceive程序从消息队列中读取消息。程序如下所示:

    mqsend程序:

    View Code
     1 #include <stdio.h>
     2 #include <stdlib.h>
     3 #include <unistd.h>
     4 #include <mqueue.h>
     5 #include <fcntl.h>
     6 #include <errno.h>
     7 #include<sys/stat.h>
     8 typedef unsigned int  uint_t;
     9 
    10 int main(int argc,char *argv[])
    11 {
    12     mqd_t   mqd;
    13     void    *ptr;
    14     size_t  len;
    15     uint_t  prio;
    16     if(argc != 4)
    17     {
    18         printf("usage: mqsend <name> <$bytes> <priority>\n");
    19         exit(0);
    20     }
    21     len = atoi(argv[2]);
    22     prio = atoi(argv[3]);
    23     mqd = mq_open(argv[1],O_WRONLY);
    24     ptr = calloc(len,sizeof(char));
    25     if(mq_send(mqd,ptr,len,prio) == -1)
    26     {
    27         perror("mq_send() error:");
    28         exit(-1);
    29     }
    30     exit(0);
    31 }

    mqreceive程序:

    View Code
     1 #include <stdio.h>
     2 #include <stdlib.h>
     3 #include <unistd.h>
     4 #include <mqueue.h>
     5 #include <fcntl.h>
     6 #include <errno.h>
     7 #include<sys/stat.h>
     8 typedef unsigned int  uint_t;
     9 
    10 int main(int argc,char *argv[])
    11 {
    12     int     c,flags;
    13     mqd_t   mqd;
    14     ssize_t n;
    15     uint_t  prio;
    16     void *buff;
    17     struct mq_attr attr;
    18 
    19     flags = O_RDONLY;
    20     while((c = getopt(argc,argv,"n")) != -1)
    21     {
    22         switch(c)
    23         {
    24             case 'n':
    25                 flags |= O_NONBLOCK;  //设置为非阻塞
    26                 break;
    27         }
    28     }
    29     if(optind != argc-1)
    30     {
    31         printf("usage: mqreceive [-n] <name>");
    32         exit(0);
    33     }
    34     mqd = mq_open(argv[optind],flags);
    35     mq_getattr(mqd,&attr);
    36     buff = malloc(attr.mq_msgsize);
    37     if((n = mq_receive(mqd,buff,attr.mq_msgsize,&prio)) == -1)
    38     {
    39         perror("mq_receive error: ");
    40         exit(-1);
    41     }
    42     printf("read %ld bytes,priority = %u\n",(long) n,prio);
    43     exit(0);
    44 }

    程序执行结果如下所示:

    程序3:信号通知函数使用,当有一个消息放置到某个空队列中,该程序产生信号,通知进程消息队列中放入了一个新的消息。程序如下:

    View Code
     1 #include <stdio.h>
     2 #include <stdlib.h>
     3 #include <unistd.h>
     4 #include <mqueue.h>
     5 #include <fcntl.h>
     6 #include <errno.h>
     7 #include <signal.h>
     8 
     9 typedef unsigned int  uint_t;
    10 
    11 volatile    sig_atomic_t mqflag;  //全局变量,检查信号的产生
    12 static void sig_usr1(int);
    13 
    14 int main(int argc,char *argv[])
    15 {
    16     mqd_t       mqd;
    17     void        *buff;
    18     ssize_t     n;
    19     sigset_t    zeromask,newmask,oldmask;
    20     struct mq_attr  attr;
    21     struct sigevent sigev;
    22     if(argc != 2)
    23     {
    24         printf("usage :mqnotify <name>");
    25         exit(0);
    26     }
    27     mqd = mq_open(argv[1],O_RDONLY);
    28     mq_getattr(mqd,&attr);
    29     buff = malloc(attr.mq_msgsize);
    30     sigemptyset(&zeromask);
    31     sigemptyset(&newmask);
    32     sigemptyset(&oldmask);
    33     sigaddset(&newmask,SIGUSR1);
    34     signal(SIGUSR1,sig_usr1);
    35     sigev.sigev_notify = SIGEV_SIGNAL;
    36     sigev.sigev_signo = SIGUSR1;
    37     if(mq_notify(mqd,&sigev) == -1)
    38     {
    39         perror("mq_notify error");
    40         exit(-1);
    41     }
    42     for(; ;)
    43     {
    44         sigprocmask(SIG_BLOCK,&newmask,&oldmask);
    45         while(mqflag == 0)
    46             sigsuspend(&zeromask); //挂起,等待
    47         mqflag = 0;
    48         mq_notify(mqd,&sigev);
    49         n = mq_receive(mqd,buff,attr.mq_msgsize,NULL);
    50         printf("read %ld bytes\n",(long) n);
    51         sigprocmask(SIG_UNBLOCK,&newmask,NULL);
    52     }
    53     eixt(0);
    54 }
    55 
    56 static void sig_usr1(int signo)
    57 {
    58     mqflag = 1;
    59     return ;
    60 }

    程序执行结果如下:

    可以使用sigwait函数代替信号处理程序的信号通知,将信号阻塞到某个函数中,仅仅等待该信号的递交。采用sigwait实现上面的程序如下:

    View Code
     1 #include <stdio.h>
     2 #include <stdlib.h>
     3 #include <unistd.h>
     4 #include <mqueue.h>
     5 #include <fcntl.h>
     6 #include <errno.h>
     7 #include <signal.h>
     8 
     9 int main(int argc,char *argv[])
    10 {
    11     mqd_t       mqd;
    12     int         signo;
    13     void        *buff;
    14     ssize_t     n;
    15     sigset_t    newmask;
    16     struct mq_attr  attr;
    17     struct sigevent sigev;
    18     if(argc != 2)
    19     {
    20         printf("usage :mqnotify <name>");
    21         exit(0);
    22     }
    23     mqd = mq_open(argv[1],O_RDONLY);
    24     mq_getattr(mqd,&attr);
    25     buff = malloc(attr.mq_msgsize);
    26     sigemptyset(&newmask);
    27     sigaddset(&newmask,SIGUSR1);
    28     sigprocmask(SIG_BLOCK,&newmask,NULL);
    29     
    30     sigev.sigev_notify = SIGEV_SIGNAL;
    31     sigev.sigev_signo = SIGUSR1;
    32     if(mq_notify(mqd,&sigev) == -1)
    33     {
    34         perror("mq_notify error");
    35         exit(-1);
    36     }
    37     for(; ;)
    38     {
    39        sigwait(&newmask,&signo); //阻塞并等待该信号
    40        if(signo == SIGUSR1)
    41        {
    42             mq_notify(mqd,&sigev);
    43             while((n = mq_receive(mqd,buff,attr.mq_msgsize,NULL))>=0)
    44                 printf("read %ld bytes\n",(long) n);
    45             if(errno != EAGAIN)
    46             {
    47                 perror("mq_receive error");
    48                 exit(-1);
    49             }
    50        }
    51     }
    52     eixt(0);
    53 }

     启动线程处理消息通知,程序如下:

    View Code
     1 #include <stdio.h>
     2 #include <stdlib.h>
     3 #include <unistd.h>
     4 #include <mqueue.h>
     5 #include <fcntl.h>
     6 #include <errno.h>
     7 #include <signal.h>
     8 
     9 mqd_t       mqd;
    10 struct mq_attr  attr;
    11 struct sigevent sigev;
    12 static void notify_thread(union sigval);
    13 
    14 int main(int argc,char *argv[])
    15 {
    16 
    17     if(argc != 2)
    18     {
    19         printf("usage :mqnotify <name>");
    20         exit(0);
    21     }
    22     mqd = mq_open(argv[1],O_RDONLY | O_NONBLOCK);
    23     mq_getattr(mqd,&attr);
    24 
    25     sigev.sigev_notify = SIGEV_THREAD;
    26     sigev.sigev_value.sival_ptr = NULL;
    27     sigev.sigev_notify_function = notify_thread;
    28     sigev.sigev_notify_attributes = NULL;
    29 
    30     if(mq_notify(mqd,&sigev) == -1)
    31     {
    32         perror("mq_notify error");
    33         exit(-1);
    34     }
    35     for(; ;)
    36     {
    37         pause();
    38     }
    39     eixt(0);
    40 }
    41 static void notify_thread(union sigval arg)
    42 {
    43     ssize_t     n;
    44     void        *buff;
    45     printf("notify_thread started\n");
    46     buff = malloc(attr.mq_msgsize);
    47     mq_notify(mqd,&sigev);
    48     while((n = mq_receive(mqd,buff,attr.mq_msgsize,NULL))>=0)
    49                 printf("read %ld bytes\n",(long) n);
    50     if(errno != EAGAIN)
    51     {
    52                 perror("mq_receive error");
    53                 exit(-1);
    54     }
    55     free(buff);
    56     pthread_exit(NULL);
    57 }
  • 相关阅读:
    二分图最大匹配的K&#246;nig定理及其证明
    HDOJ 2389 Rain on your Parade
    HDOJ 1083 Courses
    HDOJ 2063 过山车
    POJ 1469 COURSES
    UESTC 1817 Complete Building the Houses
    POJ 3464 ACM Computer Factory
    POJ 1459 Power Network
    HDOJ 1532 Drainage Ditches
    HDU 1017 A Mathematical Curiosity
  • 原文地址:https://www.cnblogs.com/Anker/p/2843832.html
Copyright © 2011-2022 走看看