zoukankan      html  css  js  c++  java
  • Linux IPC实践(5) --System V消息队列(2)

    消息发送/接收API

    msgsnd函数

    int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

    参数

       msgid: 由msgget函数返回的消息队列标识码, 也可以是通过ipcs命令查询出来的一个已经存在的消息队列的ID号

       msgp:是一个指针,指针指向准备发送的消息,

       msgsz:是msgp指向的消息长度, 注意:这个长度不含保存消息类型的那个long int长整型

       msgflg:控制着当前消息队列满到达系统上限时将要发生的事情,如果msgflg = IPC_NOWAIT表示队列满不等待,返回EAGAIN错误。

    消息结构在两方面受到制约: (1)它必须小于系统规定的上限值(MSGMAX); (2)必须以一个long int长整数开始,接收者函数将利用这个长整数确定消息的类型;

    //消息结构参考形式如下:
    struct msgbuf
    {
        long mtype;       /* message type, must be > 0 */
        char mtext[1];    /* message data, 可以设定为更多的字节数 */
    };
    /**示例1: 
    测试1: 发送消息的最大长度为8192字节, 一旦超过这个值, 则msgsnd出错, 提示 Invalid argument错误;
    测试2: 消息队列所能够接收的最大字节数16384字节, 一旦超过这个长度, 如果msgflg为0(阻塞模式), 则进程会一直阻塞下去, 直到有进程来将消息取走; 而如果msgflg为IPC_NOWAIT模式, 则一个字节也不会写入消息队列, 直接出错返回;
    **/
    int main(int argc, char *argv[])
    {
        if (argc != 3)
            err_quit("Usage: ./main <type> <length>");
    
        int type = atoi(argv[1]);
        int len = atoi(argv[2]);
    
        int msgid = msgget(0x255, 0666|IPC_CREAT);
        if (msgid == -1)
            err_exit("msgget error");
    
        struct msgbuf *buf;
        buf = (struct msgbuf *)malloc(len + sizeof(msgbuf::mtype));
        buf->mtype = type;
        if (msgsnd(msgid, buf, len, IPC_NOWAIT) == -1)
            err_exit("msgsnd error");
    }

    msgrcv函数

    ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg);

    参数

       msgid: 由msgget函数返回的消息队列标识码

       msgp:是一个指针,指针指向准备接收的消息;

       msgsz:是msgp指向的消息长度,这个长度不含保存消息类型的那个long int长整型

       msgtype:它可以实现接收优先级的简单形式(见下图)

       msgflg:控制着队列中没有相应类型的消息可供接收时将要发生的事(见下图)

    返回值:

       成功->返回实际放到接收缓冲区里去的字节数(注意: 此处并不包含msgbuf中的mtype的长度[man-page: msgrcv() returns the number of bytes actually copied into the mtext array.]);

       失败->返回-1;

     

    msgtyp参数

    msgtyp=0

    返回队列第一条信息

    msgtyp>0

    返回队列第一条类型等于msgtype的消息

    msgtyp<0

    返回队列第一条类型小于等于(<=)msgtype绝对值的消息,并且是满足条件的消息类型最小的消息(按照类型进行排序的顺序进行接收消息)

     

    msgflg参数

    msgflg=IPC_NOWAIT

    队列没有可读消息不等待,返回ENOMSG错误。

    msgflg=MSG_NOERROR

    消息大小超过msgsz(msgrcv 函数的第三个参数)时被截断, 并且不会报错

    msgtyp>0且msgflg=MSG_EXCEPT

    接收类型不等于msgtype的第一条消息

    /** 示例2: 消息接收(配合示例1中程序使用)
    说明: 	-t [number], 指定接收消息的类型, 类型为number的值
    -n ,指定以IPC_NOWAIT模式接收消息
    **/
    int main(int argc, char *argv[])
    {
        /** 解析参数 **/
        int type = 0;
        int flag = 0;
        int opt;
        while ((opt = getopt(argc, argv, "nt:")) != -1)
        {
            switch (opt)
            {
            case 'n':   // 指定IPC_NOWAIT选项
                flag |= IPC_NOWAIT;
                break;
            case 't':   // 指定接收的类型, 如果为0的话,说明是按照顺序接收
                type = atoi(optarg);
                break;
            default:
                exit(EXIT_FAILURE);
            }
        }
    
        int msgid = msgget(0x255, 0);
        if (msgid == -1)
            err_exit("msgget error");
    
        const int MSGMAX = 8192;    //指定一条消息的最大长度
        struct msgbuf *buf;
        buf = (struct msgbuf *)malloc(MSGMAX + sizeof(buf->mtype));
    
        ssize_t nrcv;
        if ((nrcv = msgrcv(msgid, buf, MSGMAX, type, flag)) == -1)
            err_exit("msgrcv error");
        cout << "recv " << nrcv << " bytes, type = " << buf->mtype << endl;
    }

    /** 综合示例: msgsnd/msgrcv, 消息发送/接收实践 **/
    //1. 消息发送
    int main()
    {
        int msgid = msgget(0x1234,0666|IPC_CREAT);
        if (msgid == -1)
            err_exit("msgget error");
    
        struct msgBuf myBuffer;
        for (int i = 0; i < 128; ++i)
        {
            myBuffer.mtype = i+1;
            sprintf(myBuffer.mtext,"Hello, My number is %d",i+1);
            if (msgsnd(msgid,&myBuffer,strlen(myBuffer.mtext),IPC_NOWAIT) == -1)
                err_exit("msgsnd error");
        }
    }
    //2. 消息接收:从队首不断的取数据
    int main(int argc, char *argv[])
    {
        int msgid = msgget(0x1234, 0);
        if (msgid == -1)
            err_exit("msgget error");
    
        struct msgBuf buf;
        ssize_t nrcv;
        while ((nrcv = msgrcv(msgid, &buf, sizeof(buf.mtext), 0, IPC_NOWAIT)) > 0)
        {
            cout << "recv " << nrcv << " bytes, type: " << buf.mtype
            << ", message: " << buf.mtext << endl;
        }
    }

    [附]-getopt函数的用法

    #include <unistd.h>
    int getopt(int argc, char * const argv[],
                      const char *optstring);
    
    extern char *optarg;
    extern int optind, opterr, optopt;
    //示例: 解析 ./main -n -t 3 中的参数选项
    int main(int argc, char *argv[])
    {
        while (true)
        {
            int opt = getopt(argc, argv, "nt:");
            if (opt == '?')
                exit(EXIT_FAILURE);
            else if (opt == -1)
                break;
    
            switch (opt)
            {
            case 'n':
                cout << "-n" << endl;
                break;
            case 't':
                int n = atoi(optarg);
                cout << "-t " << n << endl;
                break;
            }
        }
    }

  • 相关阅读:
    vue生命周期过程做了什么
    css_css3_实用属性_随时补充更新
    echarts的symbol引用本地图片写法
    无废话设计模式(1)--简单工厂、工厂方法、抽象工厂
    JavaWeb--Maven学习
    SpringCloud Alibaba实战 -引入服务网关Gateway
    从ReentrantLock看AQS (AbstractQueuedSynchronizer) 运行流程 抽象的队列式同步器
    架构的搭建(一)SpringCloud Alibaba
    配置中心之Nacos简介,使用及Go简单集成
    RabbitMQ
  • 原文地址:https://www.cnblogs.com/itrena/p/5926962.html
Copyright © 2011-2022 走看看