zoukankan      html  css  js  c++  java
  • POSIX 消息队列 和 系列函数

    一、在前面介绍了system v 消息队列的相关知识,现在来稍微看看posix 消息队列。

    posix消息队列的一个可能实现如下图:


    其实消息队列就是一个可以让进程间交换数据的场所,而两个标准的消息队列最大的不同可能只是api 函数的不同,如system v 的系列函数是msgxxx,而posix 是mq_xxx。posix 消息队列也有一些对消息长度等的限制,man 7 mq_overview:

    simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ cat /proc/sys/fs/mqueue/msg_max 
    10
    simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ cat /proc/sys/fs/mqueue/msgsize_max 
    8192
    simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ cat /proc/sys/fs/mqueue/queues_max 
    256

    即一个消息队列最多能有10条消息,每条消息的最大长度为8192字节,一个系统最多能有256个消息队列。

    还有一点是,在Linux上,posix 消息队列是以虚拟文件系统实现的,必须将其挂载到某个目录才能看见,如


               # mkdir /dev/mqueue
               # mount -t mqueue none /dev/mqueue

    通过cat 命令查看消息队列的状态,假设mymq 是创建的一条消息队列的名字
               $ cat /dev/mqueue/mymq
               QSIZE:129     NOTIFY:2    SIGNO:0    NOTIFY_PID:8260

    QSIZE:消息队列所有消息的数据长度

    NOTIFY_PID:某个进程使用mq_notify 注册了消息到达异步通知事件,即此进程的pid

    NOTIFY:通知方式: 0 is SIGEV_SIGNAL; 1 is SIGEV_NONE; and 2 is SIGEV_THREAD.

    SIGNO:当以信号方式通知的时候,表示信号的编号.


    二、系列函数,编译时候加上 -lrt 选项,即连接librt 库 (实时库)

          #include <fcntl.h>           /* For O_* constants */
          #include <sys/stat.h>        /* For mode constants */
          #include <mqueue.h>

    功能:用来创建和访问一个消息队列
    原型
    mqd_t mq_open(const char *name, int oflag);
    mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
    参数
    name: 某个消息队列的名字,必须以/打头,并且后续不能有其它/ ,形如/somename长度不能超过NAME_MAX(255)
    oflag:与open函数类似,可以是O_RDONLY、O_WRONLY、O_RDWR,还可以按位或上O_CREAT、O_EXCL、O_NONBLOCK;
    mode:如果oflag指定了O_CREAT,需要设置mode。
    返回值:成功返回消息队列文件描述符;失败返回-1


    功能:关闭消息队列
    原型
    mqd_t mq_close(mqd_t mqdes);
    参数
    mqdes : 消息队列描述符
    返回值:成功返回0;失败返回-1


    功能:删除消息队列
    原型
    mqd_t mq_unlink(const char *name);
    参数
    name: 消息队列的名字
    返回值:成功返回0;失败返回-1


    功能:获取/设置消息队列属性
    原型
    mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);
    mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);
    返回值:成功返回0;失败返回-1

               struct mq_attr {
                   long mq_flags;       /* Flags: 0 or O_NONBLOCK */
                   long mq_maxmsg;      /* Max. # of messages on queue */ 
                   long mq_msgsize;     /* Max. message size (bytes) */
                   long mq_curmsgs;     /* # of messages currently in queue */
               };

    mq_flags 是标志;mq_maxmsg 即一个消息队列消息个数上限;mq_msgsize即一条消息的数据上限;mq_curmsgs即当前消息个数


    功能:发送消息
    原型
    mqd_t mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
    参数
    mqdes:消息队列描述符
    msg_ptr:指向消息的指针
    msg_len:消息长度
    msg_prio:消息优先级
    返回值:成功返回0;失败返回-1


    功能:接收消息
    原型
    ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
    参数
    mqdes:消息队列描述符
    msg_ptr:返回接收到的消息
    msg_len:消息长度,一般需要指定为msgsize_max
    msg_prio:返回接收到的消息优先级
    返回值:成功返回接收到的消息字节数;失败返回-1
    注意:返回指定消息队列中最高优先级的最早消息,优先级最低为0


    功能:建立或者删除消息到达通知事件
    原型
    mqd_t mq_notify(mqd_t mqdes, const struct sigevent *notification);
    参数
    mqdes:消息队列描述符
    notification:
    非空表示当消息到达且消息队列先前为空,那么将得到通知;
    NULL表示撤消已注册的通知
    返回值:成功返回0;失败返回-1
    通知方式:
    产生一个信号
    创建一个线程执行一个指定的函数

    union sigval {          /* Data passed with notification */
    int     sival_int;/* Integer value */
    void   *sival_ptr;/* Pointer value */
    };


    struct sigevent {
    int           sigev_notify; /* Notification method */
    int           sigev_signo; /* Notification signal */
    union sigval sigev_value; /* Data passed with notification */
    void (*sigev_notify_function) (union sigval);
    /* Function for thread notification */
    void *sigev_notify_attributes;
    /* Thread function attributes */
    };

    sigev_notify 的取值有3个:

    SIGEV_NONE:消息到达不会发出通知

    SIGEV_SIGNAL:以信号方式发送通知,当设置此选项时,sigev_signo 设置信号的编号,且只有当信号为实时信号时才可以通过sigev_value顺带数据,参考这里

    SIGEV_THREAD:以线程方式通知,当设置此选项时,sigev_notify_function 即一个函数指针,sigev_notify_attributes 即线程的属性


    mq_notify 函数注意点:

    1、任何时刻只能有一个进程可以被注册为接收某个给定队列的通知
    2、当有一个消息到达某个先前为空的队列,而且已有一个进程被注册为接收该队列的通知时,只有在没有任何线程阻塞在该队列的mq_receive调用的前提下,通知才会发出。
    3、当通知被发送给它的注册进程时,其注册被撤消。进程必须再次调用mq_notify以重新注册(如果需要的话),重新注册要放在从消息队列读出消息之前而不是之后。


    下面写两个小程序测试一下:

    mq_send.c

     C++ Code 
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
     
    #include<stdio.h>
    #include<stdlib.h>
    #include<sys/ipc.h>
    #include<sys/msg.h>
    #include<sys/types.h>
    #include<unistd.h>
    #include<errno.h>
    #include<mqueue.h>
    #include<fcntl.h>
    #include<sys/stat.h>
    #include<string.h>

    #define ERR_EXIT(m) 
        do { 
            perror(m); 
            exit(EXIT_FAILURE); 
        } while(0)

    typedef struct stu
    {
        char name[32];
        int age;
    } STU;

    int main(int argc, char *argv[])
    {
        if (argc != 2)
        {
            fprintf(stderr, "Usage: %s <prio> ", argv[0]);
            exit(EXIT_FAILURE);
        }
        mqd_t mqid;
        mqid = mq_open("/abc", O_WRONLY);
        if (mqid == (mqd_t) - 1)
            ERR_EXIT("mq_open");
        printf("mq_open succ ");

        STU stu;
        strcpy(stu.name, "test");
        stu.age = 20;
        unsigned int prio = atoi(argv[1]);
        mq_send(mqid, (const char *)&stu, sizeof(stu), prio);
        mq_close(mqid);

        return 0;
    }

    mq_notify.c

     C++ Code 
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
     
    #include<stdio.h>
    #include<stdlib.h>
    #include<sys/ipc.h>
    #include<sys/msg.h>
    #include<sys/types.h>
    #include<unistd.h>
    #include<errno.h>
    #include<mqueue.h>
    #include<fcntl.h>
    #include<sys/stat.h>
    #include<string.h>
    #include<signal.h>

    #define ERR_EXIT(m) 
        do { 
            perror(m); 
            exit(EXIT_FAILURE); 
        } while(0)

    typedef struct stu
    {
        char name[32];
        int age;
    } STU;

    size_t size;

    mqd_t mqid;

    struct sigevent sigev;

    void handle(int sig)
    {
        mq_notify(mqid, &sigev);
        STU stu;
        unsigned int prio;
        mq_receive(mqid, (char *)&stu, size, &prio);
        printf("name=%s, age=%d, prio=%d ", stu.name, stu.age, prio);

    }


    int main(int argc, char *argv[])
    {
        mqid = mq_open("/abc", O_RDONLY);
        if (mqid == (mqd_t) - 1)
            ERR_EXIT("mq_open");
        printf("mq_open succ ");

        struct mq_attr attr;
        mq_getattr(mqid, &attr);
        size = attr.mq_msgsize;

        signal(SIGUSR1, handle);

        sigev.sigev_notify = SIGEV_SIGNAL;
        sigev.sigev_signo = SIGUSR1;

        mq_notify(mqid, &sigev);

        for (; ;)
            pause();

        mq_close(mqid);

        return 0;
    }


    mq_open.c

     C++ Code 
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
     
    #include<stdio.h>
    #include<stdlib.h>
    #include<sys/ipc.h>
    #include<sys/msg.h>
    #include<sys/types.h>
    #include<unistd.h>
    #include<errno.h>
    #include<mqueue.h>
    #include<fcntl.h>
    #include<sys/stat.h>

    #define ERR_EXIT(m) 
        do { 
            perror(m); 
            exit(EXIT_FAILURE); 
        } while(0)

    int main(void)
    {
        mqd_t mqid;
        mqid = mq_open("/abc", O_CREAT | O_RDWR, 0666, NULL);
        if (mqid == (mqd_t) - 1)
            ERR_EXIT("mq_open");
        mq_close(mqid);
        return 0;
    }

    先运行./mq_open 用mq_open 创建了一个消息队列并mount 到/dev/mqueue 上,可以查看状态:

    simba@ubuntu:/dev/mqueue$ cat /dev/mqueue/abc 
    QSIZE:0          NOTIFY:0     SIGNO:0     NOTIFY_PID:0  

    接着运行./mq_notify 再查看状态:

    simba@ubuntu:/dev/mqueue$ cat /dev/mqueue/abc 
    QSIZE:0          NOTIFY:0     SIGNO:10    NOTIFY_PID:3572 

    准备通知的信号是10号,即SIGUSR1,等待的进程pid即./mq_notify ,此时./mq_notify 进程是阻塞的,正在等待其他进程往消息队列写入消息,现在运行./mq_send


    simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ ./mq_send 
    Usage: ./mq_send <prio>
    simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ ./mq_send 1
    mq_open succ
    simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ ./mq_send 1
    mq_open succ
    simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ 

    回头看./mq_notify 的输出:

    simba@ubuntu:~/Documents/code/linux_programming/UNP/posix$ ./mq_notify 
    mq_open succ
    name=test, age=20, prio=1
    name=test, age=20, prio=1

    ...........................................

    即./mq_send 发送的两条消息都接收到了,需要注意的是虽然通知是一次性的,但我们在消息处理函数再次注册了通知,故能多次接收到通知,但通知只发生在消息队列由空到不空的时候,如果先运行./mq_send 先往消息队列发送了n条消息,那么执行./mq_notify 是不会接收到通知的,一直阻塞着。

  • 相关阅读:
    Ubuntu在下面LAMP(Linux+Apache+MySQL+PHP) 开发环境的搭建
    直接插入排序、折半插入排序、Shell排序、冒泡排序,选择排序
    java插入字符串
    bash,bg,bind,break,builtin,caller,compgen, complete,compopt,continue,declare,dirs,disown,enable,eval,exec,expo
    socket用法
    org.jsoup.select.Selector
    达达技术
    CentOS 6.4 文件夹打开方式
    shell加法
    shell统计
  • 原文地址:https://www.cnblogs.com/alantu2018/p/8477008.html
Copyright © 2011-2022 走看看