zoukankan      html  css  js  c++  java
  • 进程间通信(二)——Posix消息队列

    1.概述

    消息队列可认为是消息链表。有足够写权限的线程可以往队列中放置消息,有足够读权限的进程可以从队列中取走消息。每个消息是一个记录,由发送着赋予一个优先级。

    在向队列中写入消息时,不需要某个进程在该队列上等待消息到达。这与管道不同,管道必须现有读再有写

    消息队列具有随内核的持续性,与管道不同。进程结束后,消息队列中消息不会消失。当管道最后一次关闭,其中的数据将丢弃。

    消息队列具有名字,可用于非亲缘关系的进程间

    Posix消息队列读总是返回最高优先级的最早消息,而System V消息队列的读可以返回任意优先级的

    空消息队列中放置消息Posix消息队列会产生一个信号或启动一个线程。而System V不提供这个功能。

     

    2.消息队列相关函数

    2.1. 创建,关闭,删除

    #include <mqueue.h>

    mqd_t mq_open(const char *name, int oflag, …

                   /* mode_t mode, struct mq_qttr *attr */);

    返回:成功:消息队列描述符 出错:-1

    用于创建一个新的或者打开一个已存在的消息队列。

    oflag是O_RDONLY, O_WRONLY或O_RDWR之一,可能按位或上O_CREAT, O_EXCL或O_NONBLOCK。

    创建新队列时,mode和attr是必须的。attr=NULL使用默认属性

    mq_open返回的消息队列描述符,它不必是像普通描述符那么样的短整形(很可能不是)。

     

    int mq_close(mqd_t mqdes);

    关闭已打开的消息队列调用进程不能在使用这个描述符了,但是消息队列没有从系统中删除

    进程终止时,所有打开者的消息队列都关闭,就像调用mq_close();

     

    从系统中删除name

    int mq_unlink(const char *name);

    成功:0;出错:-1;

    消息队列保存着当前打开着的文件的引用计数,mq_unlink会删除name,但是消息队列的删除要到最后一个mq_close()发生为止

    消息队列至少具有随内核的持续性。当没有进程打开某个消息队列时,该队列的消息仍然存在,直到mq_unlink并让应用计数为0才删除该队列

     

    2.2. 消息队列属性

    #include <mqueue.h>

    int mq_getattr(mqd_t mqdes, struct mq_attr *attr);

    int mq_setattr(mqd_t mqdes, const struct mq_attr *attr, struct mq_attr *oattr);

    成功:0;失败:-1;

    struct mq_attr {

    long mq_flags;             //message queue flag: 0, O_NONBLOCK

    long mq_maxmsg;        //max number of message allowed on queue

    long mq_msgsize;        //max size of a message (in bytes)

    long mq_curmsgs;       //number of message currlently on queue
    }

    创建新队列时,可以指定mq_maxmsg和mq_msgsize属性,但是mq_open会忽略另外两个参数

    mq_setattr设置队列属性时,只使用mq_flags用于设置或着清除非阻塞标志,另两个忽略。

    消息大小和数目只能在创建队列时指定

     

    2.3. 接受和发送

    #include <mqueue.h>

    int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);

    成功:0;出错: -1;

    ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *prio);

    成功: 消息中字节数; 出错: -1;

    用于向队列中放置消息和从队列中取出消息。

    mq_receive中len不能小于mq_msgsize,否则会返回EMSGSIZE。

    prio为消息优先级,必须小于MQ_PRIO_MAX。

    不需要使用优先级不同的消息,指定mq_send为0优先级,mq_receive优先级为NULL。

     

    3. 异步事件通知

    Posix消息队列允许异步消息通知,即当有一个消息放到某个空消息队列中,会产生某种通知

    (1) 产生一个信号

    (2) 创建一个线程执行指定函数

    #include <mqueue.h>

    int mq_notify(mqd_t mqdes, const struct sigevent *notification);

    成功: 0; 出错:-1;

    #include <signal.h>

    union sigval {

    int sival_int;

    int *sival_ptr;
    }

    struct sigevent {

    int sigev_notify;          //通知方式:SIGEV_{NONE, SIGNAL, THREAD}

    int sigev_signo;          //signal number if SIGEV_SIGNAL

    union sigval sigev_value;//传递给信号或者线程的参数

    void (*sigev_notify_function)(union sigval);

    pthread_attr_t *sigev_notify_attrbutes;
    }

    接受到通知之后,会被复位成默认行为,因此必须再注册

    在队列变空前不会再接受到通知,要保证在读出消息之前重新注册。否则在消息变为空后再注册,这段时间内可能又有消息放到队列中造成不会再通知。

     

    4.通知的处理方式

    4.1. 信号通知

    不可以简单的在信号处理程序中调用mq_receive, 因为它不是异步信号安全函数(可以从信号处理程序中调用的函数)。一般只有sem_pos,read,write是信号安全的。

    不可以简单使用mq_receive接受消息,因为这只能接受到一个消息,其他的消息可能永远不会被取出,应该总是使用非阻塞方式读数据

     

    程序处理方式:

    方法1:使用sigsuspend()等待信号的发生,通过在信号处理程序中设置全局变量,被信号唤醒后再处理数据。

    方法2:由于消息队列描述符不是普通描述符,不可以直接使用select。通过管道和在信号处理程序中调用write唤醒select的方式。

    方法3:通过sigwait()函数等待某个信号发生,然后再处理,类似于select。这是最好的方式。

    #include <signal.h>

    int sigwait(const sigset_t *set, int *sig);

    成功:0; 出错:正的EXXX值。

    函数阻塞到set中某个信号发生,sig返回信号值。

    这叫做“同步地等待一个异步信号”,使用信号却没有涉及异步信号处理

    此函数往往在多线程话程序中使用,有EXXX错误。在多线程话程序中不能使用sigprocmask,要使用pthread_sigmask。

    代码示例:接受进程

    #include "unp.h"
    
    int main(int argc, char **argv)
    {
        //消息队列有名字
        //注意Linux和SUN下,
        //不能打开/tmp/myqueue.1等文件,会出现权限不够
        //注意非阻塞,消息队列都是非阻塞使用
        //NULL:使用默认属性,消息数,消息大小。
        Mq_unlink("/mymqueue");
    mqd_t mqd = Mq_open("/mymqueue", O_RDWR | O_CREAT | O_NONBLOCK, FILE_MODE, NULL); //获取队列大小,buff不能小于消息最大大小 struct mq_attr attr; Mq_getattr(mqd, &attr); char *buff = (char*)Malloc(attr.mq_msgsize); printf("msgsize : %d ", attr.mq_msgsize); //sigwait前要阻塞信号 sigset_t newmask; Sigemptyset(&newmask); Sigaddset(&newmask, SIGUSR1); Sigprocmask(SIG_BLOCK, &newmask, NULL); //信号异步处理 //当队列中有信息时产生相应信号 //注册相关信息 struct sigevent sigev; sigev.sigev_notify = SIGEV_SIGNAL; sigev.sigev_signo = SIGUSR1; Mq_notify(mqd, &sigev); for ( ; ; ) { int signo; //等待信号集中某个信号发生 Sigwait(&newmask, &signo); if (signo == SIGUSR1) { //接受前先注册 Mq_notify(mqd, &sigev); ssize_t n; while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) { printf("read %ld bytes ", n); } if (errno != EAGAIN) { err_sys("mq_reveive() failed"); } } } exit(0); }

    发送进程源码:

    #include "unp.h"
    
    int main(int argc, char **argv)
    {
        int prio = 0;
        if (argc < 2 || argc > 3) {
            err_quit("Usage: a.out <#bytes> [priority]");
        } else if (argc == 3) {
            prio = atoi(argv[2]);
        } 
        int len = atoi(argv[1]);
    
        mqd_t mqd =  Mq_open("/mymqueue", O_WRONLY);
    
        char *ptr = (char*)Calloc(len, sizeof(char));
        Mq_send(mqd, ptr, len, prio);
    
        exit(0);
    }

     

    4.2线程处理方式,这是最好的

    注意sigev的设置。

    示例:

    #include "unp.h"
    
    //用于在线程中接受
    mqd_t mqd;
    //用于获取队列大小
    struct mq_attr attr;
    //用于重新注册
    struct sigevent sigev;
    
    static void notify_thread(union sigval arg);
    
    int main(int argc, char **argv)
    {
        //消息队列有名字
        //最好创建前把名字删了,防止有已存在
        Mq_unlink("/mymqueue");
        mqd = Mq_open("/mymqueue", O_RDWR | O_CREAT | O_NONBLOCK, FILE_MODE, NULL);
        Mq_getattr(mqd, &attr);
    
        //线程异步处理
        sigev.sigev_notify = SIGEV_THREAD;
        sigev.sigev_value.sival_ptr = NULL;    //传给线程处理函数的参数
        sigev.sigev_notify_function = notify_thread;
        sigev.sigev_notify_attributes = NULL;   //默认属性
        Mq_notify(mqd, &sigev);
    
        for ( ; ; ) {
            pause();
        }
    
        exit(0);
    }
    
    static void notify_thread(union sigval arg)
    {
        printf("notify_thread started
    ");
        char *buff = (char*)Malloc(attr.mq_msgsize);
        //接受前重新注册
        Mq_notify(mqd, &sigev);
    
        ssize_t n;
        while ((n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
            printf("read %ld bytes
    ", n);
        }
        if (errno != EAGAIN) {
            err_sys("mq_reveive error");
        }
    
        free(buff);
        pthread_exit(NULL);
    }

     

  • 相关阅读:
    ASP.NET Core MVC Razor小记
    ASP.NET Core引入第三方日志框架及简单实现日志策略配置
    test
    记录一个Windows explorer进程卡死的处理,有关于“MicrosoftWindows.Client.CBS_cw5n1h2txyewy”的,已解决!
    Windows版本sed工具
    相同xml批量创建替换脚本.sh
    springboot1.x apollo 更改属性值不起作用。 ConfigurationProperties
    jmeter固定定时器
    jmeter函数助手参数化
    jmeter循环控制器
  • 原文地址:https://www.cnblogs.com/hancm/p/3885824.html
Copyright © 2011-2022 走看看