zoukankan      html  css  js  c++  java
  • 消息队列

    消息队列是比较常用的进程间通信方式。在公司的代码中应用也比较广泛。

    消息队列一般用在需要异步执行,并且消息内容比较短的情况。一般用来发控制消息,和反馈的消息。

    比如说,一个进程需要另外一个进程做某些操作,并且希望得到操作完成的反馈信息。

    首先将消息队列的函数进行封装,并编译成so.

    基本的结构体和封装函数如msg.h

    #include <sys/types.h>
    #include <sys/ipc.h>
    #include <sys/msg.h>
    #include <stdio.h>
    #include <errno.h>
    /* Control commands that one process can ask the other to carry out. */
    typedef enum {
      OPEN  = 0,
      CLOSE = 1,
      PLAY  = 2,
      PAUSE = 3,
      PREV  = 4,
      NEXT  = 5
    } CtrlCmd;
    /* Feedback results: one completion code per CtrlCmd, in the same order. */
    typedef enum {
      OPEN_DONE  = 0,
      CLOSE_DONE = 1,
      PLAY_DONE  = 2,
      PAUSE_DONE = 3,
      PREV_DONE  = 4,
      NEXT_DONE  = 5
    } CtrlFeedback;
    /*
     * Payload of a message: either a command or a feedback code, plus one
     * integer parameter.  Which union member is meaningful is decided by the
     * message type stored next to this payload (CTRL_CMD -> _ctlCmd,
     * CTRL_FEEDBACK -> _ctlFeedback).
     */
    typedef struct {
      union {
        CtrlCmd      _ctlCmd;
        CtrlFeedback _ctlFeedback;
      } u;
      int param;
    } CtrlInfo;
    /* Message type tag stored in FellowMsg._msgType. */
    typedef enum {
      /* control message */
      CTRL_CMD = 1,
      /* feedback message */
      CTRL_FEEDBACK = 2
    } MsgType;
    /*
     * Message exchanged through the queue: a leading long message type
     * (a MsgType value) followed by the CtrlInfo payload.
     */
    typedef struct {
      long     _msgType;
      CtrlInfo _ctlInfo;
    } FellowMsg;
    /*
     * Get (creating it if needed) the message queue keyed by ftok(path, proj_id).
     * Returns the queue id, or -1 on failure.
     *
     * The parameter list is spelled out so that it matches the definition in
     * msg.c and calls are type-checked; an empty "()" leaves the arguments
     * unchecked and means "(void)" from C23 on.
     */
    int fellow_create_msg_queue(const char *path, int proj_id);

    /* Remove the queue identified by msg_q_id. Returns 0 on success, -1 on failure. */
    int fellow_remove_msg_queue(int msg_q_id);

    /* Send *pFellowMsg on the queue. Returns 0 on success, -1 on failure. */
    int fellow_send_msg(int msg_q_id, FellowMsg *pFellowMsg);

    /* Receive one message from the queue into *pFellowMsg. Returns 0 on success, -1 on failure. */
    int fellow_rcv_msg(int msg_q_id, FellowMsg *pFellowMsg);

    相关封装函数在msg.c

    #include "msg.h"
    /*
     * Get the System V message queue keyed by ftok(path, proj_id), creating it
     * with mode 0666 if it does not exist yet.
     *
     * path     file name handed to ftok()
     * proj_id  project id handed to ftok()
     *
     * Returns the queue id on success, -1 on failure (a diagnostic is written
     * to stderr).
     */
    int fellow_create_msg_queue(const char *path, int proj_id)
    {
      if (NULL == path)
      {
        fprintf(stderr, "fellow_create_msg_queue: path is NULL\n");
        return -1;
      }

      key_t key = ftok(path, proj_id);
      if ((key_t)-1 == key)
      {
        /* perror() reports the errno text on stderr, which is not held back by stdout buffering */
        perror("ftok");
        return -1;
      }
      /* newline-terminated so line-buffered stdout actually shows it; %x needs an unsigned argument */
      printf("key:0x%x\n", (unsigned int)key);

      int msg_q_id = msgget(key, IPC_CREAT | 0666);
      if (-1 == msg_q_id)
      {
        perror("msgget");
        return -1;
      }
      return msg_q_id;
    }

    /*
     * Remove the message queue identified by msg_q_id (msgctl IPC_RMID).
     *
     * Returns 0 on success, -1 on failure (a diagnostic is written to stderr).
     */
    int fellow_remove_msg_queue(int msg_q_id)
    {
      if (-1 == msgctl(msg_q_id, IPC_RMID, NULL))
      {
        /* stderr + errno text instead of an unterminated errno number on stdout */
        perror("msgctl(IPC_RMID)");
        return -1;
      }
      return 0;
    }

    /*
     * Send *pFellowMsg on the queue msg_q_id.
     *
     * msgsnd() is called with flags 0, i.e. without IPC_NOWAIT.
     *
     * Returns 0 on success, -1 if pFellowMsg is NULL or msgsnd() fails (in the
     * latter case a diagnostic is written to stderr).
     */
    int fellow_send_msg(int msg_q_id, FellowMsg *pFellowMsg)
    {
      if (NULL == pFellowMsg)
      {
        return -1;
      }

      /* msgsnd() wants the payload size only: everything after the leading long type field */
      size_t msgLen = sizeof(FellowMsg) - sizeof(long int);
      if (-1 == msgsnd(msg_q_id, (void *)pFellowMsg, msgLen, 0))
      {
        /* stderr + errno text instead of an unterminated errno number on stdout */
        perror("msgsnd");
        return -1;
      }
      return 0;
    }

    /*
     * Receive one message from the queue msg_q_id into *pFellowMsg.
     *
     * msgrcv() is called with message type 0 and flags 0, so no type filter and
     * no IPC_NOWAIT are requested.
     *
     * Returns 0 on success, -1 if pFellowMsg is NULL or msgrcv() fails (in the
     * latter case a diagnostic is written to stderr).
     */
    int fellow_rcv_msg(int msg_q_id, FellowMsg *pFellowMsg)
    {
      if (NULL == pFellowMsg)
      {
        return -1;
      }

      /* size of the payload that follows the leading long type field */
      size_t msgLen = sizeof(FellowMsg) - sizeof(long int);
      if (-1 == msgrcv(msg_q_id, (void *)pFellowMsg, msgLen, 0, 0))
      {
        /* stderr + errno text instead of an unterminated errno number on stdout */
        perror("msgrcv");
        return -1;
      }
      return 0;
    }

    将msg.c编译成libmsg.so:gcc -shared -fPIC -I. msg.c -o libmsg.so

    发送控制消息,并监听反馈消息的进程msgsnd.c. (gcc msgsnd.c -o msgsnd -L. -lmsg -lpthread)

    #include <pthread.h> /* pthread_t / pthread_create() are used by main() below */
    #include <sys/prctl.h>
    #include <string.h>
    #include <stdio.h>
    #include "msg.h"
    /* File name passed to ftok() when deriving the queue keys. */
    #define MSG_CREAT_PATH "/mnt/hgfs/share/test/list"
    /* ftok() project ids: exactly the opposite of the ids used in msgrcv.c */
    #define MSG_RCV_ID 4
    #define MSG_SND_ID 3

    void *fellow_listenning_msg(void *arg)//监听反馈信息
    {
      if(0 != prctl(PR_SET_NAME, (unsigned long)"fellow_process_msg"))
      {
        printf("prctl fail, errno:%d", errno);
      }
      int msg_q_id = fellow_create_msg_queue(MSG_CREAT_PATH, MSG_RCV_ID);
      FellowMsg _fellowMsg;
      while (1)
      {
        memset(&_fellowMsg, 0, sizeof(FellowMsg));
        fellow_rcv_msg(msg_q_id, &_fellowMsg);
        if (CTRL_FEEDBACK ==_fellowMsg._msgType)
        {
          switch (_fellowMsg._ctlInfo.u._ctlFeedback)
          {
            case OPEN_DONE:
              printf("rcv OPEN_DONE:%d ", _fellowMsg._ctlInfo.param);
            break;
            case CLOSE:
              printf("rcv CLOSE_DONE:%d ", _fellowMsg._ctlInfo.param);
            break;
            case PLAY:
              printf("rcv PLAY_DONE:%d ", _fellowMsg._ctlInfo.param);
            break;
            default:
            break;
          }
        }
      }
    }
    void main(void)
    {
      pthread_t thread_id;
      int snd_msg_q_id = fellow_create_msg_queue(MSG_CREAT_PATH, MSG_SND_ID);
      printf("msgid:%d ",snd_msg_q_id);
      FellowMsg _fellowMsg;
      _fellowMsg._msgType = CTRL_CMD;
      _fellowMsg._ctlInfo.u._ctlCmd = OPEN;
      _fellowMsg._ctlInfo.param = 1;
      fellow_send_msg(snd_msg_q_id, &_fellowMsg);//发送OPEN命令
      pthread_create(&thread_id, NULL, fellow_listenning_msg, NULL);
      while (1)
      {
      }
    }

    接收控制消息,做完相关处理后,返回反馈消息。msgrcv.c (gcc msgrcv.c -o msgrcv -L. -lmsg -lpthread)

    #include <pthread.h> /* pthread_t / pthread_create() are used by main() below */
    #include <sys/prctl.h>
    #include <string.h>
    #include <stdio.h>
    #include "msg.h"
    /* File name passed to ftok() when deriving the queue keys. */
    #define MSG_CREAT_PATH "/mnt/hgfs/share/test/list"
    /* ftok() project ids for the receive queue and the reply queue. */
    #define MSG_RCV_ID 3
    #define MSG_SND_ID 4
    /*
     * Thread entry point: handle control commands.
     *
     * arg points to an int holding the id of the queue to receive from.  The
     * reply queue is opened with (MSG_CREAT_PATH, MSG_SND_ID).  For every
     * CTRL_CMD message the command is printed; OPEN is additionally answered
     * with an OPEN_DONE feedback message.
     *
     * Returns NULL when the reply queue cannot be opened or a receive fails.
     */
    void *fellow_process_msg(void *arg)
    {
      /* output is newline-terminated so line-buffered stdout shows it */
      printf("thread start\n");
      int msg_q_id = *(int *)arg;
      int snd_msg_q_id = fellow_create_msg_queue(MSG_CREAT_PATH, MSG_SND_ID);
      if (-1 == snd_msg_q_id)
      {
        return NULL;
      }
      /* NOTE(review): this name exceeds the 16-byte limit documented for
       * PR_SET_NAME and is presumably truncated - confirm. */
      if (0 != prctl(PR_SET_NAME, (unsigned long)"fellow_process_msg"))
      {
        perror("prctl(PR_SET_NAME)");
      }

      FellowMsg _fellowRcvMsg;
      FellowMsg _fellowSndMsg;
      while (1)
      {
        memset(&_fellowRcvMsg, 0, sizeof(FellowMsg));
        if (-1 == fellow_rcv_msg(msg_q_id, &_fellowRcvMsg))
        {
          break;
        }
        if (CTRL_CMD == _fellowRcvMsg._msgType)
        {
          switch (_fellowRcvMsg._ctlInfo.u._ctlCmd)
          {
            case OPEN:
              printf("rcv OPEN:%d\n", _fellowRcvMsg._ctlInfo.param);

              /* do something when rcv open */

              /* zero the reply first so no indeterminate padding bytes are sent */
              memset(&_fellowSndMsg, 0, sizeof(FellowMsg));
              _fellowSndMsg._msgType = CTRL_FEEDBACK;
              _fellowSndMsg._ctlInfo.u._ctlFeedback = OPEN_DONE;
              _fellowSndMsg._ctlInfo.param = 0;
              /* a send failure is reported by fellow_send_msg(); keep serving commands */
              fellow_send_msg(snd_msg_q_id, &_fellowSndMsg);
              break;
            case CLOSE:
              printf("rcv CLOSE:%d\n", _fellowRcvMsg._ctlInfo.param);
              break;
            case PLAY:
              printf("rcv PLAY:%d\n", _fellowRcvMsg._ctlInfo.param);
              break;
            default:
              break;
          }
        }
      }
      /* a thread start routine must return a value; the original fell off the end here */
      return NULL;
    }
    /*
     * msgrcv entry point: open the command queue keyed by
     * (MSG_CREAT_PATH, MSG_RCV_ID), start the command-processing thread on it
     * and wait for that thread.
     *
     * Returns 0 when the worker thread ends, 1 if a setup step fails.
     */
    int main(void)
    {
      pthread_t thread_id;
      int msg_q_id = fellow_create_msg_queue(MSG_CREAT_PATH, MSG_RCV_ID);
      if (-1 == msg_q_id)
      {
        return 1;
      }
      /* newline-terminated so line-buffered stdout shows it */
      printf("msgid:%d\n", msg_q_id);

      /* msg_q_id lives in this frame; it stays valid because main() waits for the thread below */
      int rc = pthread_create(&thread_id, NULL, fellow_process_msg, (void *)&msg_q_id);
      if (0 != rc)
      {
        fprintf(stderr, "pthread_create fail, error:%d\n", rc);
        return 1;
      }

      /* do some thing in main thread */

      /* block until the worker ends instead of burning a CPU core in an empty loop */
      pthread_join(thread_id, NULL);
      return 0;
    }

    首先运行msgrcv进程,监听控制消息,然后运行msgsnd进程,发送控制消息,然后接收msgrcv的反馈。

    运行时如果不将libmsg.so的路径加到LD_LIBRARY_PATH,执行时会报错,找不到libmsg.so.(export LD_LIBRARY_PATH=/libmsg.so路径:$LD_LIBRARY_PATH)

    结果如下:

    进程msgrcv,

    进程msgsnd,

  • 相关阅读:
    pyenv
    [20200724NOIP提高组模拟T3]终章-剑之魂
    [20200724NOIP提高组模拟T2]圣章-精灵使的魔法语
    [20200724NOIP提高组模拟T1]序章-弗兰德的秘密
    [20200723NOIP提高组模拟T1]同余
    [SDOI2008]仪仗队
    P3195 [HNOI2008]玩具装箱
    [20200722NOIP提高组模拟T4]词韵
    [20200721NOIP提高组模拟T3]最小代价
    [20200721NOIP提高组模拟T1]矩阵
  • 原文地址:https://www.cnblogs.com/fellow1988/p/6146949.html
Copyright © 2011-2022 走看看