zoukankan      html  css  js  c++  java
  • Posix消息队列

    转载于:http://blog.csdn.net/zx714311728/article/details/53197196

    1.消息队列

    消息队列可以认为是一个消息链表,消息队列是随内核持续的。队列中每个消息的属性有:一个无符号整数优先级(Poxis)或一个长整数类型(System V);消息的数据部分长度(可以为0);数据本身。链表头含有当前队列的两个属性:队列中运行的最大消息数、每个消息的最大大小。消息队列的可能布局如下:

    Posix消息队列与System V消息队列主要区别:

    1.对Posix消息队列的读总是返回最高优先级的消息,对System V消息队列的读则可以返回任一指定优先级消息。

    2.往空队列放置消息时,Posix消息队列允许产生一个信号或启动一个线程,System V则不提供类似机制

    Posix消息队列与管道或FIFO的主要区别:

    1.在某个进程往一个队列写入消息之前,并不需要另外某个进程在该队列上等待消息的到达。对管道和FIFO来说,除非读出者已存在,否则先有写入者是没有意义的。

    2.管道和FIFO是字节流模型,没有消息边界;消息队列则指定了数据长度,有边界。

    2.mq_open 、  mq_close、  mq_unlink

    //创建新消息队列或打开已存在的消息队列

    #include <mqueue.h>

    mqd_t mq_open(const char *name, int oflag, .../*mode_t mode, struct mq_attr *attr*/);

    返回值:成功,消息队列描述符;失败,-1

    消息队列描述符用作其余消息队列函数(mq_unlink除外)的第一个参数。

    name规则:必须以一个斜杠符打头,并且不能再包含任何其他斜杠符

    oflag:O_RDONLY、O_WRONLY、O_RDWR三者之一,按位或上O_CREAT、O_EXCL

    mode:S_ISRUSR、S_ISWUSR、S_ISRGRP、S_ISWGRP、S_ISROTH、S_ISWOTH

    attr:

    struct mq_attr

    {

            long mq_flags;//阻塞标志, 0或O_NONBLOCK

            long mq_maxmsg;//最大消息数

            long mq_msgsize;//每个消息最大大小

            long mq_curmsgs;//当前消息数

    };

    //关闭已打开的消息队列

    int mq_close(mqd_t mqd);

    返回值:成功,0;出错,-1

    一个进程终止时,它的所有打开的消息队列都关闭,如同调用了mq_close。

    //删除消息队列的name

    int mq_unlink(const char *name);

    返回值:成功,0;出错,-1

    每个消息队列有一个保存其当前打开的描述符数的引用计数,只有当引用计数为0时,才删除该消息队列。mq_unlink和mq_close都会让引用数减一

    例1:创建一个消息队列,并可使用排他性检验

    程序:

    1. #include <stdio.h>  
    2. #include <mqueue.h>  
    3. #include <unistd.h>  
    4. #include <sys/stat.h>  
    5. #include <errno.h>  
    6.   
    7. #define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)  
    8.   
    9. int main(int argc, char *argv[])  
    10. {  
    11.         int c;  
    12.         int flag = O_RDWR | O_CREAT;  
    13.         while ((c = getopt(argc, argv, "e")) != -1)  
    14.         {  
    15.                 switch(c)  
    16.                 {  
    17.                         case 'e':  
    18.                                 flag |= O_EXCL;  
    19.                                 break;  
    20.                 }  
    21.         }  
    22.         if (optind != argc -1)  
    23.         {  
    24.                 printf("usage: mqcreate [- e] <name> ");  
    25.                 exit(0);  
    26.         }  
    27.         mqd_t mqd = mq_open(argv[optind], flag, FILE_MODE, NULL);  
    28.         if (mqd == -1)  
    29.         {  
    30.                 printf("mq_open() error %d : %s ", errno, strerror(errno));  
    31.                 exit(-1);  
    32.         }  
    33.         mq_close(mqd);  
    34.         exit(0);  
    35. }  
    #include <stdio.h>
    #include <mqueue.h>
    #include <unistd.h>
    #include <sys/stat.h>
    #include <errno.h>
    
    #define FILE_MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)
    
    int main(int argc, char *argv[])
    {
            int c;
            int flag = O_RDWR | O_CREAT;
            while ((c = getopt(argc, argv, "e")) != -1)
            {
                    switch(c)
                    {
                            case 'e':
                                    flag |= O_EXCL;
                                    break;
                    }
            }
            if (optind != argc -1)
            {
                    printf("usage: mqcreate [- e] <name>
    ");
                    exit(0);
            }
            mqd_t mqd = mq_open(argv[optind], flag, FILE_MODE, NULL);
            if (mqd == -1)
            {
                    printf("mq_open() error %d : %s
    ", errno, strerror(errno));
                    exit(-1);
            }
            mq_close(mqd);
            exit(0);
    }
    

    分析:

    1.mq_xxx()函数不是标准库函数,所以链接时需指定库,通过在最后加上-lrt选项来指定。

    2.程序运行之后可能会看不到消息队列,要看到创建的Posix消息队列,需执行以下操作:

        mkdir /dev/mqueue

        mount -t mqueue none /dev/mqueue

    3.程序通过getopt()函数获取带选项的命令行参数,optind指向下一个要读取的参数在argv[]中的位置。若遇到没包含在getopt第三个参数的选项字母,或者遇到一个没有所需参数的选项字母(通过在第三个参数后跟一个冒号指示)则出错。

    4.若命令行参数带有-e选项,则进行排他性检测。

    5.若mq_open()出错,则通过strerror(errno)函数得到错误说明。

    结果:

    例2:使用mq_unlink删除一个消息队列

    程序:

    1. #include <stdio.h>  
    2. #include <mqueue.h>  
    3. #include <errno.h>  
    4.   
    5. int main(int argc, char *argv[])  
    6. {  
    7.         if (argc != 2)  
    8.         {  
    9.                 printf("argument error ");  
    10.                 exit(-1);  
    11.         }  
    12.         int val = mq_unlink(argv[1]);  
    13.         if (val != 0)  
    14.         {  
    15.                 printf("mq_unlink() error %d : %s ", errno, strerror(errno));  
    16.                 exit(-2);  
    17.         }  
    18.         exit(0);  
    19. }  
    #include <stdio.h>
    #include <mqueue.h>
    #include <errno.h>
    
    int main(int argc, char *argv[])
    {
            if (argc != 2)
            {
                    printf("argument error
    ");
                    exit(-1);
            }
            int val = mq_unlink(argv[1]);
            if (val != 0)
            {
                    printf("mq_unlink() error %d : %s
    ", errno, strerror(errno));
                    exit(-2);
            }
            exit(0);
    }
    

    分析:通过strerror(errno)得到错误信息

    结果:

    3.mq_getattr、  mq_setattr

    //获取、设置消息队列属性

    int mq_getattr(mqd_t mqd, struct mq_attr *attr);

    int mq_setattr(mqd_t mqd, const struct mq_attr *attr, struct mq_attr *oattr);

    返回值:成功,0;出错,-1

    消息队列先前属性返回到oattr中

    消息队列有4个属性,如下:

    struct mq_attr

    {

            long mq_flags;//阻塞标志, 0或O_NONBLOCK

            long mq_maxmsg;//最大消息数

            long mq_msgsize;//每个消息最大大小

            long mq_curmsgs;//当前消息数

    };

    其中,mq_setattr只能设置mq_flags属性;mq_open只能设置mq_maxmsg和mq_msgsize属性,并且两个必须要同时设置;mq_getattr返回全部4个属性。

    例3:打开一个消息队列并输出其属性

    程序:

    1. #include <stdio.h>  
    2. #include <mqueue.h>  
    3. #include <errno.h>  
    4.   
    5. int main(int argc, char *argv[])  
    6. {  
    7.         if (argc != 2)  
    8.         {  
    9.                 printf("must have 2 arguments ! ");  
    10.                 exit(-1);  
    11.         }  
    12.         mqd_t mqd = mq_open(argv[1], O_RDONLY);  
    13.         if (mqd == -1)  
    14.         {  
    15.                 printf("mq_open() error %d: %s ", errno, strerror(errno));  
    16.                 exit(-2);  
    17.         }  
    18.         struct mq_attr attr;  
    19.         if (mq_getattr(mqd, &attr) != 0)  
    20.         {  
    21.                 printf("mq_open() error %d: %s ", errno, strerror(errno));  
    22.                 exit(-3);  
    23.         }  
    24.         if (attr.mq_flags == 0)  
    25.                 printf("mq_flags = 0, ");  
    26.         else  
    27.                 printf("mq_flags = O_NONBLOCK, ");  
    28.         printf("mq_maxmsg = %d, ", attr.mq_maxmsg);  
    29.         printf("mq_msgsize = %d, ", attr.mq_msgsize);  
    30.         printf("mq_curmsgs = %d ", attr.mq_curmsgs);  
    31.         exit(0);  
    32. }  
    #include <stdio.h>
    #include <mqueue.h>
    #include <errno.h>
    
    int main(int argc, char *argv[])
    {
            if (argc != 2)
            {
                    printf("must have 2 arguments !
    ");
                    exit(-1);
            }
            mqd_t mqd = mq_open(argv[1], O_RDONLY);
            if (mqd == -1)
            {
                    printf("mq_open() error %d: %s
    ", errno, strerror(errno));
                    exit(-2);
            }
            struct mq_attr attr;
            if (mq_getattr(mqd, &attr) != 0)
            {
                    printf("mq_open() error %d: %s
    ", errno, strerror(errno));
                    exit(-3);
            }
            if (attr.mq_flags == 0)
                    printf("mq_flags = 0, ");
            else
                    printf("mq_flags = O_NONBLOCK, ");
            printf("mq_maxmsg = %d, ", attr.mq_maxmsg);
            printf("mq_msgsize = %d, ", attr.mq_msgsize);
            printf("mq_curmsgs = %d
    ", attr.mq_curmsgs);
            exit(0);
    }
    

    分析:因为要先通过mq_open打开消息队列,才能获取其属性,所以消息队列的命令行参数必须要符合name的规则。

    结果:

    例4:重新实现例1,这次创建消息队列时,指定其属性。

    程序:

    1. #include <stdio.h>  
    2. #include <mqueue.h>  
    3. #include <errno.h>  
    4. #include <unistd.h>  
    5. #include <fcntl.h>  
    6.   
    7. #define MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)  
    8.   
    9. struct mq_attr attr;  
    10.   
    11. int main(int argc, char *argv[])  
    12. {  
    13.         int c;  
    14.         int flag = O_RDWR | O_CREAT;  
    15.         while ((c = getopt(argc, argv, "em:z:")) != -1)  
    16.         {  
    17.                 switch (c)  
    18.                 {  
    19.                         case 'e':  
    20.                                 flag |= O_EXCL;  
    21.                                 break;  
    22.                         case 'm':  
    23.                                 attr.mq_maxmsg = atol(optarg);  
    24.                                 break;  
    25.                         case 'z':  
    26.                                 attr.mq_msgsize = atol(optarg);  
    27.                                 break;  
    28.                 }  
    29.         }  
    30.         if (optind != argc -1)  
    31.         {  
    32.                 printf("usage: mqcreate -e [-m maxsg] [-z msgsize] <name> ");  
    33.                 exit(-1);  
    34.         }  
    35.         if ((attr.mq_maxmsg != 0 && attr.mq_msgsize == 0)  
    36.                 || (attr.mq_maxmsg == 0 && attr.mq_msgsize != 0))  
    37.         {  
    38.                 printf("must specify  both -m maxmsg and -z msgsize ");  
    39.                 exit(-2);  
    40.         }  
    41.         mqd_t mqd = mq_open(argv[optind], flag, MODE, (attr.mq_maxmsg != 0) ? &attr : NULL);  
    42.         if (mqd == -1)  
    43.         {  
    44.                 printf("mq_open() error %d: %s ", errno, strerror(errno));  
    45.                 exit(-3);  
    46.         }  
    47.         int rtn = mq_close(mqd);  
    48.         if (rtn == -1)  
    49.         {  
    50.                 printf("mq_close() error ");  
    51.                 exit(-4);  
    52.         }  
    53.         exit(0);  
    54. }  
    #include <stdio.h>
    #include <mqueue.h>
    #include <errno.h>
    #include <unistd.h>
    #include <fcntl.h>
    
    #define MODE (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)
    
    struct mq_attr attr;
    
    int main(int argc, char *argv[])
    {
            int c;
            int flag = O_RDWR | O_CREAT;
            while ((c = getopt(argc, argv, "em:z:")) != -1)
            {
                    switch (c)
                    {
                            case 'e':
                                    flag |= O_EXCL;
                                    break;
                            case 'm':
                                    attr.mq_maxmsg = atol(optarg);
                                    break;
                            case 'z':
                                    attr.mq_msgsize = atol(optarg);
                                    break;
                    }
            }
            if (optind != argc -1)
            {
                    printf("usage: mqcreate -e [-m maxsg] [-z msgsize] <name>
    ");
                    exit(-1);
            }
            if ((attr.mq_maxmsg != 0 && attr.mq_msgsize == 0)
                    || (attr.mq_maxmsg == 0 && attr.mq_msgsize != 0))
            {
                    printf("must specify  both -m maxmsg and -z msgsize
    ");
                    exit(-2);
            }
            mqd_t mqd = mq_open(argv[optind], flag, MODE, (attr.mq_maxmsg != 0) ? &attr : NULL);
            if (mqd == -1)
            {
                    printf("mq_open() error %d: %s
    ", errno, strerror(errno));
                    exit(-3);
            }
            int rtn = mq_close(mqd);
            if (rtn == -1)
            {
                    printf("mq_close() error
    ");
                    exit(-4);
            }
            exit(0);
    }
    

    分析:

    1.通过getopt参数获取命令行选项,可见例1的分析。其中“em:z:”,选项字母后带冒号的意思是,-m选项后要跟参数,-z选项后也要跟参数,否则出错。

    2.optarg代表选项后的参数。

    3.如果设置,则必须同时设置mq_maxmsg和mq_msgsize两个属性。

    结果:

    4.mq_send、  mq_receive

    //往消息队列中放置一个消息

    int mq_send(mqd_t mqd, const char *ptr, size_t len, unsigned int prio);

            返回值:成功,0;出错,-1

    说明:优先级prio要小于MQ_PRIO_MAX(此值最少为32)。

    //从消息队列中取走一个消息

    ssize_t mq_receive(mqd_t mqd, char *ptr, size_t len, unsigned int *prio);

            返回值:成功,消息中字节数;出错,-1

    说明:消息内存的长度len,最小要等于mq_msgsize。mq_receive总是返回消息队列中最高优先级的最早消息。

    #include <mqueue.h>
    int mq_timedsend(mqd_t mqdes, const char *msg_ptr,size_t
    msg_len, unsigned msg_prio,const struct timespec *abs_timeout);
    需要 -lrt 来链接

    mq_timedsend():
    _XOPEN_SOURCE >= 600 || _POSIX_C_SOURCE >= 200112L

    mq_timedsend() 的行为与 mq_send() 很类似,只是消息队列满且
    O_NONBLOCK 标志没有设置时,abs_timeout 指向的结构指定了一个阻塞的
    软上限时间。这个上限通过从 Epoch 1970-01-01 00:00:00 +0000 (UTC) 开始的
    绝对的秒和纳秒数指定超时,它的结构定义如下:
    struct timespec { time_t tv_sec; /* 秒 */ long tv_nsec; /* 纳秒 */ };
    如果消息队列满,并且调用时超时设置已经达到,mq_timedsend() 立刻返回

    #include <time.h>
    #include <mqueue.h>
    ssize_t mq_timedreceive(mqd_t mqdes, char *msg_ptr,size_t
    msg_len, unsigned *msg_prio,const struct timespec *abs_timeout);
    需要 -lrt 来链接。

    mq_timedreceive():
    _XOPEN_SOURCE >= 600 || _POSIX_C_SOURCE >= 200112L 描述
    mq_timedreceive() 的行为与 mq_receive() 很相似,只不过当队列是空且
    O_NONBLOCK 标志没有针对相应消息队列描述符启用时,调用会阻塞到
    abs_timeout 时间点时返回。这个软上限是一个绝对的时刻值,它计算从
    Epoch 1970-01-01 00:00:00 +0000 (UTC) 开始的秒数和纳秒数,这个结构的定
    义如下:
    struct timespec { time_t tv_sec; /* 秒 */ long tv_nsec; /* 纳秒 */ }; 如果没有消
    息有效,并且超时的时刻已经达到,mq_timedreceive() 立即返回。

    例5:往消息队列中添加一个消息

    1. #include <stdio.h>  
    2. #include <mqueue.h>  
    3. #include <errno.h>  
    4.   
    5. int main(int argc, char *argv[])  
    6. {  
    7.         char *ptr;  
    8.         size_t size;  
    9.         unsigned long prio;  
    10.         mqd_t mqd;  
    11.         if (argc != 4)  
    12.         {  
    13.                 printf("usage: mqsend <name> <#bytes> <priority> ");  
    14.                 exit(-1);  
    15.         }  
    16.         if ((mqd = mq_open(argv[1], O_WRONLY)) == -1)  
    17.         {  
    18.                 printf("mq_open() error %d: %s ", errno, strerror(errno));  
    19.                 exit(-1);  
    20.         }  
    21.         size = atoi(argv[2]);  
    22.         prio = atoi(argv[3]);  
    23.         ptr = calloc(size, sizeof(char));  
    24.         if (mq_send(mqd, ptr, size, prio) == -1)  
    25.         {  
    26.                 printf("mq_send() error %d: %s ", errno, strerror(errno));  
    27.                 exit(-1);  
    28.         }  
    29.         exit(0);  
    30. }  
    #include <stdio.h>
    #include <mqueue.h>
    #include <errno.h>
    
    int main(int argc, char *argv[])
    {
            char *ptr;
            size_t size;
            unsigned long prio;
            mqd_t mqd;
            if (argc != 4)
            {
                    printf("usage: mqsend <name> <#bytes> <priority>
    ");
                    exit(-1);
            }
            if ((mqd = mq_open(argv[1], O_WRONLY)) == -1)
            {
                    printf("mq_open() error %d: %s
    ", errno, strerror(errno));
                    exit(-1);
            }
            size = atoi(argv[2]);
            prio = atoi(argv[3]);
            ptr = calloc(size, sizeof(char));
            if (mq_send(mqd, ptr, size, prio) == -1)
            {
                    printf("mq_send() error %d: %s
    ", errno, strerror(errno));
                    exit(-1);
            }
            exit(0);
    }
    

    分析:

    1.通过atoi()函数将命令行参数转换成int变量

    2.通过calloc()函数分配内存,并初始化为0

    3.要发送消息,要先为消息分配内存。 结果:和例6一起

    例6:从消息队列中取出一个消息

    程序:

    1. #include <stdio.h>  
    2. #include <mqueue.h>  
    3. #include <errno.h>  
    4. #include <stdlib.h>  
    5. #include <fcntl.h>  
    6. #include <unistd.h>  
    7.   
    8. #define ERR_EXIT(m)  
    9.         {  
    10.                 printf("%s() error %d: %s ", m, errno, strerror(errno));  
    11.                 exit(EXIT_FAILURE);  
    12.         }  
    13.   
    14. int main(int argc, char *argv[])  
    15. {  
    16.         int c;  
    17.         int flag = O_RDONLY;  
    18.         //设置非阻塞方式  
    19.         while ((c = getopt(argc, argv, "n")) != -1)  
    20.         {  
    21.                 switch(c)  
    22.                 {  
    23.                         case 'n':  
    24.                                 flag |= O_NONBLOCK;  
    25.                                 break;  
    26.                 }  
    27.         }  
    28.         if (optind != argc -1)  
    29.         {  
    30.                 printf("usage: mqreceive [-n] <name> ");  
    31.                 exit(-1);  
    32.         }  
    33.         //打开消息队列  
    34.         mqd_t mqd;  
    35.         if ((mqd = mq_open(argv[optind], flag)) == -1)  
    36.                 ERR_EXIT("mq_open");  
    37.   
    38.         //获取消息队列中每个消息最大大小  
    39.         struct mq_attr attr;  
    40.         if ((mq_getattr(mqd, &attr)) == -1)  
    41.                 ERR_EXIT("mq_getattr");  
    42.         long len = attr.mq_msgsize;  
    43.   
    44.         //从消息队列中取出消息  
    45.         unsigned int prio;//无符号整型  
    46.         ssize_t n;  
    47.         char *buff = (char *)malloc(attr.mq_msgsize * sizeof(char));  
    48.         if ((n = mq_receive(mqd, buff, len, &prio)) == -1)  
    49.                 ERR_EXIT("mq_getattr");  
    50.         printf("read %d bytes, priority = %d ", n, prio);  
    51.         exit(0);  
    52. }  
    #include <stdio.h>
    #include <mqueue.h>
    #include <errno.h>
    #include <stdlib.h>
    #include <fcntl.h>
    #include <unistd.h>
    
    #define ERR_EXIT(m)
            {
                    printf("%s() error %d: %s
    ", m, errno, strerror(errno));
                    exit(EXIT_FAILURE);
            }
    
    int main(int argc, char *argv[])
    {
            int c;
            int flag = O_RDONLY;
            //设置非阻塞方式
            while ((c = getopt(argc, argv, "n")) != -1)
            {
                    switch(c)
                    {
                            case 'n':
                                    flag |= O_NONBLOCK;
                                    break;
                    }
            }
            if (optind != argc -1)
            {
                    printf("usage: mqreceive [-n] <name>
    ");
                    exit(-1);
            }
            //打开消息队列
            mqd_t mqd;
            if ((mqd = mq_open(argv[optind], flag)) == -1)
                    ERR_EXIT("mq_open");
    
            //获取消息队列中每个消息最大大小
            struct mq_attr attr;
            if ((mq_getattr(mqd, &attr)) == -1)
                    ERR_EXIT("mq_getattr");
            long len = attr.mq_msgsize;
    
            //从消息队列中取出消息
            unsigned int prio;//无符号整型
            ssize_t n;
            char *buff = (char *)malloc(attr.mq_msgsize * sizeof(char));
            if ((n = mq_receive(mqd, buff, len, &prio)) == -1)
                    ERR_EXIT("mq_getattr");
            printf("read %d bytes, priority = %d
    ", n, prio);
            exit(0);
    }
    

    分析:

    1.要取出消息,则先要为消息分配内存。

    2.open时,若未设置O_NONBLOCK选项,则一直阻塞,直到消息队列中有消息为止;设置了O_NONBLOCK选项,若消息队列中没有消息,则立刻出错返回。

    结果:

    5.mq_notify

    Posix消息队列允许异步事件通知,以告知何时有一个消息放置到了某个空消息队列中。这种通知有两种方式可供选择:

    (1)产生一个信号

    (2)创建一个线程来执行指定的函数

    //这种通知通过调用mq_notify建立

    int mq_notify(mqd_t mqd, const struct sigevent *notification);

    返回值:成功,0;出错-1

    1.notification非空,该进程被注册为接收该消息队列的通知

    2.notification空,且当前队列已被注册,则已存在的注册被撤销

    3.对一个消息队列来说,任一时刻只有一个进程可以被注册

    4.在mq_receive调用中的阻塞比任何通知的注册都优先收到消息

    5.当通知被发送给注册进程时,注册即被撤销,该进程若要重新注册,则必须重新调用mq_notify()

    #include <signal.h>

    struct sigevent

    {

    int sigev_notify;

    int sigev_signo;

    union sigval sigev_value;

    void (*sigev_notify_function)(union sigval);

    pthread_attr_t *sigev_notify_attributes;

    };

    sigev_notify取值:

    SIGEV_NONE:事件发生时,什么也不做;SIGEV_SIGNAL:事件发生时,将sigev_signo指定的信号发送给指定的进程;SIGEV_THREAD:事件发生时,内核会(在此进程内)以sigev_notify_attributes为线程属性创建一个线程,并让其执行sigev_notify_function,并以sigev_value为其参数

    sigev_signo:在sigev_notify=SIGEV_SIGNAL时使用,指定信号类别

    sigev_value:sigev_notify=SIGEV_SIGEV_THREAD时使用,作为sigev_notify_function的参数

    union sigval

    {

    int sival_int;

    void *sival_ptr;

    };

    sigev_notify_function:在sigev_notify=SIGEV_THREAD时使用,其他情况下置NULL

    sigev_notify_attributes:在sigev_notify=SIGEV_THREAD时使用,指定创建线程的属性,其他情况下置NULL

    例7:注册一个进程,当消息队列中有消息到来时,该进程被通知,并在其信号处理程序中取出消息。

    程序:

    1. #include <stdio.h>  
    2. #include <mqueue.h>  
    3. #include <errno.h>  
    4. #include <stdlib.h>  
    5. #include <signal.h>  
    6.   
    7. #define ERR_EXIT(m)  
    8.         {  
    9.                 printf("%s() error %d: %s ", m, errno, strerror(errno));  
    10.                 exit(EXIT_FAILURE);  
    11.         }  
    12.   
    13. mqd_t mqd;  
    14. void *buff;  
    15. struct mq_attr attr;  
    16. struct sigevent event;  
    17.   
    18. static void sig_usr1(int signo);  
    19. int main(int argc, char *argv[])  
    20. {  
    21.         if (signal(SIGUSR1, sig_usr1) == SIG_ERR)  
    22.                 ERR_EXIT("signal");  
    23.         if ((mqd = mq_open(argv[1], O_RDONLY)) == -1)  
    24.                 ERR_EXIT("mq_open");  
    25.         if (mq_getattr(mqd, &attr) == -1)  
    26.                 ERR_EXIT("mq_getattr");  
    27.         buff = malloc(attr.mq_msgsize);  
    28.   
    29.         //注册进程接收队列的通知  
    30.         event.sigev_notify = SIGEV_SIGNAL;  
    31.         event.sigev_signo = SIGUSR1;  
    32.         if (mq_notify(mqd, &event) == -1)//注册进程,消息队列中有消息来时,触发事件。  
    33.                 ERR_EXIT("mq_notify");  
    34.   
    35.         //一直执行,等待信号到来  
    36.         for (;;)  
    37.                 pause();  
    38.         exit(0);  
    39. }  
    40.   
    41. static void sig_usr1(int signo)  
    42. {  
    43.         //重新注册  
    44.         if (mq_notify(mqd, &event) == -1)  
    45.                 ERR_EXIT("mq_notify");  
    46.   
    47.         //取出消息  
    48.         ssize_t n;  
    49.         unsigned int prio;  
    50.         if ((n = mq_receive(mqd, buff, attr.mq_msgsize, &prio)) == -1)  
    51.                 ERR_EXIT("mq_receive");  
    52.         printf("SIGUSR1 received, read %d bytes, priority = %d ", n, prio);  
    53. }  
    #include <stdio.h>
    #include <mqueue.h>
    #include <errno.h>
    #include <stdlib.h>
    #include <signal.h>
    
    #define ERR_EXIT(m)
            {
                    printf("%s() error %d: %s
    ", m, errno, strerror(errno));
                    exit(EXIT_FAILURE);
            }
    
    mqd_t mqd;
    void *buff;
    struct mq_attr attr;
    struct sigevent event;
    
    static void sig_usr1(int signo);
    int main(int argc, char *argv[])
    {
            if (signal(SIGUSR1, sig_usr1) == SIG_ERR)
                    ERR_EXIT("signal");
            if ((mqd = mq_open(argv[1], O_RDONLY)) == -1)
                    ERR_EXIT("mq_open");
            if (mq_getattr(mqd, &attr) == -1)
                    ERR_EXIT("mq_getattr");
            buff = malloc(attr.mq_msgsize);
    
            //注册进程接收队列的通知
            event.sigev_notify = SIGEV_SIGNAL;
            event.sigev_signo = SIGUSR1;
            if (mq_notify(mqd, &event) == -1)//注册进程,消息队列中有消息来时,触发事件。
                    ERR_EXIT("mq_notify");
    
            //一直执行,等待信号到来
            for (;;)
                    pause();
            exit(0);
    }
    
    static void sig_usr1(int signo)
    {
            //重新注册
            if (mq_notify(mqd, &event) == -1)
                    ERR_EXIT("mq_notify");
    
            //取出消息
            ssize_t n;
            unsigned int prio;
            if ((n = mq_receive(mqd, buff, attr.mq_msgsize, &prio)) == -1)
                    ERR_EXIT("mq_receive");
            printf("SIGUSR1 received, read %d bytes, priority = %d
    ", n, prio);
    }
    

    分析:

    因为通知被发送给注册进程时,其注册即被撤销,所以在信号处理程序中重新注册。 结果:

    例7中的程序有个问题:

    在信号处理程序中调用的函数都不是异步信号安全函数,即不可重入的,具体参考apue 10.6节。http://blog.csdn.net/zx714311728/article/details/53056927#t4

    前面说过,通知有两种方式,一种是产生一个信号,另一种是创建一个线程执行指定函数。例7是产生一个信号,下面例8是创建一个线程。

    例8:空消息队列有消息到来时,通知注册进程,注册进程创建一个线程执行指定函数操作。

    程序:

    1. #include <stdio.h>  
    2. #include <mqueue.h>  
    3. #include <errno.h>  
    4. #include <stdlib.h>  
    5. #include <fcntl.h>  
    6. #include <unistd.h>  
    7.   
    8. #define ERR_EXIT(m)  
    9.         {  
    10.                 printf("%s() error %d: %s ", m, errno, strerror(errno));  
    11.                 exit(EXIT_FAILURE);  
    12.         }  
    13.   
    14. mqd_t mqd;  
    15. struct mq_attr attr;  
    16. struct sigevent event;  
    17.   
    18. static void thread_func(union sigval);  
    19.   
    20. int main(int argc, char *argv[])  
    21. {  
    22.         if (argc != 2)  
    23.         {  
    24.                 printf("argument error ");  
    25.                 exit(-1);  
    26.         }  
    27.   
    28.         //打开消息队列  
    29.         if ((mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK)) == -1)  
    30.                 ERR_EXIT("mq_open");  
    31.         if (mq_getattr(mqd, &attr) == -1)  
    32.                 ERR_EXIT("mq_getattr");  
    33.   
    34.         //注册进程  
    35.         event.sigev_notify = SIGEV_THREAD;  
    36.         event.sigev_value.sival_ptr = NULL;  
    37.         event.sigev_notify_attributes = NULL;  
    38.         event.sigev_notify_function = thread_func;  
    39.         if (mq_notify(mqd, &event) == -1)  
    40.                 ERR_EXIT("mq_notify");  
    41.   
    42.         for (;;)  
    43.                 pause();  
    44.         exit(0);  
    45. }  
    46.   
    47. void thread_func(union sigval value)  
    48. {  
    49.         ssize_t n;  
    50.         void *buff;  
    51.         long len;  
    52.         unsigned int prio;  
    53.   
    54.         buff = malloc(attr.mq_msgsize);  
    55.         len = attr.mq_msgsize;  
    56.   
    57.         if (mq_notify(mqd, &event) == -1)//重新注册进程  
    58.                 ERR_EXIT("mq_notify");  
    59.         if ((n = mq_receive(mqd, buff, len, &prio)))  
    60.                 printf("read %d bytes, priority = %d ", n, prio);  
    61.         free(buff);  
    62.         pthread_exit((void *)0);  
    63. }  
    #include <stdio.h>
    #include <mqueue.h>
    #include <errno.h>
    #include <stdlib.h>
    #include <fcntl.h>
    #include <unistd.h>
    
    #define ERR_EXIT(m)
            {
                    printf("%s() error %d: %s
    ", m, errno, strerror(errno));
                    exit(EXIT_FAILURE);
            }
    
    mqd_t mqd;
    struct mq_attr attr;
    struct sigevent event;
    
    static void thread_func(union sigval);
    
    int main(int argc, char *argv[])
    {
            if (argc != 2)
            {
                    printf("argument error
    ");
                    exit(-1);
            }
    
            //打开消息队列
            if ((mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK)) == -1)
                    ERR_EXIT("mq_open");
            if (mq_getattr(mqd, &attr) == -1)
                    ERR_EXIT("mq_getattr");
    
            //注册进程
            event.sigev_notify = SIGEV_THREAD;
            event.sigev_value.sival_ptr = NULL;
            event.sigev_notify_attributes = NULL;
            event.sigev_notify_function = thread_func;
            if (mq_notify(mqd, &event) == -1)
                    ERR_EXIT("mq_notify");
    
            for (;;)
                    pause();
            exit(0);
    }
    
    void thread_func(union sigval value)
    {
            ssize_t n;
            void *buff;
            long len;
            unsigned int prio;
    
            buff = malloc(attr.mq_msgsize);
            len = attr.mq_msgsize;
    
            if (mq_notify(mqd, &event) == -1)//重新注册进程
                    ERR_EXIT("mq_notify");
            if ((n = mq_receive(mqd, buff, len, &prio)))
                    printf("read %d bytes, priority = %d
    ", n, prio);
            free(buff);
            pthread_exit((void *)0);
    }
    

    结果:已测试通过,和例7一样,懒得放图了。

    6.sigwait

    在多线程的环境中应使用sigwait、sigwaitinfo和sigtimedwait来处理所有信号,而绝不要用异步信号处理程序。多线程环境中也不应使用sigprocmask,而应使用pthread_sigmask。具体见:http://blog.csdn.net/yusiguyuan/article/details/14230719

    #include <signal.h>

    int sigwait(const sigset_t *set, int *sig);

    返回值:成功,0;出错,正的Exxx值

    调用sigwait前阻塞set信号集,sigwait然后一直阻塞到这些信号中有一个或多个待处理,这时它返回其中一个信号存放到sig。此过程为“同步的等待一个异步事件”:我们使用了信号,但没有涉及异步信号处理程序。

    例9:修改例7的问题,不使用异步信号处理程序,而是使用sigwait。

    程序:

    1. #include <stdio.h>  
    2. #include <mqueue.h>  
    3. #include <signal.h>  
    4. #include <errno.h>  
    5. #include <stdlib.h>  
    6.   
    7. #define ERR_EXIT(m)  
    8.         {  
    9.                 printf("%s() error %d: %s ", m, errno, strerror(errno));  
    10.                 exit(EXIT_FAILURE);  
    11.         }  
    12.   
    13. int main(int argc, char *argv[])  
    14. {  
    15.         if (argc != 2)  
    16.         {  
    17.                 printf("argument error ");  
    18.                 exit(-1);  
    19.         }  
    20.   
    21.         mqd_t mqd;  
    22.         struct sigevent event;  
    23.         sigset_t newmask, oldmask, zeromask;  
    24.         struct mq_attr attr;  
    25.         void *buff;  
    26.         long len;  
    27.         int signo;  
    28.         ssize_t n;  
    29.         unsigned int prio;  
    30.   
    31.         //打开消息队列,获取消息队列属性  
    32.         if ((mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK)) == -1)  
    33.                 ERR_EXIT("mq_open");  
    34.         if (mq_getattr(mqd, &attr) == -1)  
    35.                 ERR_EXIT("mq_getattr");  
    36.         buff = malloc(attr.mq_msgsize);  
    37.         len = attr.mq_msgsize;  
    38.   
    39.         //设置阻塞信号集,阻塞信号  
    40.         sigemptyset(&newmask);  
    41.         sigaddset(&newmask, SIGUSR1);  
    42.         sigprocmask(SIG_BLOCK, &newmask, NULL);  
    43.   
    44.         //注册进程  
    45.         event.sigev_notify = SIGEV_SIGNAL;  
    46.         event.sigev_signo = SIGUSR1;  
    47.         if (mq_notify(mqd, &event) == -1)  
    48.                 ERR_EXIT("mq_notify");  
    49.   
    50.         //使注册进程永久执行,当空消息队列中有消息到来时,进行处理  
    51.         for (;;)  
    52.         {  
    53.                 sigwait(&newmask, &signo);//阻塞,直到有个信号到来  
    54.                 if (signo == SIGUSR1)  
    55.                 {  
    56.                         if (mq_notify(mqd, &event) == -1)//重新注册  
    57.                                 ERR_EXIT("mq_notify");  
    58.                         if ((n = mq_receive(mqd, buff, len, &prio)) == -1)//获取消息队列中消息  
    59.                         {  
    60.                                 ERR_EXIT("mq_receive");  
    61.                         }  
    62.                         else  
    63.                                 printf("SIGUSR1 received, read %dbytes, priority = %d ", n, prio);  
    64.                 }  
    65.         }  
    66.         exit(0);  
    67. }  
    #include <stdio.h>
    #include <mqueue.h>
    #include <signal.h>
    #include <errno.h>
    #include <stdlib.h>
    
    #define ERR_EXIT(m)
            {
                    printf("%s() error %d: %s
    ", m, errno, strerror(errno));
                    exit(EXIT_FAILURE);
            }
    
    int main(int argc, char *argv[])
    {
            if (argc != 2)
            {
                    printf("argument error
    ");
                    exit(-1);
            }
    
            mqd_t mqd;
            struct sigevent event;
            sigset_t newmask, oldmask, zeromask;
            struct mq_attr attr;
            void *buff;
            long len;
            int signo;
            ssize_t n;
            unsigned int prio;
    
            //打开消息队列,获取消息队列属性
            if ((mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK)) == -1)
                    ERR_EXIT("mq_open");
            if (mq_getattr(mqd, &attr) == -1)
                    ERR_EXIT("mq_getattr");
            buff = malloc(attr.mq_msgsize);
            len = attr.mq_msgsize;
    
            //设置阻塞信号集,阻塞信号
            sigemptyset(&newmask);
            sigaddset(&newmask, SIGUSR1);
            sigprocmask(SIG_BLOCK, &newmask, NULL);
    
            //注册进程
            event.sigev_notify = SIGEV_SIGNAL;
            event.sigev_signo = SIGUSR1;
            if (mq_notify(mqd, &event) == -1)
                    ERR_EXIT("mq_notify");
    
            //使注册进程永久执行,当空消息队列中有消息到来时,进行处理
            for (;;)
            {
                    sigwait(&newmask, &signo);//阻塞,直到有个信号到来
                    if (signo == SIGUSR1)
                    {
                            if (mq_notify(mqd, &event) == -1)//重新注册
                                    ERR_EXIT("mq_notify");
                            if ((n = mq_receive(mqd, buff, len, &prio)) == -1)//获取消息队列中消息
                            {
                                    ERR_EXIT("mq_receive");
                            }
                            else
                                    printf("SIGUSR1 received, read %dbytes, priority = %d
    ", n, prio);
                    }
            }
            exit(0);
    }
    

    分析:

    此程序没有使用异步信号处理程序,而是使用了sigwait以接收信号 结果:

    7.Posix实时信号

    信号可划分为两个大组:

    (1)值在SIGRTMIN和SIGRTMAX之间(包括两者在内)的实时信号

    (2)所有其他信号:SIGALRM、SIGINT、SIGKILL等

    实时信号中“实时”的意思是:

    (1)信号是排队的,是按先进先出(FIFO)顺序排队的。同一信号产生3次,则递交3次

    (2)当有多个SIGRTMIN到SIGRTMAX范围内的解阻塞信号排队时,值较小的信号先与较大的信号递交

    (3)实时信号能比非实时信号携带更多信息

    此部分因涉及到其他部分内容,只粗略看了下。

  • 相关阅读:
    hadoop生态--ElasticSearch--ES操作
    Haoop生态--ElasticSeaarch(1)--ES预备知识(全文检索的概念、Lucence、倒排索引)
    hadoop生态--Hive(2)--Hive的使用方式
    hadoop生态--Zookeeper
    gsoap使用
    set容器
    如何杀死defunct进程
    关于多态
    数组类型与函数指针基本语法知识
    syslog日志
  • 原文地址:https://www.cnblogs.com/baiduboy/p/6107620.html
Copyright © 2011-2022 走看看