zoukankan      html  css  js  c++  java
  • POSIX 消息队列

    POSIX消息队列与System V消息队列的主要区别:
    1.对POSIX队列的读总数返回最高优先级到最早消息,对SV队列到读则可以返回任意指定优先级的消息
    2.当往一个空队列放置一个消息时,POSIX允许产生一个信号或启动一个线程,System V不提供此机制

    消息的属性:
    1.一个无符号整数的优先级(POSIX)或一个长整数的类型(SV)
    2.消息的数据部分长度(可以为0)
    3.数据本身(如果长度大于0)

    POSIX消息队列总结:
    mq_open创建一个新队列或者打开一个已经存在的队列
    mq_close关闭队列
    mq_unlink删除队列名,删除队列
    mq_send往队列放置消息
    mq_receive从一个队列中读出消息
    mq_setattr和mq_getattr查询和设置队列的属性
    mq_notify允许注册一个信号或者线程,在有一个消息被放置到空队列时,发送信号或者激活线程
    每个消息被赋予一个小整数优先级,mq_receive总是返回最高优先级的最早消息

    限制:
    /proc/sys/fs/mqueue/msg_max 10
    /proc/sys/fs/mqueue/msgsize_max 8192
    /proc/sys/fs/mqueue/queues_max 256

    创建一个新的消息队列或者打开一个已经存在的消息队列
    <mqueue.h> 注意:编译加-lrt
    <fcntl.h>
    <sys/stat.h>
    mqd_t mq_open(const char *name, int oflag);
    mqd_t mq_open(const char *name, int oflag, mode_t mode,  struct mq_attr *attr);
    成功返回描述字,失败返回-1并设置errno
    name: 必须为/开头!!!
    oflag: O_RDONLY, O_WRONLY, O_RDWR, O_CREAT, O_EXCL, O_NONBLOCK

    关闭消息队列,但不能删除它
    mqd_t mq_close(mqd_t mqdes);
    成功返回0,失败返回-1

    删除消息队列,不一定马上删除消息队列,但队列名会立即删除
    mqd_t mq_unlink(const char *name);
    成功返回0,失败返回-1
    当某个进程还没有关闭此消息队列时,调用mq_unlink时,不会马上删除队列,当最后一个进程关闭队列时,该队列被删除
    int flags;
    mqd_t mqd;
    flags = O_RDWR | O_CREAT | O_EXCL;
    mqd = mq_open("/tmp.111", flags, 0644, NULL);
    if (mqd == (mqd_t)-1) {
    perror("mq_open");
    return 1;
    }

    消息队列的属性
    mq_getattr mq_setattr
    mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);
    mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);
    成功返回0,失败返回-1
    struct mq_attr {
    long mq_flags;       /* Flags: 0 or O_NONBLOCK */
    long mq_maxmsg;      /* Max. # of messages on queue */
    long mq_msgsize;     /* Max. message size (bytes) */
    long mq_curmsgs;     /* # of messages currently in queue */
    };
    mq_setattr只能修改mq_flags属性,maxmsg和msgsize在mq_open时设置
    mqd_t mqd;
    struct mq_attr attr;
    mqd = mq_open(argv[1], O_RDONLY);
    mq_getattr(mqd, &attr);
    printf("maxmsg=%ld, msgsize=%ld, curmsgs=%ld ",
    attr.mq_maxmsg, attr.mq_msgsize, attr.mq_curmsgs);
    mq_close(mqd);

    收发消息
    mq_send mq_receive
    mq_receive返回队列中最高优先级的最早消息,而且该优先级能随该消息的内容及其长度一起返回

    ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);
    成功返回消息的长度,消息的实际长度,不包括消息头;失败返回-1
    msg_len指示msg_ptr的长度,必须大于等于mq_msgsize
    如果msg_prio不为NULL,函数返回消息的优先级
    如果队列为空,调用将阻塞,如果队列设置0_NONBLOCK,调用立即返回EAGAIN

    // 向队列加入一条消息
    mqd_t mqd;
    char *msg;
    size_t len;
    unsigned int prio;
    len = 100;
    prio = 5;
    mqd = mq_open("/abc.123", O_WRONLY);
    msg = (char *)malloc(len);
    memset(msg, 0, len);
    mq_send(mqd, msg, len, prio);

    // 从队列读入一条消息
    mqd_t mqd;
    char *msg;
    size_t len;
    int n;
    unsigned int prio;
    struct mq_attr attr;
    mqd = mq_open("/abc.123", O_RDONLY);
    mq_getattr(mqd, &attr);
    len = attr.mq_msgsize;
    msg = (char *)malloc(len);
    memset(msg, 0, len);
    n = mq_receive(mqd, msg, len, &prio);
    printf("read %ld bytes, priority=%u ", (long)n, prio);

    队列限制
    long int open_max = sysconf(_SC_MQ_OPEN_MAX);  // -1
    long int prio_max = sysconf(_SC_MQ_PRIO_MAX);  // 32768

    消息通告
    当往空队列放置了一个消息时,通知进程
    通告方式有2种:
    1. 产生一个信号
    2. 创建一个线程执行一个指定的函数
    mqd_t mq_notify(mqd_t mqdes, const struct sigevent *notification);
    成功返回0;失败返回-1
    给队列建立或者删除异步事件通知
    1.如果notification非空,那么当前进程希望在有一个消息到达而且队列先前为空时得到通知,该进程被注册为接收该队列的通知
    2.如果notification为空,而且当前进程目前被注册为接收该队列的通知,那么现有注册将被撤销
    3.任意时刻只有一个进程可以被注册为接收队列的通知
    4.当有一个消息到达一个空队列,而且已经有一个进程被注册为接收该队列的通知时,只有在没有任何线程阻塞在该队列的mq_receive调用的前提下,通知才会发送。即在mq_receive调用中的阻塞比任何通知的注册都优先
    5.当该通知已经发送给它的注册进程时,其注册即被撤销。该进程必须再次调用mq_notify以重新注册
    6.当调用mq_notify但是队列不为空时,通知不会发送;当队列变为空,并且有一个消息入队时,才发送通知

    union sigval {                /* Data passed with notification */
    int     sival_int;        /* Integer value */
    void   *sival_ptr;        /* Pointer value */
    };

    struct sigevent {
    int    sigev_notify;      /* Notification method */
    int    sigev_signo;       /* Notification signal */
    union sigval sigev_value; /* Data passed with notification */
    void (*sigev_notify_function) (union sigval);
    /* Function for thread notification */
    void  *sigev_notify_attributes;
    /* Thread function attributes */
    };
    sigev_notify:SIGEV_NONE,SIGEV_SIGNAL,SIGEV_THREAD

    // 使用非阻塞mq_receive的信号通知
    volatile sig_atomic_t mqflag;
    static void sig_usr1(int);
    int main(int argc, char *argv[])
    {
    mqd_t mqd;
    void *buf;
    ssize_t n;
    sigset_t zeromask, newmask, oldmask;
    struct mq_attr attr;
    struct sigevent sigev;

    assert(argc == 2);
    mqd = mq_open(argv[1], O_RDONLY|O_NONBLOCK);
    mq_getattr(mqd, &attr);
    buf = malloc(attr.mq_msgsize);
    sigemptyset(&zeromask);
    sigemptyset(&newmask);
    sigemptyset(&oldmask);
    sigaddset(&newmask, SIGUSR1);
    signal(SIGUSR1, sig_usr1);
    sigev.sigev_notify = SIGEV_SIGNAL;
    sigev.sigev_signo = SIGUSR1;
    mq_notify(mqd, &sigev);

    for ( ; ; ) {
    sigprocmask(SIG_BLOCK, &newmask, &oldmask);
    while (mqflag == 0)
    sigsuspend(&zeromask);
    mqflag = 0;
    mq_notify(mqd, &sigev);
    while ((n = mq_receive(mqd, buf, attr.mq_msgsize, NULL)) >= 0) {
    printf("read %ld bytes ", (long)n);
    }
    if (errno != EAGAIN)
    die("mq_receive");
    sigprocmask(SIG_UNBLOCK, &newmask, NULL);
    }

    return 0;
    }
    static void sig_usr1(int signo)
    {
    mqflag = 1;
    return;
    }

    // 使用sigwait代替信号处理程序的信号通知
    #include <signal.h>
    int sigwait(const sigset_t *set, int *sig);
    成功返回0,并设置sig为收到的信号;失败返回错误码

    int main(...)
    {
    ...
    sigemptyset(&newmask);
    sigaddset(&newmask, SIGUSR1);
    sigprocmask(SIGBLOCK, &newmask, NULL);

    sigev.sigev_notify = SIGEV_SIGNAL;
    sigev.sigev_signo = SIGUSR1;
    mq_notify(mqd, &sigev);
    for ( ; ; ) {
    sigwait(&newmask, &signo);
    if (signo == SIGUSR1) {
    mq_notify(mqd, &sigev);
    while ((n = mq_receive(mqd, buf, len, NULL)) >=0) {
    printf("read %ld bytes ", n);

    if (errno != EAGAIN)
    die("mq_receive");
    }
    }
    ...
    }

    // 使用select的POSIX消息队列
    int pfds[2];
    static void sig_usr1(int);
    int main(int argc, char *arg[])
    {
    int fds;
    char c;
    fd_set rfds;
    mqd_t mqd;
    void *buf;
    ssize_t n;
    size_t len;
    struct mq_attr attr;
    struct sigevent sigev;

    asset(argc == 2);
    mqd = mq_open(argv[1], O_RDONLY|O_NONBLOCK);
    mq_getattr(mqd, &attr);
    len = attr.mq_msgsize;
    buf = malloc(len);
    pipe(pfds);
    // 设置信号处理程序,建立通知
    signal(SIGUSR1, sig_usr1);
    sigev.sigev_notify = SIGEV_SIGNAL;
    sigev.sigev_signo = SIGUSR1;
    mq_notify(mqd, &sigev);
    FD_ZERO(&rfds);
    for ( ; ; ) {
    FD_SET(pfds[0], &rfds);
    nfds = select(pfds[0]+1, &rfds, NULL, NULL, NULL);
    if (FD_ISSET(pfds[0], &rfds)) { // 管道可读
    read(pfds[0], &c, 1);
    mq_notify(mqd, &sigev);
    while ((n = mq_receive(mqd, buf, len, NULL)) >= 0) {
    printf("read %ld bytes ", (long)n);
    }
    if (errno != EAGAIN)
    die("mq_receive");
    }
    }
    return 0;
    }

    static void sig_usr1(int signo)
    {
    write(pfds[1], "", 1); // 异步信号处理安全的函数
    return;
    }

    // 收到通知后,启动一个线程,接收消息,然后结束进程
    #include <pthread.h>
    #include <mqueue.h>
    #include <assert.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #define die(msg) { perror(msg); exit(EXIT_FAILURE); }
    static void tfunc(union sigval sv)   /* Thread start function */
    {
    struct mq_attr attr;
    ssize_t nr;
    void *buf;
    mqd_t mqdes = *((mqd_t *) sv.sival_ptr);
    /* Determine max. msg size; allocate buffer to receive msg */
    if (mq_getattr(mqdes, &attr) == -1) die("mq_getattr");
    buf = malloc(attr.mq_msgsize);
    if (buf == NULL) die("malloc");
    nr = mq_receive(mqdes, buf, attr.mq_msgsize, NULL);
    if (nr == -1) die("mq_receive");
    printf("Read %ld bytes from MQ0 ", (long) nr);
    free(buf);
    exit(EXIT_SUCCESS);         /* Terminate the process */
    }

    int main(int argc, char *argv[])
    {
    mqd_t mqdes;
    struct sigevent not;
    assert(argc == 2);
    mqdes = mq_open(argv[1], O_RDONLY);
    if (mqdes == (mqd_t) -1) die("mq_open");
    not.sigev_notify = SIGEV_THREAD;
    not.sigev_notify_function = tfunc;
    not.sigev_notify_attributes = NULL;
    not.sigev_value.sival_ptr = &mqdes;   /* Arg. to thread func. */
    if (mq_notify(mqdes, &not) == -1) die("mq_notify");
    pause();    /* Process will be terminated by thread function */
    return 0;
    }          

    // 启动一个新线程
    mqd_t mqd;
    struct mq_attr attr;
    struct sigevent sigev;
    static void notify_thread(union sigval);
    int main(int argc, char *argv[])
    {
    assert(argc == 2);
    mqd = mq_open(argv[1], O_RDONLY|O_NONBLOCK);
    mq_getattr(mqd, &attr);
    sigev.sigev_notify = SIGEV_THREAD;
    sigev.sigev_value.sival_ptr = NULL;
    sigev.sigev_notify_function = notify_thread;
    sigev.sigev_notify_attributes = NULL;
    mq_notify(mqd, &sigev);
    for ( ; ; )
    pause();
    return 0;
    }
    static void notify_thread(union sigval arg)
    {
    ssize_t n;
    size_t len;
    void *buf;
    len = attr.mq_msgsize;
    printf("notify_thread started ");
    buf = malloc(len);
    mq_notify(mqd, &sigev);
    while ((n = mq_receive(mqd, buf, len, NULL)) >= 0) {
    printf("read %ld bytes ", (long)n);
    }
    if (errno != EAGAIN)
    die("mq_receive");
    free(buf);
    pthread_exit(NULL);
    }

    POSIX实时信号
    unix信号分为两大组:
    实时信号:SIGRTMIN--SIGRTMAX
    其他信号:SIGINT, SIGQUIT, SIGKILL, ...

    信号的实时行为取决于SA_SIGINFO
    实时行为包含以下特征:
    1.信号是排队的,即如果一个信号产生了3次,它就递交3次。以FIFO的顺序排队
    2.当有多种SIGRTMIN到SIGRTMAX范围内的解阻塞信号排队时,值较小的信号先于值较大的信号递交(注意:linux与此相反)
    3.当某个非实时信号递交时,传递给它的信号处理的唯一参数是该信号的值,实时信号比其他信号传递更多的信息
    4.有些新函数使用实时信号工作,如sigqueue用来代替kill

    // 查看实时信号的递交顺序
    static void sig_rt(int, siginfo_t *, void *);
    int main(void)
    {
    int i, j;
    pid_t pid;
    sigset_t newset;
    union sigval val;
    printf("SIGRTMIN=%d, SIGRTMAX=%d ", (int)SIGRTMIN, (int)SIGRTMAX);
    pid = fork();
    if (pid < 0) die("fork");
    else if (pid == 0) {
    /* 阻塞3个实时信号 */
    sigemptyset(&newset);
    sigaddset(&newset, SIGRTMIN);
    sigaddset(&newset, SIGRTMIN+1);
    sigaddset(&newset, SIGRTMIN+2);
    sigprocmask(SIG_BLOCK, &newset, NULL);
    signal_rt(SIGRTMIN, sig_rt);
    signal_rt(SIGRTMIN+1, sig_rt);
    signal_rt(SIGRTMIN+2, sig_rt);
    sleep(6);
    sigprocmask(SIG_UNBLOCK, &newset, NULL);
    sleep(3);
    exit(0);
    }
    else {
    sleep(3);
    for (i=SIGRTMIN; i<=SIGRTMIN+2; i++) {
    for (j=0; j<=2; j++) {
    val.sival_int = j;
    sigqueue(pid, i, val);
    printf("send signal signo=%d, val=%d ", i, j);
    }
    }
    exit(0);
    }
    }

    static void sig_rt(int signo, siginfo_t *info, void *context)
    {
    printf("receive signal signo=%d, code=%d, ival=%d ",
    signo, info->si_code, info->si_value.sival_int);
    }
    typedef void sigfunc_rt(int, siginfo_t *, void *);
    sigfunc_rt *signal_rt(int signo, sigfunc_rt *func)
    {
    struct sigaction act, oact;
    act.sa_sigaction = func;
    sigemptyset(&act.sa_mask);
    act.sa_flags = SA_SIGINFO; /* 实时信号必须指定 */
    if (signo == SIGALRM) {
    #ifdef    SA_INTERRUPT
    act.sa_flags |= SA_INTERRUPT;
    #endif        
    }
    else {
    #ifdef    SA_RESTART
    act.sa_flags |= SA_RESTART;
    #endif
    }
    if (sigaction(signo, &act, &oact) < 0)
    return (sigfunc_rt *)SIG_ERR;
    else
    return oact.sa_sigaction;
    }
    输出如下:
    [root@jiangkun unp]# ./rtsig 
    SIGRTMIN=34, SIGRTMAX=64
    send signal signo=34, val=0
    send signal signo=34, val=1
    send signal signo=34, val=2
    send signal signo=35, val=0
    send signal signo=35, val=1
    send signal signo=35, val=2
    send signal signo=36, val=0
    send signal signo=36, val=1
    send signal signo=36, val=2
    receive signal signo=36, code=-1, ival=0
    receive signal signo=36, code=-1, ival=1
    receive signal signo=36, code=-1, ival=2
    receive signal signo=35, code=-1, ival=0
    receive signal signo=35, code=-1, ival=1
    receive signal signo=35, code=-1, ival=2
    receive signal signo=34, code=-1, ival=0
    receive signal signo=34, code=-1, ival=1
    receive signal signo=34, code=-1, ival=2

    struct sigaction {
    void (*sa_handler)(int);
    void (*sa_sigaction)(int, siginfo_t *, void *);
    sigset_t sa_mask;
    int sa_flags;
    void (*sa_restorer)(void); /* 被遗弃了! */
    };

    实时信号之所以是可靠的,因为在进程阻塞该信号的时间内,发给该进程的所有实时信号会排队,而非实时信号则会合并为一个信号。早期的kill函数只能向特 定的进程发送一个特定的信号,并且早期的信号处理函数也不能接受附加数据。siqueue和sigaction解决了这个问题。
    下面这个例子中,进程先屏蔽SIGINT和SIGRTMIN两个信号,其中SIGINT是非实时信号,而SIGRTMIN为实时信号,接着进程睡眠,睡眠完成之后再接触对这两个信号的屏蔽,此时可以比较对两种信号的处理方式是否一样。
    #include <stdio.h>
    #include <string.h>
    #include <signal.h>
    #include <unistd.h>
    void sig_handler(int, siginfo_t*, void*);
    int main(int argc,char *argv[])
    {
    struct sigaction act;
    sigset_t newmask, oldmask;
    int rc;    
    sigemptyset(&newmask);
    /* 往信号集中添加一个非实时信号 */
    sigaddset(&newmask, SIGINT);
    /* 往信号集中添加一个实时信号 */
    sigaddset(&newmask, SIGRTMIN);
    /* 屏蔽实时信号SIGRTMIN */
    sigprocmask(SIG_BLOCK, &newmask, &oldmask);
    act.sa_sigaction = sig_handler;
    act.sa_flags = SA_SIGINFO;
    if(sigaction(SIGINT, &act, NULL) < 0) {
    printf("install signal error ");
    }
    if(sigaction(SIGRTMIN, &act, NULL) < 0) {
    printf("install signal error ");
    }
    printf("pid = %d ", getpid());
    /* 进程睡眠,在此时间内的发给该进程的所有实时信号 将排队,不会有信号丢失 */
    sleep(20);    
    /* 解除对SIGRTMIN信号的屏蔽,信号处理函数将会被调用 */
    sigprocmask(SIG_SETMASK, &oldmask, NULL);
    return 0;
    }
    void sig_handler(int signo, siginfo_t *info, void *context)
    {
    if(signo == SIGINT)
    printf("Got a common signal ");
    else
    printf("Got a real time signal ");
    }

    将程序编译好之后,再开一个终端用于发送实时信号。
    # ./sigqueue_receive 
    pid = 8871
    进程开始睡眠……
    在新的终端输入:
    ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871
    ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871
    ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871
    ecy@ecy-geek:~/pthreads$ kill -SIGRTMIN 8871
    连续发送四个SIGRTMIN,接着回到之前的终端,连续四次按下"ctrl+c"。
    ^C^C^C^C
    最后进程终于醒来,整个输出如下:
    pid = 8871
    ^C^C^C^CGot a real time signal
    Got a real time signal
    Got a real time signal
    Got a real time signal
    Got a common signal
    果然接受到四个实时信号,并且四次调用了信号处理函数,而对于SIGINT,虽然也按下了四次"ctrl+c",但是进程对其只做一次处理。这个例子中是先发实时信号后发非实时信号,所以信号处理函数先处理实
    时信号,如果只是按照顺序注册信号的话,这很好理解,但是换一下,先按下了四次"ctrl+c"然后使用kill发四次实时信号,结果发现输出的结果仍然 一样,这说明实时信号的优先级比非实时信号要高,内核每个进程的信号组成一个双向链表,实时信号插入的时候就不是随便插在尾部了。
    信号的优先级:信号实质上是软中断,中断有优先级,信号也有优先级。如果一个进程有多个未决信号,则对于同一个未决的实时信号,内核将按照发送的顺序来递 交信号。如果存在多个未决的实时信号,则值(或者说编号)越大的越先被递送。如果既存在不可靠信号,又存在可靠信号(实时信号),虽然POSIX对这一情 况没有明确规定,但Linux系统和大多数遵循POSIX标准的操作系统一样,将优先递交可靠信号。一个进程如果处理 SIGQUIT(3),SIGINT(2),SIGHUP(1)(通过"kill -l" 可以查看信号的编号),那么先后给该进程发送SIGINT,SIGHUP,SIGQUIT,处理的顺序会是SIGQUIT,SIGINT,SIGHUP, 不论改变这个三个信号的发送顺序,处理的顺序都是一样的。

  • 相关阅读:
    Python元类
    Python魔术方法
    Python反射
    Failed to enable constraints. One or more rows contain values violating non-null, unique, or foreign-key constraints.
    游标使用的简单示例
    C# 指定物理目录下载文件,Response.End导致“正在中止线程”异常的问题
    “一键制作启动u盘失败”的主要原因是什么?
    IE11 不能正常方法网页
    Notepad++的右键菜单
    [datatable]排序时指定某列不可排序
  • 原文地址:https://www.cnblogs.com/lidabo/p/4330525.html
Copyright © 2011-2022 走看看