zoukankan      html  css  js  c++  java
  • posix 消息队列

    注意

    在涉及到posix消息的函数时, gcc 编译时要加-lrt参数, 如
    gcc -lrt unpipc.c mqpack.c send.c -o send
    gcc -lrt unpipc.c mqpack.c receive.c -o receive

    posix消息的name必须以/开头, 例
    ./send /msgname

    基本函数

    #include <mqueue.h>
    mqd_t mq_open(const char *name,int oflag, ...
        /* mode_t mode, struct mq_attr *attr */);
    int mq_close(mqd_t mqdes);
    int mq_unlink(const char *name);
    int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
    int mq_setattr(mqd_t mqdes,const struct mq_attr *attr,struct mq_attr *oattr);
    int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);
    ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *priop);
    int mq_notify(mqd_t mqdes, const struct sigevent *notification);
     
    #include <signal.h>
    int sigwait(const sigset_t *set, int *sig);
    

    posix消息队列的读写可以想像成文件的读写:
    mq_close用于关闭mqd/关闭文件fd
    mq_unlink用于删除消息队列/删除文件
    mq_send发送消息到消息队列/写入文件
    mq_receive从消息队列中取出消息/读取文件
    mq_getattr获取消息属性/获取文件信息
    mq_notify当队列中的消息数量由0到1时产生指定的信号/IO同步

    mq_notify:
    1.任意时候只有一个进程可以被注册为接收通知
    2.当通知被发送到注册进程时,其注册即被撤销,就像老式的signal一样
    3.当消息到达队列后再进行注册的话,不会产生消息
    4.只有在消息队列的数量从0到1时才产生

    sigwait:
    1.sigwait是与当前进行的屏蔽信号函数sigprocmask搭配使用的
    2.sigwait挂起线程,直到有感兴趣的信号到达,而这个信号也属于当前进程的屏蔽信号集
    3.sigwait将信号从未决信号中删除,且不会调用信号处理函数,第二个参数返回该信号值
    4.与sigsuspend不同的是,sigwait并不改变当前进程的信号屏蔽集,也不调用处理函数

    包裹函数

    mqpack.h

    #ifndef _MQPACK_H
    #define _MQPACK_H
     
    #include "unpipc.h"
    #include <mqueue.h>
    mqd_t Mq_open(const char *name,int oflag,mode_t mode,struct mq_attr *attr);
    void Mq_close(mqd_t mqd);
    void Mq_unlink(const char *name);
    void Mq_getattr(mqd_t mqd,struct mq_attr *attr);
    void Mq_setattr(mqd_t mqd,struct mq_attr *attr,struct mq_attr *oattr);
    void Mq_send(mqd_t mqd,const char *ptr,size_t len,unsigned int prio);
    ssize_t Mq_receive(mqd_t mqd,char *ptr,size_t len,unsigned int *priop);
    void Mq_notify(mqd_t mqd,const struct sigevent *notification);
     
    #endif
    

    mqpack.c

    #include "mqpack.h"
     
    mqd_t Mq_open(const char *name,int oflag,mode_t mode,struct mq_attr *attr){
        mqd_t mqd=mq_open(name,oflag,mode,attr);
        if(mqd == -1)
            err_quit("mq_open error");
        return(mqd);
    }
    void Mq_close(mqd_t mqd){
        if(mq_close(mqd) == -1)
            err_quit("mq_close error");
    }
    void Mq_unlink(const char *name){
        if(mq_unlink(name) == -1)
            err_quit("mq_unlink error");
    }
    void Mq_getattr(mqd_t mqd,struct mq_attr *attr){
        if(mq_getattr(mqd,attr) == -1)
            err_quit("mq_getattr error");
    }
    void Mq_setattr(mqd_t mqd,struct mq_attr *attr,struct mq_attr *oattr){
        if(mq_setattr(mqd,attr,oattr) == -1)
            err_quit("Mq_setattr error");
    }
    void Mq_send(mqd_t mqd,const char *ptr,size_t len,unsigned int prio){
        if(mq_send(mqd,ptr,len,prio) == -1)
            err_quit("mq_send error");
    }
    ssize_t Mq_receive(mqd_t mqd,char *ptr,size_t len,unsigned int *priop){
        size_t n=mq_receive(mqd,ptr,len,priop);
        if((-1 == n) && (errno != EAGAIN))
            err_quit("mq_receive error");
        return (n);
    }
    void Mq_notify(mqd_t mqd,const struct sigevent *notification){
        if(mq_notify(mqd,notification) == -1)
            err_quit("mq_notify error");
    }
    

    例子

    编译和执行步骤就是文章开头的那三条命令
    send.c

    #include "unpipc.h"
    #include "mqpack.h"
     
    int main(int argc,char *argv[]){
        if(argc != 2)
            err_quit("usage: /send <name>");
        char buff[1024];
        mqd_t mqd;
        struct mq_attr attr;
        mqd=Mq_open(argv[1],O_WRONLY|O_CREAT,0644,NULL);
     
        if(Fork() == 0){
            if(execl("./receive","./receive",argv[1],(char *)0) == -1)
                err_quit("execl error");
        }
     
        sleep(2);
     
        while(1){
            Fgets(buff,1024,stdin);
            if(buff[strlen(buff)-1] == '
    ') //去掉buff中的换行符
                buff[strlen(buff)-1]='';
            Mq_send(mqd,buff,strlen(buff),0);
        }
        Mq_close(mqd);
        exit(0);
    }
    

    receive.c

    #include "unpipc.h"
    #include "mqpack.h"
     
    void print_info(mqd_t mqd,struct mq_attr *attr){
        if(mq_getattr(mqd,attr) == -1)
            err_quit("mq_getn error");
        printf("max num of maxmsg = %ld,max size of msg = %ld
    ",attr->mq_maxmsg,attr->mq_msgsize);
        printf("current mum of maxmsg = %ld
    ",attr->mq_curmsgs);
    }
     
    int main(int argc,char *argv[]){
        int signo;
        mqd_t mqd;
        void *buff;
        ssize_t n;
        sigset_t newmask;
        struct mq_attr attr;
        struct sigevent sigev;
     
        if(argc != 2)
            err_quit("usage mqtest2 <name>");
        mqd=Mq_open(argv[1],O_RDONLY|O_NONBLOCK,0,NULL);
        Mq_getattr(mqd,&attr);
        buff=Malloc(attr.mq_msgsize);
        print_info(mqd,&attr);
     
        Sigemptyset(&newmask);
        Sigaddset(&newmask,SIGUSR1);
        Sigprocmask(SIG_BLOCK,&newmask,NULL);
     
        sigev.sigev_notify=SIGEV_SIGNAL;
        sigev.sigev_signo=SIGUSR1;
        Mq_notify(mqd,&sigev);
     
        for(;;){
            Sigwait(&newmask,&signo);
            if(signo == SIGUSR1){
                Mq_notify(mqd,&sigev);
                while((n=Mq_receive(mqd,buff,attr.mq_msgsize,NULL))>0)
                    printf("read %ld bytes
    ",(long)n);
                //if(errno != EAGAIN)
                //   err_quit("mq_receive error");
            }
        }
    }
    
  • 相关阅读:
    SpringBoot 调用 K8s metrics-server
    Kubernetes 实战——有状态应用(StatefulSet)
    Kubernetes 实战——升级应用(Deployment)
    Kubernetes 实战——发现应用(Service)
    Kubernetes 实战——配置应用(ConfigMap、Secret)
    Java 集合使用不当,Code Review 被 diss了!
    30 个 ElasticSearch 调优知识点,都给你整理好了!
    关于线程池的面试题
    Java面试必问,ThreadLocal终极篇
    Java代码中,如何监控Mysql的binlog?
  • 原文地址:https://www.cnblogs.com/cfans1993/p/5747064.html
Copyright © 2011-2022 走看看