zoukankan      html  css  js  c++  java
  • Posix消息队列

      它通常用来在不同进程间发送特定格式的消息数据。POSIX消息队列随内核持续性,即使当没有任何进程打开这某个消息队列,该队列上的消息也一直存在,直到调用mq_unlink并让他的引用计数达到0以上才删除队列。

      POSIX消息队列和system V消息队列都不能向接受者准确的表示每条信息的发送者。可以通过管道传递消息描述符

      编译时要链接lrt库即:-lrt(real time 实时库)
    消息队列和管道和FIFO区别

    1. 一个进程向消息队列写入消息之前,并不需要某个进程在该队列上等待该消息的到达。管道和FIFO是相反的,进程向其中写消息时,管道和FIFO必需已经打开来读,那么内核会产生SIGPIPE信号
    2. 一个进程在往某消息队列写入消息后, 终止进程. 另一个进程某时刻读出该消息;消息队列是随内核的持续性,即一个进程向消息队列写入消息后,然后终止,另外一个进程可以在以后某个时刻打开该队列读取消息。只要内核没有重新自举,消息队列没有被删除。然而对于管带或FIFO而言, 当管道或FIFO的最后一次关闭发生时,仍在管道或FIFO中的数据将被抛弃(管道和FIFO是随进程的持续性)

    Posix消息队列和System V 消息队列区别

    1. 对Posix消息队列的读取总是返回最高优先级的最早消息; 对System V消息队列的读则可以返回任意指定优先级的消息
    2. 当往一个空队列放置一个消息时, Posix消息队列允许产生一个信号或者启动一个线程; System V则不提供类似机制
    3. posix消息队列的mqd_t类型的句柄可以被select/poll/epoll监听。

    消息队列中的每条消息通常具有以下属性:

    1. 一个表示优先级的整数;
    2. 消息的数据部分的长度(可以为0);
    3. 消息数据本身(如果长度大于0);

      消息队列可认为是一个链表,有足够写权限的线程可往队列中放置消息,有足够读权限的线程可从队列中读走消息,每个消息都是一个记录。在链表头记录最大消息数及每个消息的最大大小

      mqd_t类型不能是一个数组类型,不必像文件描述那样为一个整数。它是指向一个mq_info结构的指针。使用内存映射I/O实现该消息队列。

    #include <bits/mqueue.h>
    typedef int mqd_t;
    
    #include <mqueue.h> 
    mqd_t mq_open(const char *name, int oflag, /* mode_t mode, struct mq_attr *attr */); //成功返回消息队列描述符,失败返回-1,创建消息队列时指定attr属性——最大消息数目和每个消息的最大大小
    mqd_t mq_close(mqd_t mqdes); 
    mqd_t mq_unlink(
    const char *name); //成功返回0,失败返回-1
    1. name:表示消息队列的名字,它符合POSIX IPC的名字规则。POSIX的名字只能以一个’/’开头,名字中不能包含其他的’/’;所创建的POSIX消息队列不会在文件系统中创建真正的路径名
    2. oflag:表示打开的方式,和open函数的类似。有必须的选项:O_RDONLY,O_WRONLY,O_RDWR,还有可选的选项:O_NONBLOCK,O_CREAT,O_EXCL。
    3. mode:是一个可选参数,在oflag中含有O_CREAT标志且消息队列不存在时,才需要提供该参数。表示默认访问权限。可以参考open。
    4. attr:也是一个可选参数,在oflag中含有O_CREAT标志且消息队列不存在时才需要。该参数用于给新队列设定某些属性,如果是空指针,那么就采用默认属性。
    5. mq_close用于关闭一个消息队列,和文件的close类型,关闭后,消息队列并不从系统中删除。一个进程结束,会自动调用关闭打开着的消息队列。该函数也将引用计数减一,变为0附带删除该队列。
    6. mq_unlink用于删除一个消息队列。消息队列创建后只有通过调用该函数或者是内核自举才能进行删除。每个消息队列都有一个保存当前打开着描述符数的引用计数器,当消息队列的引用计数大于0时,将其name删除,队列的析构到对后一个mq_close调用时才发生,消息队列的名字也占引用计数,调用该函数从系统删除该名字意味着引用计数将其减一,若变为0真正删除该队列。该函数成功返回0失败返回-1

    mq_open返回值类型:

    1. EACCES]:The message queue exists and the permissions specified by oflag are denied, or the message queue does not exist and permission to create the message queue is denied.
    2. [EEXIST]:O_CREAT and O_EXCL are set and the named message queue already exists.
    3. [EINTR]:函数 mq_open() 被信号中断.
    4. [EINVAL]:函数 mq_open() 不支持参数name指定的路径.
    5. [EINVAL]:设置了O_CREAT标志, 参数3 attr 不是NULL;并且 mq_maxmsg 或 mq_msgsize 小于等于0.
    6. [EMFILE]:本进程中使用了过多的消息队列描述符 或 文件描述符.
    7. [ENFILE]:系统中打开的消息队列数目超过了系统支持的最大数.
    8. [ENOENT]:没有设置O_CREAT标志,并且指定的消息队列不存在.
    9. [ENOSPC]:空间不足,无法创建新的消息队列.
    10. [ENAMETOOLONG]:参数 name 的长度超过系统定义的最大长度.

    flag:

    参数 oflag 表示想要访问(receive/send)消息队列的方式.是由下列宏组合计算出来的, 注意必须包含前3个(访问模式) 中的一个:

    1. O_RDONLY (只读):打开一个消息队列用来接受消息.调用进程可以使用mq_open返回的描述符用于函数 mq_receive(), 但是不能用于函数 mq_send().一个消息队列可以在相同或不同的进程中多次打开用来接收消息.
    2. O_WRONLY (只写):打开一个消息队列用来发送消息.调用进程可以使用mq_open返回的描述符用于函数 mq_send(), 但是不能用于函数 mq_receive().一个消息队列可以在相同或不同的进程中多次打开用来发送消息.
    3. O_RDWR (读写):打开一个消息队列用即可用来发送消息也可以用来接受消息.调用进程可以使用任何支持O_RDONLY 和O_WRONLY 访问模式的函数.一个消息队列可以在相同或不同的进程中多次打开用来发送消息.Any combination of the remaining flags may be specified in the value of oflag:
    4. O_CREAT (创建):创建一个消息队列. 使用这个参数需要追加2个参数: mode(类型是mode_t), 和attr(类型是mq_attr*).,如果name参数指定的消息队列已经存在, 那么这个参数将不起任何作用, 除非像下面 O_EXCL 中提到的情况;否则, 会创建一个空的消息队列. 消息队列的用户ID会被设置成这个进程的实际的用户ID.消息队列的组ID会被设置成这个进程的实际的组ID; 然而, 如果参数name 指向的消息队列在文件系统中可见, 那么组ID会被设置成包含消息队列的目录的组ID.如果参数mode中的位与文件权限中位不同,这种情况是未定义的. 如果 attr 是 NULL, 创建的消息队列是系统实现的默认属性. 如果 attr 不是 NULL 并且调用进程对参数name指定的文件有特定的权限(什么权限),那么消息队列的属性 mq_maxmsg 和 mq_msgsize 会被设置成 attr 中的属性. 属性 mq_flags 和 mq_curmsgs 会被忽略.如果 attr 不是 NULL , 并且调用进程对参数name指定的文件没有特定的权限,函数 mq_open() 会返回失败,不会创建消息队列.
    5. O_EXCL (既存检查):如果 O_EXCL 和O_CREAT同时被设置了, 如果消息队列应经存在,那么函数 mq_open() 会返回失败.提供了检查消息队列是否存在的方法, 如果设置了 O_EXCL 必须同时设置 O_CREAT 否则结果未定义.
    6. O_NONBLOCK (非阻塞):决定函数 mq_send() 和 mq_receive() 在获取当前无法获得的资源或消息时,是一直等待(阻塞), 还是返回失败并将 errno 设置成 [EAGAIN]; 

    消息队列的属性

    #include <mqueue.h> 
    mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr); 
    mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr); //均成功返回0,失败返回-1
    
    #include <bits/mqueue.h> 
    struct mq_attr
    {
        long int mq_flags; /* Message queue flags. 消息队列的标志:0或O_NONBLOCK,用来表示是否阻塞 0 O_NONBLOCK*/
        long int mq_maxmsg; /* Maximum number of messages. 消息队列的最大消息数*/
        long int mq_msgsize; /* Maximum message size. 消息队列中每个消息的最大字节数*/
        long int mq_curmsgs; /* Number of messages currently queued.  消息队列中当前的消息数目*/
        long int __pad[4];
    };
    1. mq_getattr用于获取当前消息队列的属性,mq_setattr用于设置当前消息队列的属性。其中mq_setattr中的oldattr用于保存修改前的消息队列的属性,可以为空。
    2. mq_setattr可以设置的属性只有mq_flags,用来设置或清除消息队列的非阻塞标志。newattr结构的其他属性被忽略。
    3. mq_maxmsg和mq_msgsize属性只能在创建消息队列时通过mq_open来设置。mq_open只会设置该两个属性,忽略另外两个属性。mq_curmsgs属性只能被获取而不能被设置。 

    发送消息

    #include <mqueue.h>
    mqd_t mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio); //成功返回0,出错返回-1 
    mqd_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio); //成功返回接收到消息的字节数,出错返回-1
    

    #ifdef __USE_XOPEN2K mqd_t mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio, const struct timespec *abs_timeout); mqd_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio, const struct timespec *abs_timeout); #endif
    1. mq_send向消息队列中写入一条消息,mq_receive从消息队列中优先级最高的一条消息,且该优先级能随该消息的内容及长度同一返回。
    2. mqdes:消息队列描述符;
    3. msg_ptr:指向消息体缓冲区的指针;
    4. msg_len:消息体的长度,其中mq_receive的该参数不能小于能写入队列中消息的最大大小,即一定要大于等于该队列的mq_attr结构中mq_msgsize的大小。如果mq_receive中的msg_len小于该值,就会返回EMSGSIZE错误。POXIS消息队列发送的消息长度可以为0。
    5. msg_prio:消息的优先级;它是一个小于MQ_PRIO_MAX的无符号数这个数至少为32,数值越大,优先级越高。POSIX消息队列在调用mq_receive时总是返回队列中最高优先级的最早消息如果消息不需要设定优先级,那么可以在mq_send是置msg_prio为0,mq_receive的msg_prio置为NULL。
    6. 还有两个XSI定义的扩展接口限时发送和接收消息的函数:mq_timedsend和mq_timedreceive函数。默认情况下mq_send和mq_receive是阻塞进行调用,可以通过mq_setattr来设置为O_NONBLOCK。
    //mqsend,c
    #include "unpipc.h"
    #include <mqueue.h>
    
    int main()
    {
        printf("请输入消息队列的名字:");
        char name[MAXLINE];
        fgets(name,MAXLINE,stdin);
        int len=strlen(name);
        if(name[len-1]=='
    ')
            --len;
    
        int flags=O_RDWR|O_CREAT|O_EXCL;
        mqd_t mqd=mq_open(name,flags,FILE_MODE,NULL);
        if(mqd<0)
            if(errno==EEXIST)
            {
                mq_unlink(name);
                mqd=mq_open(name,O_RDWR|O_CREAT,FILE_MODE,NULL);
            }
            else
            {
                printf("open message queue error...
    ");
                return -1;
            }
    
        struct mq_attr attr;
        mq_getattr(mqd,&attr);
        char msg[MAXLINE];
    
        int i;
        for(i=0;i<6;++i)
        {
            printf("请输入第 %d 个消息:",i+1);
            scanf("%s",msg);
            printf("请输入第 %d 个消息的优先级:",i+1);
            int prio;
            scanf("%d",&prio);
    
            if(mq_send(mqd,msg,sizeof(msg),prio)<0)
                printf("send message: %d faild.error info: %s
    ",i,strerror(errno));
            
            printf("send message: %d success.
    ",i+1);
    
        }
        exit(0);
    }
    //mqreceive.c
    #include "unpipc.h"
    #include <mqueue.h>
    
    int main()
    {
        char name[MAXLINE];
        printf("请输入接受消息的消息队列的名字:");
        fgets(name,MAXLINE,stdin);
        int len=strlen(name);
        if(name[len-1]=='
    ')
            --len;
    
        mqd_t mqd=mq_open(name,O_RDONLY);
    
        struct mq_attr attr;
        mq_getattr(mqd,&attr);
        char *buffer=(char *)malloc(sizeof(char)*attr.mq_msgsize);
    
        int i;
        for(int i=0;i<6;++i)
        {
            int prio;
            if(mq_receive(mqd,buffer,attr.mq_msgsize,&prio)<0)
            {
                printf("receive message failed. error info: %s
    ",strerror(errno));
                continue;
            }
            printf("receive message %d: %s, prio is: %d
    ",i+1,buffer,prio);
        }
        mq_close(mqd);
        exit(0);
    }

    消息队列的限制

      POSIX消息队列本身的限制就是mq_attr中的mq_maxmsg和mq_msgsize,分别用于限定消息队列中的最大消息数和每个消息的最大字节数。在前面已经说过了,这两个参数可以在调用mq_open创建一个消息队列的时候设定。当这个设定是受到系统内核限制的。
      ulimit -a |grep message

      该大小是整个消息队列的大小,不仅仅是最大消息数*消息的最大大小;还包括消息队列的额外开销。前面我们知道Linux 2.6.18下POSIX消息队列默认的最大消息数和消息的最大大小分别为:

    //在<unistd.h>中定义
    
    mq_maxmsg = 10
    mq_msgsize = 8192 
    MQ_OPEN_MAX //一个进程同时能够拥有的打开的消息队列的最大数目 posix要求至少位为8
    MQ_PRIO_MAX// 一个进程的最大优先值加1 posix要求他至少为32

    mq_notify函数

      当队列为空时,该函数才会响应

      Posix消息队列允许异步事件通知, 以告知何时有一个消息放置到了某个空消息队列中, 以下两种方式可选:

    1. 产生一个信号
    2. 创建一个线程来执行一个指定函数
    mqd_t mq_notify(mqd_t mqdes, const struct sigevent *notification);
    
    struct sigevent
    {
        int sigev_notify; //notification type SIGEV_{NONE,SIGNAL,THREAD}
        int sigev_signo; //signal number if SIGEV_SIGNAL
        union sigval   sigev_value; //signal value
        void (*sigev_notify_function)(union sigval);//passed to signal handeler or thread following two if SIGEV_THREAD
        pthread_attr_t *sigev_notify_attributes;
    }
    
    union sigval
    {
        int sival_int; //integer value
        void *sival_ptr; //pointer value
    }
    1. SIGEV_SIGNAL:发送由evp->sigev_sino指定的信号到调用进程,evp->sigev_value的值将被作为siginfo_t结构体中si_value的值。
    2. SIGEV_NONE:什么都不做,只提供通过timer_gettime和timer_getoverrun查询超时信息。
    3. SIGEV_THREAD:以evp->sigev_notification_attributes为线程属性创建一个线程,在新建的线程内部以evp->sigev_value为参数调用evp->sigev_notification_function。
    4. 把sigev_notify设置成SIGEV_THREAD,这会创建一个新的线程,该线程调用由sigev_notify_function指定的函数,调用的参数由sigev_value指定,新线程属性由sigev_notifyattributes指定,要是默认属性合适的话,他可以是一个空指针。
    5. SIGEV_THREAD_ID:和SIGEV_SIGNAL类似,不过它只将信号发送到线程号为evp->sigev_notify_thread_id的线程,注意:这里的线程号不一定是POSIX线程号,而是线程调用gettid返回的实际线程号,并且这个线程必须实际存在且属于当前的调用进程。
    6. 成功返回0,出错返回-1

    注意:

    1. 如果mq_notify函数的notification非空, 那么当前进程希望在有一个消息到达所指定的先前为空的队列时得到通知. 即"该进程被注册为接收该队列的通知"
    2. 如果notification为空, 而且当前进程目前被注册为接收所指定队列的通知, 那么已存在的注册将被撤销
    3. 任意时刻只能有一个进程可以被注册为接收某个给定队列的通知
    4. 当有一个消息到达先前为空的消息队列,而且已有一个进程被注册为接收该队列的通知, 只有在没有任何线程阻塞在该队列的mq_receive调用中的前提下, 通知才会发出. 即在mq_receive中的阻塞比任何通知的注册都要优先
    5. 当通知被发送给注册进程时, 其注册就被撤销, 如果想的话, 需要重新注册(所以一般情况下, 都是在信号处理函数中的一开始就再次调用mq_notify进行重新注册)
    6. 每当一个信号产生时,其行为就恢复为某认行为,信号处理程序调用的第一个函数通常时signal,用于重建处理程序,那么这就提供了一个短时间的窗口,他处于该信号产生的当前进程重建信号处理程序之间,这段事件再次产生同一个信号可能终止当前进程,所以每次当前进程发通知后还需重新注册,然而消息队列不同于信号,因为在队列变空之前通知不会再发生,所以应该在从队列中读出消息之前而不是之后重新注册

      版本1:错误,因为在信号处理函数中不能调用mq_notify.mq_receive,printf等。启动第一个mqnotify程序后再启动另外一个会报错:Device or resource busy

    #include "unpipc.h"
    #include "my_err.h"
    #include <mqueue.h>
    #include <unistd.h>
    #include <fcntl.h>
    
    mqd_t mqd;
    struct sigevent sigev;
    struct mq_attr attr;
    void *buff;
    
    static void sig_usr1(int signo)
    {
        ssize_t n;
        mq_notify(mqd,&sigev);
        n=mq_receive(mqd,buff,attr.mq_msgsize,NULL);
        printf("ISGUSR1 receievd read:%ld
    ",(long)n);
        return ;
    }
    
    int main()
    {
        printf("请输入要创建消息队列名:");
    
        char name[MAXLINE];
        fgets(name,MAXLINE,stdin);
        int len=strlen(name);
        if(name[len-1]=='
    ')
            name[len-1]='';
    
        mqd=mq_open(name,O_RDONLY);
        if(mqd<0)
            {
                perror("open message queue error...
    ");
                return -1;
            }
        
        /*sigset_t newmask;
        sigemptyset(&newmask);//清空当前信号集
        sigaddset(&newmask,SIGUSR1);//将SIGUSR1加入当前信号集
        sigprocmask(SIG_BLOCK,&newmask,NULL);//将当前信号集的状态设为阻塞
    */
        signal(SIGUSR1,sig_usr1);
        sigev.sigev_notify=SIGEV_SIGNAL;
        sigev.sigev_signo=SIGUSR1;
        mqd_t t=mq_notify(mqd,&sigev);//当前进程被注册为接收队列的通知
        if(t<0)
        {
            printf("%s
    ",strerror(errno));
            exit(t);
        }
    
        mq_getattr(mqd,&attr);
        buff=(char *)malloc(sizeof(char)*attr.mq_msgsize);
        while(1)
        {
            pause();
        }
        exit(0);
    }

       版本二:让信号吹程序只设置一个全局标志,让某个线程检查该标志以确定何时收到一个消息

      注意:如果在下一个消息被读出之前有两个消息到达(可在mq_notify前加一个sleep模拟),通知只是有一个消息被放置控队列才发出,如果读出第一个消息之前有两个消息到达,那么只有一个通知发出,于是读出第一个消息并调用sigsuspend等待另个消息,它对应的通知永远不会发出,在此期间,另一个消息可能已经防止该队列中继续等待而我们一直忽略它。

    #include "unpipc.h"
    #include "my_err.h"
    #include <mqueue.h>
    #include <unistd.h>
    #include <fcntl.h>
    
    volatile sig_atomic_t flag;
    static void sig_usr1(int signo)
    {
        flag=1;
        return;
    }
    
    int main()
    {
        printf("请输入要创建消息队列名:");
    
        char name[MAXLINE];
        fgets(name,MAXLINE,stdin);
        int len=strlen(name);
        if(name[len-1]=='
    ')
            name[len-1]='';
    
        mqd_t mqd=mq_open(name,O_RDONLY);
        if(mqd<0)
        {
            perror(strerror(errno));
            return -1;
        }
        
        sigset_t newmask,zeromask,oldmask;
        sigemptyset(&newmask);//清空当前信号集
        sigemptyset(&zeromask);
        sigemptyset(&oldmask);
        sigaddset(&newmask,SIGUSR1);//将SIGUSR1加入当前信号集
    
        signal(SIGUSR1,sig_usr1);
        struct sigevent sigev;
        sigev.sigev_notify=SIGEV_SIGNAL;
        sigev.sigev_signo=SIGUSR1;
        mqd_t t=mq_notify(mqd,&sigev);//当前进程被注册为接收队列的通知
        if(t<0)
        {
            perror(strerror(errno));
            return t;
        }
        struct mq_attr attr;
        mq_getattr(mqd,&attr);
        char *buffer=(char *)malloc(sizeof(char)*attr.mq_msgsize);
        while(1)
        {
            //no signal block,
            sigprocmask(SIG_BLOCK,&newmask,&oldmask);
            while(flag==0)
                sigsuspend(&zeromask);
            flag=0;
    
            mq_notify(mqd,&sigev);
            ssize_t n=mq_receive(mqd,buffer,attr.mq_msgsize,NULL);
            printf("read %ld",(long)n);
            //ublock usr1
            sigprocmask(SIG_UNBLOCK,&newmask,NULL);
        }
        exit(0);
    }

      版本三:未解决版本二的问题使用非阻塞模式读消息队列

    #include "unpipc.h"
    #include "my_err.h"
    #include <mqueue.h>
    #include <unistd.h>
    #include <fcntl.h>
    
    int main()
    {
        printf("请输入要创建消息队列名:");
    
        char name[MAXLINE];
        fgets(name,MAXLINE,stdin);
        int len=strlen(name);
        if(name[len-1]=='
    ')
            name[len-1]='';
    
        mqd_t mqd=mq_open(name,O_RDONLY|O_NONBLOCK);
        if(mqd<0)
            {
                perror("open message queue error...
    ");
                return -1;
            }
        
        sigset_t newmask;
        sigemptyset(&newmask);//清空当前信号集
        sigaddset(&newmask,SIGUSR1);//将SIGUSR1加入当前信号集
        sigprocmask(SIG_BLOCK,&newmask,NULL);//将当前信号集的状态设为阻塞
    
        struct sigevent sigev;
        sigev.sigev_notify=SIGEV_SIGNAL;
        sigev.sigev_signo=SIGUSR1;
        mq_notify(mqd,&sigev);//当前进程被注册为接收队列的通知
    
        int signo;
        struct mq_attr attr;
        mq_getattr(mqd,&attr);
        char *buffer=(char *)malloc(sizeof(char)*attr.mq_msgsize);
        while(1)
        {
            sigwait(&newmask,&signo);//把阻塞信号从挂起信号集中删除,解除阻塞,所检测的进程必须是阻塞处理的
            if(signo==SIGUSR1)
            {
                mq_notify(mqd,&sigev);//通知被发送给注册进程时,注册即被撤销,需再次注册
    
                size_t n=mq_receive(mqd,buffer,attr.mq_msgsize,NULL);
                if(n>0)
                    printf("receive mesage is: %s
    ",buffer);
                
                if(n<0)
                {
                    if(errno==EAGAIN)
                        continue;
                }
            }
        }
        exit(0);
    }

      版本四: 也可创建一个管道,当接收到通知时会触发信号处理函数,在信号处理函数中向管道写一个字节数据,当管道中有数据可读时即pipe[0]可读,从管道中读出一个字节数据,然后再读取通知线程的数据。

       代码github:https://github.com/tianzengBlog/test/tree/master/ipc2/mqueue

  • 相关阅读:
    多浏览器CSS样式解决方法
    CSS基础
    HTML页面——常用模块
    CSS选择器
    li水平与div水平
    天梯题目解答——1012、1430
    天梯题目解答——1205、1075、1083、1160
    HTML5 元素拖动
    div简单水平移动效果
    Matlab代码优化--向量化
  • 原文地址:https://www.cnblogs.com/tianzeng/p/9795239.html
Copyright © 2011-2022 走看看