zoukankan      html  css  js  c++  java
  • 消息队列

    消息队列

    顾名思义,消息队列就是一个消息的列表。用户可以从消息队列种添加消息、读取消息等。

    这些消息又是存在于内核中的,由“队列 ID”来标识。

    消息队列实现

    • 创建或打开消息队列

    msgget函数创建的消息队列的数量会受到系统消息队列数量的限制。

    SYNOPSIS

       #include <sys/types.h>
       #include <sys/ipc.h>
       #include <sys/msg.h>
    
       int msgget(key_t key, int msgflg);
    

    key: IPC_PRIVATE - 建立新消息队列;或一般为ftok建立key的返回值。
    msgflg: 指定IPC_CREAT - key对应的消息队列不存在,则建立新消息队列。
    return - 消息队列ID

    • 添加消息

    msgsnd 函数把消息添加到已打开的消息队列末尾。

    SYNOPSIS

       #include <sys/types.h>
       #include <sys/ipc.h>
       #include <sys/msg.h>
    
       int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
    

    msgp: 指向消息结构

           struct msgbuf {
               long mtype;       /* message type, must be > 0 */
               char mtext[1];    /* message data */
           };
    

    msgsz: 消息字节数
    msgflag: IPC_NOWAIT - 若消息没有发送,立即返回;0 - 阻塞直到满足条件

    • 读取消息

    读取消息使用的函数是 msgrcv,它把消息从消息队列中取走。

    与 FIFO 不同的是,这里可以指定取走某一条消息。

    SYNOPSIS

       #include <sys/types.h>
       #include <sys/ipc.h>
       #include <sys/msg.h>
    
       ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,
                      int msgflg);
    

    msgp: 消息缓存区
    msgtyp: 0 - 接收消息队列中第一个消息; >0 - 接收消息队列中第一个类型为 msgtyp 的消息; <0 - 返回其类型小于或等于mtype参数的绝对值的最小的一个消息。
    msgflg:
    MSG_NOERROR - 若返回的消息比 size 字节多,则消息就会截短到size 字节,且不通知消息发送进程。
    IPC_NOWAIT - 若消息并没有立即发送而调用进程会立即返回。
    0: msgsnd 调用阻塞直到条件满足为止

    • 控制消息队列

    控制消息队列使用的函数是 msgctl,它可以完成多项功能。

    SYNOPSIS

       #include <sys/types.h>
       #include <sys/ipc.h>
       #include <sys/msg.h>
    
       int msgctl(int msqid, int cmd, struct msqid_ds *buf);
    

    cmd:
    IPC_STAT - 读取消息队列的数据结构 msqid_ds,并将其存储在buf 指定的地址中
    IPC_SET - 设置消息队列的数据结构 msqid_ds 中的 ipc_perm 元素的值, 这个值取自 buf 参数。
    IPC_RMID - 从系统内核中移走消息队列。

    实例:

    读消息进程创建消息队列,阻塞等待读取消息。读取消息后,删除消息队列。
    写消息进程打开已创建的消息队列,写入消息。

    读消息队列进程:

    #include <sys/types.h>
    #include <sys/ipc.h>
    #include <sys/msg.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <string.h>
    
    #define BUFSZ 512
    
    struct message{
    	long msg_type;
    	char msg_text[BUFSZ];
    };
    
    int main()
    {
    	int qid;
    	key_t key;
    	int len;
    	struct message msg;
    
    	memset(&msg, 0, sizeof(msg));	
    
    	/* 根据不同的路径产生key */
    	if((key = ftok(".",'a')) == -1){
    		perror("ftok");
    		exit(1);
    	}	
    
    	/* 创建消息队列 */
    	if((qid = msgget(key,IPC_CREAT|0666)) == -1){
    		perror("msgget");
    		exit(1);
    	}
    	printf("opened queue %d
    ", qid);
    /*
    	puts("Please enter the message to queue:");
    	if((fgets((&msg)->msg_text,BUFSZ,stdin)) == NULL){
    		puts("no message");
    		exit(1);
    	}
    	msg.msg_type = getpid();
    	len = strlen(msg.msg_text);
    */
    	/* 添加消息到消息队列 */
    /*	if((msgsnd(qid,&msg,len,0)) < 0){
    		perror("message posted");
    		exit(1);
    	}
    */
    	printf("waiting message ...
    ");
    	/* 读取消息队列 */
    	if(msgrcv(qid,&msg,BUFSZ,0,0) < 0){
    		perror("msgrcv");
    		exit(1);
    	}
    	printf("message is:%s
    ",(&msg)->msg_text);
    
    	/* 从系统内核中移走消息队列 */
    	if((msgctl(qid,IPC_RMID,NULL)) < 0){
    		perror("msgctl");
    		exit(1);
    	}
    
    	exit(0);
    }
    

    写消息队列进程:

    #include <sys/types.h>
    #include <sys/ipc.h>
    #include <sys/msg.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <string.h>
    
    #define BUFSZ 512
    
    struct message{
    	long msg_type;
    	char msg_text[BUFSZ];
    };
    
    int main()
    {
    	int qid;
    	key_t key;
    	int len;
    	struct message msg;
    
    	/* 根据不同的路径产生key */
    	if((key = ftok(".",'a')) == -1){
    		perror("ftok");
    		exit(1);
    	}	
    
    	/* 打开已存在的消息队列 */
    	if((qid = msgget(key, 0)) == -1){
    		perror("msgget");
    		exit(1);
    	}
    	printf("opened queue %d
    ", qid);
    
    	puts("Please enter the message to queue:");
    	if((fgets((&msg)->msg_text,BUFSZ,stdin)) == NULL){
    		puts("no message");
    		exit(1);
    	}
    	msg.msg_type = getpid();
    	len = strlen(msg.msg_text) - 1; /* -1 for remove newline */
    
    	/* 添加消息到消息队列 */
    	if((msgsnd(qid,&msg,len,0)) < 0){
    		perror("message posted");
    		exit(1);
    	}
    
    	/* 读取消息队列 */
    /*	if(msgrcv(qid,&msg,BUFSZ,0,0) < 0){
    		perror("msgrcv");
    		exit(1);
    	}
    	printf("message is:%s
    ",(&msg)->msg_text);
    */
    	/* 从系统内核中移走消息队列 */
    /*	if((msgctl(qid,IPC_RMID,NULL)) < 0){
    		perror("msgctl");
    		exit(1);
    	}
    */
    	exit(0);
    }
    

    结果如下:

    xxx@xxx-pc:~/Documents$ ./p1
    opened queue 196608
    waiting message ...
    message is:Are you OK ?
    
    xxx@xxx-pc:~/Documents$ ./p2
    opened queue 196608
    Please enter the message to queue:
    Are you OK ?
    

    如果消息队列未创建,就打开则报错:

    xxx@xxx-pc:~/Documents$ ./p2
    msgget: No such file or directory
  • 相关阅读:
    是否是轮回(续)
    夜雨做成秋
    53分
    浮生六记 一成长星和月
    企业信息化常见缩略语汇总
    是否是轮回
    对信号集操作函数的使用方法和顺序
    fcntl.h
    关于linux信号量的基本使用
    linux 共享内存
  • 原文地址:https://www.cnblogs.com/fuluwwa/p/6785172.html
Copyright © 2011-2022 走看看