zoukankan      html  css  js  c++  java
  • 《Linux/UNIX系统编程手册》第52章 POSIX消息队列

    关键词:O_CREAT/O_EXCL、O_NONBLOCK、mq_maxmsg、mq_msgsize、SIGEV_SIGNAL、SIGEV_THREAD等等。

    POSIX消息队列允许进程之间以消息的形式交换数据。POSIX消息队列和System V消息队列相似之处在于数据的交换单位都是整个消息。

    差别在于:

    POSIX消息队列时引用计数的。只有当所有当前使用队列的进程都关闭了队列之后才会对队列进行标记以便删除。

    每个System V消息都有一个整数类型,并且通过msgrcv()可以以各种方式类型选择消息。POSIX消息有一个关联的优先级,并且消息之间是严格按照优先级顺序排队的。

    POSIX消息队列提供了一个特性允许在队列中的一条消息可用于异步地通知进程。

    1. 概述

    POSIX消息队列API主要函数如下:

    mq_open():创建一个新消息队列或打开一个既有队列,返回后继调用中会用到的消息队列描述符。

    mq_send():向对列写入一条信息。

    mq_receive():从队列中读取一条信息。

    mq_close():关闭进程之前打开的一个消息队列。

    mq_unlink():删除一个消息队列名并当所有进程关闭该队列时对队列进行标记以便删除。

    mq_getatrr()/mq_setattr():每个消息队列都有一组关联的特性,可以在mq_uoen()创建或者打开队列时进行设置。获取和修改队列特性工作还可以由此两特性完成。

    mq_notify():允许一个进程向一个队列注册接收消息通知。注册完成后,当一条消息可用时会通过发送一个信号或在一个单独的线程中调用一个函数来通知进程。

    2. 打开、关闭和断开链接消息队列

    mq_open()函数创建一个消息队列或打开一个既有队列。

    #include <fcntl.h> /* Defines O_* constants */
    #include <sys/stat.h> /* Defines mode constants */
    #include <mqueue.h>
    mqd_t mq_open(const char *name, int oflag);
    mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr);
      Returns a message queue descriptor on success, or (mqd_t) –1 on error

    name:标识了消息队列,必须使用斜线打头后面跟着一个或多个非斜线字符的名字。Linux和其它一些实现允许采用这种可移植的命名方式来给IPC对象命名。

    oflag:是一个位掩码,具体如下:

    oflags其中一个用途是,确定是打开一个既有队列还是创建和打开一个新队列。

    • 如果oflags中不包含O_CREAT,那么将会打开一个既有队列;如果队列不存在,则返回错误。
    • 如果oflags中包含了O_CREAT,并且给定的name队列不存在,则创建一个新的空队列;如果队列已经存在,则打开此既有队列。
    • 如果oflags中包含了O_CREAT和O_EXCL,并且给定name的队列已经存在,那么就会失败;如果不存在则创建一个新的空队列。

        oflags的O_RDONLY、O_WRONLY、O_RDWR表明调用进程在消息队列上的访问方式。

        oflags标记O_NONBLOCK将会导致以非阻塞式模式打开队列。如果后续的mq_receive()和mq_send()无法在不阻塞情况下执行,那么调用就会立即返回EAGAIN错误。

    mode:是一个位掩码,指定了施加于新消息队列之上的权限。并且与open()一样,mode中的值会与进程的umask取掩码。

    attr:是一个mq_attr结构,制定了新消息队列的特性。如果attr为NULL,那么将使用系统定义的默认特性创建队列。

    fork()、exec()以及进程终止对消息队列描述符的影响

    在fork()中子进程会接收其父进程的消息队列描述符的副本,并且这些描述符会引用同样的打开着的消息队列描述。

    子进程不会继承其父进程的任何消息通知注册。

    当一个进程执行了一个exec()或终止时,所有其打开的消息队列描述符会被关闭。关闭消息队列描述符的结果是进程在相应队列上的消息通知会被注销。

    mq_close()函数关闭消息队列描述符mqdes。

    #include <mqueue.h>
    int mq_close(mqd_t mqdes);
      Returns 0 on success, or –1 on error

    如果调用进程已经通过mqdes在队列上注册了消息通知,那么通知注册会自动被删除,并且另一个进程可以随后向该队列注册消息通知。

    当进程终止或调用exec()时,消息队列描述符会被自动关闭。与文件描述符一阳,应用程序应该在不再使用消息队列描述符的时候显式地关闭消息队列描述符以防止出现进程耗尽消息队列描述符的情况。

    mq_unlink()函数删除通过name标识的消息队列,并将队列标记为在所有进程使用完该队列之后销毁该队列。

    #include <mqueue.h>
    int mq_unlink(const char *name);
        Returns 0 on success, or –1 on error

     mq_close()并不会删除消息队列,mq_unlink()才会删除消息队列。 

    3. 描述符和消息队列之间的关系

    消息队列描述符和打开着的消息队列之间的关系文件描述符和打开着的文件之间的关系类似

    消息队列描述符是一个进程级别的句柄,它引用了系统层面的打开着的消息队列描述表中的一个条目,而该条目则引用了一个消息队列对象。

    • 一个打开的消息队列描述符拥有一组关联的标记,即NONBLOCK,它确定了I/O是否是非阻塞的。
    • 两个进程能够持有引用同一个打开的消息队列描述的消息队列描述符(进程A中的x和进程B中的x)。当一个进程在打开了一个消息队列之后调用fork()时就会发生这种情况。这些描述符会共享O_NONBLOCK标记的状态。
    • 两个进程能够持有引用不同消息队列描述的打开的消息队列描述(进程A中的z和进程B中的y都引用了/mq-r)。当两个进程分别使用mq_open()打开同一个队列时就会发生这种情况。

    4. 消息队列特性

     mq_open()、mq_getattr()以及mq_setattr()都可以设置或者读取消息队列特性,通过指向mq_attr结构的指针实现。

    struct mq_attr {
      long mq_flags;   --消息队列flag[mq_getattr(), mq_setattr()]
      long mq_maxmsg;  --定义了使用mq_send()向消息队列上添加消息数量上限,必须大于0。mq_open()时确定,后面无法修改。[mq_open(), mq_getattr()]
      long mq_msgsize; --定义了加入详细队列的每条消息的大小上限,必须大于0。mq_open()时确定,后面无法修改。[mq_open(), mq_getattr()]
      long mq_curmsgs; --当前消息队列中消息数目[mq_getattr()]
    };

    获取消息队列特性

    mq_getattr()函数返回一个包含与描述符mqdes相关联的消息队列的相关信息mq_attr结构。

    #include <mqueue.h>
    int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
      Returns 0 on success, or –1 on error

    mq_flags取值只有一个O_NONBLOCK,这个标记是mq_open()的oflag参数来初始化的,并且使用mq_setattr()可以修改这个标记。 

    修改消息队列特性

    mq_setattr()函数设置与mqdes香瓜年的详细队列描述符的特性。

    #include <mqueue.h>
    int mq_setattr(mqd_t mqdes, const struct mq_attr *newattr, struct mq_attr *oldattr);
      Returns 0 on success, or –1 on error

    使用 newaddr中ma_flags字段来修改mqdes相关联消息队列描述的标记。

    如果oldattr不为NULL,那么就返回一个之前消息特性的mq_attr结构。

    SUSv3规定使用mq_setattr()能够修改的唯一特性是O_NONBLOCK标记的状态。

    5. 交换信息

    5.1 发送消息

    mq_send()函数将位于msg_ptr指向的缓冲区中的消息添加到描述符mqdes所引用的消息队列中。

    #include <mqueue.h>
    int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio);
      Returns 0 on success, or –1 on error

    msg_len参数指定了msg_ptr指向消息的长度,必须小于等于mq_msgsize。否则返回EMSGSIZE错误。长度为零是允许的。

    msg_prio表示消息的优先级,0表示最低优先级,最大优先级为MQ_PRIO_MAX或者同sysconf(_SC_MQ_PRIO_MAX)返回值来确定。

    一条消息被添加到队列中时,它会被放置在队列中具有相同优先级的所有消息之后。

    如果消息队列满,那么后续mq_send()调回会阻塞直到队列中存在可用空间为止,或者在O_NONBLOCK情况下立即失败并返回EAGAIN错误。

    5.2 接收消息

    mq_receive()函数从mqdes引用的消息队列中删除一条优先级最高、存在时间最长的消息,并将删除的消息放置在msg_ptr指向的缓冲区。

    #include <mqueue.h>
    ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio);
      Returns number of bytes in received message on success, or –1 on error

    msg_len指定msg_ptr指向的换种区中的可用字节数。

    msg_len必须大于或等于队列ms_msgsize,否则mq_receive()就会失败并返回EMSGSIZE错误。

    msg_prio不为NULL,那么接收到消息的优先级就会被复制到msg_prio。

    如果消息队列为空,那么mq_receive()会阻塞直到存在可用的消息,或者在O_NONBLOCK情况下会立即失败并返回EAGAIN。

    5.3 设置发送和接收消息超时时间

    mq_timedsend()和mq_timedreceive()函数与mq_sned()和mq_receive()基于一样,唯一的区别在于:如果操作无法立即执行,并且该消息队列oflag非O_NONBLOCK,那么abs_timeout参数就会为调用阻塞的时间制定一个上限。

    #define _XOPEN_SOURCE 600
    #include <mqueue.h>
    #include <time.h>
    int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned int msg_prio, const struct timespec *abs_timeout);
      Returns 0 on success, or –1 on error
    ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned int *msg_prio, const struct timespec *abs_timeout);
      Returns number of bytes in received message on success, or –1 on error

    abs_time是一个绝对值,如果要指定一个相对超时可以使用clock_gettime()来获取CLOCK_REALTIME的当前值并加上所需的时间量来生成一个恰当初始化过侧timespec结构。

    如果mq_timedsend()或mq_timedreceive()调用因超时而无法完成操作,那么调用就会失败并返回ETIMEOUT错误。

    如果abs_timeout为NULL,表示永远不会超时。

    5.4 消息队列的创建、发送、接收

    #include <mqueue.h>
    #include <sys/stat.h>
    #include <fcntl.h>
    #include "tlpi_hdr.h"
    
    static void
    usageError(const char *progName)
    {
        fprintf(stderr, "Usage: %s [-cx] [-m maxmsg] [-s msgsize] mq-name "
                "[octal-perms]
    ", progName);
        fprintf(stderr, "    -c          Create queue (O_CREAT)
    ");
        fprintf(stderr, "    -m maxmsg   Set maximum # of messages
    ");
        fprintf(stderr, "    -s msgsize  Set maximum message size
    ");
        fprintf(stderr, "    -x          Create exclusively (O_EXCL)
    ");
        exit(EXIT_FAILURE);
    }
    
    int
    main(int argc, char *argv[])
    {
        int flags, opt;
        mode_t perms;
        mqd_t mqd;
        struct mq_attr attr, *attrp;
    
        /* If 'attrp' is NULL, mq_open() uses default attributes. If an
           option specifying a message queue attribute is supplied on the
           command line, we save the attribute in 'attr' and set 'attrp'
           pointing to 'attr'. We assign some (arbitrary) default values
           to the fields of 'attr' in case the user specifies the value
           for one of the queue attributes, but not the other. */
    
        attrp = NULL;
        attr.mq_maxmsg = 10;
        attr.mq_msgsize = 2048;
        flags = O_RDWR;
    
        /* Parse command-line options */
    
        while ((opt = getopt(argc, argv, "cm:s:x")) != -1) {
            switch (opt) {
            case 'c':
                flags |= O_CREAT;
                break;
    
            case 'm':
                attr.mq_maxmsg = atoi(optarg);
                attrp = &attr;
                break;
    
            case 's':
                attr.mq_msgsize = atoi(optarg);
                attrp = &attr;
                break;
    
            case 'x':
                flags |= O_EXCL;
                break;
    
            default:
                usageError(argv[0]);
            }
        }
    
        if (optind >= argc)
            usageError(argv[0]);
    
        perms = (argc <= optind + 1) ? (S_IRUSR | S_IWUSR) :
                    getInt(argv[optind + 1], GN_BASE_8, "octal-perms");
    
        mqd = mq_open(argv[optind], flags, perms, attrp);
        if (mqd == (mqd_t) -1)
            errExit("mq_open");
    
        exit(EXIT_SUCCESS);
    }

     

    #include <mqueue.h>
    #include <fcntl.h>              /* For definition of O_NONBLOCK */
    #include "tlpi_hdr.h"
    
    static void
    usageError(const char *progName)
    {
        fprintf(stderr, "Usage: %s [-n] mq-name
    ", progName);
        fprintf(stderr, "    -n           Use O_NONBLOCK flag
    ");
        exit(EXIT_FAILURE);
    }
    
    int
    main(int argc, char *argv[])
    {
        int flags, opt;
        mqd_t mqd;
        unsigned int prio;
        void *buffer;
        struct mq_attr attr;
        ssize_t numRead;
    
        flags = O_RDONLY;
        while ((opt = getopt(argc, argv, "n")) != -1) {
            switch (opt) {
            case 'n':   flags |= O_NONBLOCK;        break;
            default:    usageError(argv[0]);
            }
        }
    
        if (optind >= argc)
            usageError(argv[0]);
    
        mqd = mq_open(argv[optind], flags);
        if (mqd == (mqd_t) -1)
            errExit("mq_open");
    
        /* We need to know the 'mq_msgsize' attribute of the queue in
           order to determine the size of the buffer for mq_receive() */
    
        if (mq_getattr(mqd, &attr) == -1)
            errExit("mq_getattr");
    
        buffer = malloc(attr.mq_msgsize);
        if (buffer == NULL)
            errExit("malloc");
    
        numRead = mq_receive(mqd, buffer, attr.mq_msgsize, &prio);
        if (numRead == -1)
            errExit("mq_receive");
    
        printf("Read %ld bytes; priority = %u
    ", (long) numRead, prio);
        if (write(STDOUT_FILENO, buffer, numRead) == -1)
            errExit("write");
        write(STDOUT_FILENO, "
    ", 1);
    
        exit(EXIT_SUCCESS);
    }
    #include <mqueue.h>
    #include <fcntl.h>              /* For definition of O_NONBLOCK */
    #include "tlpi_hdr.h"
    
    static void
    usageError(const char *progName)
    {
        fprintf(stderr, "Usage: %s [-n] mq-name msg [prio]
    ", progName);
        fprintf(stderr, "    -n           Use O_NONBLOCK flag
    ");
        exit(EXIT_FAILURE);
    }
    
    int
    main(int argc, char *argv[])
    {
        int flags, opt;
        mqd_t mqd;
        unsigned int prio;
    
        flags = O_WRONLY;
        while ((opt = getopt(argc, argv, "n")) != -1) {
            switch (opt) {
            case 'n':   flags |= O_NONBLOCK;        break;
            default:    usageError(argv[0]);
            }
        }
    
        if (optind + 1 >= argc)
            usageError(argv[0]);
    
        mqd = mq_open(argv[optind], flags);
        if (mqd == (mqd_t) -1)
            errExit("mq_open");
    
        prio = (argc > optind + 2) ? atoi(argv[optind + 2]) : 0;
    
        if (mq_send(mqd, argv[optind + 1], strlen(argv[optind + 1]), prio) == -1)
            errExit("mq_send");
        exit(EXIT_SUCCESS);
    }

    6. 消息通知

    POSIX消息队列区别于System V消息队列的一个特性是POSIX消息队列能够接收之前为空的队列上有可用消息的异步通知,即这个队列从空变成了非控

    这个特性意味着无需执行一个阻塞或者将消息标记为O_NONBLOCK并在队列上定期执行mq_receive()调用。因为一个进程能够请求消息到达后通知,然后继续执行其他任务直到收到通知为止。

    进程可以选择通过信号的形式或创建一个单独的线程处理函数的形式来接收通知。

    • 任一时刻只有一个进程能够向一个特定的消息队列注册接收通知。如果消息队列上已经存在注册进程,那么后续在该队列上的注册请求将会失败,mq_notify()返回EBUSY错误。
    • 只有当一条新消息进入之前为空的队列时注册进程才会收到通知。如果在注册的时候队列中已经包含消息,那么之后当队列被清空之后有一条新消息到达之时才会发出通知。
    • 向注册进程发送了一个通知之后就会删除注册信息,之后任何进程就能够向队列注册接收通知了。一个进程要想持续地接收通知,那么他就必须要在每次接收到通知之后再次调用mq_notify()来注册自己
    • 注册进程只有在当前不存在其他在该队列上调用mq_receive()而发生阻塞的进程时才会收到通知。如果其他进程在mq_receive()调用中被阻塞了,那么该进程会读取消息,注册进程会保持注册状态。
    • 一个进程可以通过在调用mq_notify()时传入一个值为NULL的notification参数来撤销自己在消息通知上的注册信息。

    mq_notify()函数注册调用进程在一条消息进入描述符mqdes引用的空队列时接收通知

    #include <mqueue.h>
    int mq_notify(mqd_t mqdes, const struct sigevent *notification);
        Returns 0 on success, or –1 on error

    notification参数指定了进程接收通知的机制。

    union sigval {
        int sival_int; /* Integer value for accompanying data */
        void *sival_ptr; /* Pointer value for accompanying data */
    };
    
    struct sigevent {
        int sigev_notify; /* Notification method */
        int sigev_signo; /* Notification signal for SIGEV_SIGNAL */
        union sigval sigev_value; /* Value passed to signal handler or thread function */
        void (*sigev_notify_function) (union sigval); /* Thread notification function */
        void *sigev_notify_attributes; /* Really 'pthread_attr_t' */
    };

    其中sigev_notify字段被设置成下列值中的一个:

    SIGEV_NONE:注册这个进程接收通知,但当一条消息进入之前为空的队列时不通知改进程。与往常一样,当新消息进入空队列之后注册消息会被删除。

    SIGEV_SIGNAL:通过生成一个sigev_signo指定的信号来通知进程。如果sigev_signo是一个实时信号,那么sigev_value字段将会指定信号都带的数据。通过传入信号处理器的siginfo_t中的si_value或通过sigwaitinfo()或sigtimedwait()返回值都能够去的这部分数据。

    typedef struct {
      int si_signo; /* Signal number */
      int si_code; /* Signal code */----------对应应该为SI_MESGQ
      int si_trapno; /* Trap number for hardware-generated signal (unused on most architectures) */
      union sigval si_value; /* Accompanying data from sigqueue() */
      pid_t si_pid; /* Process ID of sending process */
      uid_t si_uid; /* Real user ID of sender */
    ...
    } siginfo_t;

    SIGEV_THREAD:通过调用在sigev_notify_function中指定的函数来通知进程,就像是在一个新线程中启动该函数一样。sigev_notify_attreibutes字段你可以为NULL或是一个指向定义了线程特性的pthread_attr_t指针。sigev_value中指定的sigval值将作为参数传入这个参数。

    6.1 通过信号接收通知

    通过sigaction()注册信号处理函数,通过mq_notify()在消息队列上注册通知,当空队列有新消息到达后,会产生一个信号。

    系统会执行信号处理函数,然后循环读取队列知道清空;在for()中循环注册通知,然后读取消息。

    #include <signal.h>
    #include <mqueue.h>
    #include <fcntl.h>              /* For definition of O_NONBLOCK */
    #include "tlpi_hdr.h"
    
    #define NOTIFY_SIG SIGUSR1
    
    static void handler(int sig)
    {
        /* Just interrupt sigsuspend() */
        printf("New message coming.
    ");
    }
    
    int main(int argc, char *argv[])
    {
        struct sigevent sev;
        mqd_t mqd;
        struct mq_attr attr;
        void *buffer;
        ssize_t numRead;
        sigset_t blockMask, emptyMask;
        struct sigaction sa;
    
        if (argc != 2 || strcmp(argv[1], "--help") == 0)
            usageErr("%s mq-name
    ", argv[0]);
    
        mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK);------------以O_NONBLOCK的方式打开消息队列。
        if (mqd == (mqd_t) -1)
            errExit("mq_open");
    
        /* Determine mq_msgsize for message queue, and allocate an input buffer
           of that size */
    
        if (mq_getattr(mqd, &attr) == -1)--------从消息队列获取mq_msgsize特性值。
            errExit("mq_getattr");
    
        buffer = malloc(attr.mq_msgsize);--------根据mq_msgsize的值申请接收消息缓冲区。
        if (buffer == NULL)
            errExit("malloc");
    
        /* Block the notification signal and establish a handler for it */
    
        sigemptyset(&blockMask);
        sigaddset(&blockMask, NOTIFY_SIG);
        if (sigprocmask(SIG_BLOCK, &blockMask, NULL) == -1)------阻塞SIGUSR1通知信号。
            errExit("sigprocmask");
    
        sigemptyset(&sa.sa_mask);
        sa.sa_flags = 0;
        sa.sa_handler = handler;
        if (sigaction(NOTIFY_SIG, &sa, NULL) == -1)--------------给SIGUSR1通知信号建立处理器handler()。
            errExit("sigaction");
    
        /* Register for message notification via a signal */
    
        sev.sigev_notify = SIGEV_SIGNAL;
        sev.sigev_signo = NOTIFY_SIG;
        if (mq_notify(mqd, &sev) == -1)--------------------------调用mq_notify()来注册进程消息通知。
            errExit("mq_notify");
    
        sigemptyset(&emptyMask);
    
        for (;;) {
            sigsuspend(&emptyMask);--------------------在此解除通知信号的阻塞状态,并等待;直到消息队列变为空情况下有新消息到达,产生信号后被捕获。然后执行handler()处理器函数。同时进程的消息通知注册信息会被删除。
                                   --------------------这里使用sigsuspend()而不是pause(),是为了防止出现程序在执行for中其他代码是错误信号的情况。
            if (mq_notify(mqd, &sev) == -1)------------重新注册进程接收消息通知。如果不重新注册,那么将永远不会产生。
                errExit("mq_notify");
                                           ------------使用while循环读取队列中所有消息,可以清空队列能够确保当一条新消息到达之后会产生一个新通知。
            while ((numRead = mq_receive(mqd, buffer, attr.mq_msgsize, NULL)) >= 0)-----从当前消息队列中读取消息。
                printf("Read %ld bytes
    ", (long) numRead);
    
            if (errno != EAGAIN)            /* Unexpected error */------------因为O_NONBLOCK的方式打开,所以在消息队列被清空之后就会终止,mq_receive()会失败并返回EAGAIN。
                errExit("mq_receive");
        }
    }

    从实际使用看,O_NONBLOCK+信号处理的方式要比以阻塞打开,创建一个线程并在线程中进行mq_receive()复杂。

    1. mq_notify()生效时消息队列需要为空。

    2. 需要重复注册mq_notify()。

    6.2 通过线程接收通知

    通过mq_notify()注册线程消息通知,在空队列有消息到达后创建一个线程执行线程函数。

    #include <pthread.h>
    #include <mqueue.h>
    #include <fcntl.h>              /* For definition of O_NONBLOCK */
    #include "tlpi_hdr.h"
    
    static void notifySetup(mqd_t *mqdp);
    
    static void threadFunc(union sigval sv)
    {
        ssize_t numRead;
        mqd_t *mqdp;
        void *buffer;
        struct mq_attr attr;
    
        mqdp = sv.sival_ptr;
    
        if (mq_getattr(*mqdp, &attr) == -1)
            errExit("mq_getattr");
    
        buffer = malloc(attr.mq_msgsize);------------根据消息队列属性申请内存。
        if (buffer == NULL)
            errExit("malloc");
    
        /* Reregister for message notification */
    
        notifySetup(mqdp);---------------必须在清空消息队列之前,重新注册消息通知。
    
        while ((numRead = mq_receive(*mqdp, buffer, attr.mq_msgsize, NULL)) >= 0)-----一次性读取完所有消息,清空队列。
            printf("Read %ld bytes
    ", (long) numRead);
    
        if (errno != EAGAIN)                        /* Unexpected error */
            errExit("mq_receive");
    
        free(buffer);
    }
    
    static void
    notifySetup(mqd_t *mqdp)
    {
        struct sigevent sev;
    
        sev.sigev_notify = SIGEV_THREAD;            /* Notify via thread */
        sev.sigev_notify_function = threadFunc;----------------设置消息通知方式为启动线程,并且线程处理函数为threadFunc()。
        sev.sigev_notify_attributes = NULL;
                /* Could be pointer to pthread_attr_t structure */
        sev.sigev_value.sival_ptr = mqdp;      ----------------确保在threadFunc中可以调用mq_getattr()/mq_receive()等。
    
        if (mq_notify(*mqdp, &sev) == -1)
            errExit("mq_notify");
    }
    
    int
    main(int argc, char *argv[])
    {
        mqd_t mqd;
    
        if (argc != 2 || strcmp(argv[1], "--help") == 0)
            usageErr("%s mq-name
    ", argv[0]);
    
        mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK);
        if (mqd == (mqd_t) -1)
            errExit("mq_open");
    
        notifySetup(&mqd);
        pause(); -----------------主程序永远暂停在此。
    }

    7. Linux特有的特性

    可以通过mount将POSIX消息队列挂载到文件系统中:

    mount -t mqueue none /dev/mqueue

    在/dev/mqueue中,可以查看每个mqueue的属性:

    cat /dev/mqueue/mq 
    QSIZE:0          NOTIFY:0     SIGNO:0     NOTIFY_PID:0

    QSIZE:为队列中所有数据的总字节数。

    NOTIFY:与sigev_notify对应的值,0表示SIGEV_SIGNAL,1表示SIGEV_NONE,2表示SIGEV_THREAD。

    SIGNO:为使用哪个信号来分发消息通知,0表示没有采用信号处理。

    NOTIFY_PID:为向该队列注册通知的进程ID,0表示没有注册。

    8. 消息队列限制

    MQ_PRIO_MAX:定义一条消息的最大优先级

    MQ_OPEN_MAX:限制一个进程最多能打开的消息队列数量

    msg_max:限制为新消息队列的mq_maxmsg特性的取值规定了一个上限

    msgsize_max:限制为非特权进程创建的新消息队列的mq_msgsize特性的取值规定了一个上限。当一个特权进程调用mq_open()时会忽略这个限制

    queues_max:是一个系统级别的限制,规定了系统上最多能够创建的消息队列的数量。一旦达到这个限制,只有特权进程才能够创建新都列。

    9. POSIX和System V消息队列比较

    POSIX消息队列相对于System V消息队列优势:

    1. POSIX IPC接口更加简单,并与UNIX文件模型更加一致。

    2. POSIX IPC对象是引用技术的,简化了确定合适删除一个对象的任务。

    3. 消息通知特性允许一个进程能够在一条消息进入之前为空的队列时异步地通过信号或线程的实例化来接收通知。

    4. 在Linux上可以使用poll()、select()以及epoll来监控POSIX消息队列。

    POSIX消息队列相对劣势:

    1. POSIX消息队列可移植性稍差。

    2. 与POSIX消息队列严格按照优先级排序相比,System V消息队列能够根据类型来选择消息的功能更加灵活。

  • 相关阅读:
    golang实现并发爬虫一(单任务版本爬虫功能)
    golang实现rabbitmq的五种模式
    KNN笔记
    Observer Pattern
    关于data-属性
    Python中的装饰器
    Python中的高阶函数与匿名函数
    Python中的列表生成式和多层表达式
    Thymeleaf读取国际化文本时出现??xxxxxx_zh_CN??问题
    Java8新特性(1):Lambda表达式
  • 原文地址:https://www.cnblogs.com/arnoldlu/p/12394140.html
Copyright © 2011-2022 走看看