zoukankan      html  css  js  c++  java
  • ZMQ和MessagePack的简单使用

      近段日子在做一个比较复杂的项目,其中用到了开源软件ZMQ和MessagePack。ZMQ对底层网络通信进行了封装,是一个消息处理队列库,使用起来非常方便。MessagePack是一个基于二进制的对象序列化类库,具有跨语言的特性,同样非常容易使用。在我做的项目中,消息类通过MessagePack进行压包,然后写入ZMQ的消息结构体,通过ZMQ传递,最后接收者利用MessagePack进行解包,从而分析命令。由于我英语水平实在不高,所以我并没有通过阅读它们的说明文档来对它们进行了解,而仅仅是通过它们的示例代码进行探索。虽然因此遇到了一些不解问题,但这种方式却为我节省了很多时间。不过,对于英语好的人,还是应该通过阅读说明文档来去了解它们。

      为了说明如何使用它们,在这里构造一个使用场景:有N个Client,一个Server,M个Agent,Client使用ZMQ的请求-响应模式和Server通信,Server收到Client的命令后,通过ZMQ的发布-订阅模式与各个Agent进行通信。下面的代码封装并使用了ZMQ和MessagePack,为了简便,我把类的定义和实现都写在了头文件。

      1.对ZMQ的简单封装:

      1 #include"Msgpack.h"
      2 #include<zmq.h>
      3 #include<string>
      4 #include<cassert>
      5 #include<iostream>
      6 
      7 namespace Tool
      8 {
      9     //网络工具类
     10     class Network
     11     {
     12     public:
     13 
     14         // 功能 :构造函数。
     15         // 参数 :无。
     16         // 返回 :无。
     17         Network() : m_socket(NULL) { }
     18 
     19         // 功能 :初始化socket。
     20         // 参数 :zmqType表示ZMQ的模式,address表示socket绑定或连接地址。
     21         // 返回 :true表示初始化成功,false表示失败。
     22         bool Init(int zmqType,const std::string& address)
     23         {
     24             try
     25             {
     26                 m_socket = zmq_socket(Context,zmqType);
     27                 return SetSocket(zmqType,address);
     28             }
     29             catch(...)
     30             {
     31                 std::cout << "Network初始化失败。" << std::endl;
     32                 return false;
     33             }
     34         }
     35 
     36         // 功能 :发送消息。
     37         // 参数 :指向Msgpack的指针,isRelease如果为true表示发送消息后即刻释放资源。
     38         // 返回 :true表示发送成功,false表示发送失败。
     39         bool SendMessage(Msgpack *msgpack,bool isRelease = true) const
     40         {
     41             try
     42             {
     43                 zmq_msg_t msg;
     44                 zmq_msg_init(&msg);
     45                 if(isRelease)
     46                 {
     47                     zmq_msg_init_data(&msg,msgpack->GetSbuf().data(),msgpack->GetSbuf().size(),Tool::Network::Release,msgpack);
     48                 }
     49                 else
     50                 {
     51                     zmq_msg_init_data(&msg,msgpack->GetSbuf().data(),msgpack->GetSbuf().size(),0,0);
     52                 }
     53                 zmq_msg_send(&msg,m_socket,0);
     54                 return true;
     55             }
     56             catch(...)
     57             {
     58                 std::cout << "Network发送失败。" << std::endl;
     59                 return false;
     60             }
     61         }
     62 
     63         // 功能 :接收消息。
     64         // 参数 :无。
     65         // 返回 :指向消息的指针。
     66         zmq_msg_t* ReceiveMessage() const
     67         {
     68             zmq_msg_t *reply = NULL;
     69             try
     70             {
     71                 reply = new zmq_msg_t();
     72                 zmq_msg_init(reply);
     73                 zmq_msg_recv(reply,m_socket,0);
     74                 return reply;
     75             }
     76             catch(...)
     77             {
     78                 if( reply != NULL )
     79                 {
     80                     delete reply;
     81                 }
     82                 return NULL;
     83             }
     84         }
     85 
     86         // 功能 :关闭消息。
     87         // 参数 :指向消息的指针。
     88         // 返回 :无。
     89         void CloseMsg(zmq_msg_t* msg)
     90         {
     91             try
     92             {
     93                 zmq_msg_close(msg);
     94                 msg = NULL;
     95             }
     96             catch(...)
     97             {
     98                 msg = NULL;
     99             }
    100         }
    101 
    102         // 功能 :析构函数。
    103         // 参数 :无。
    104         // 返回 :无。
    105         ~Network()
    106         {
    107             if( m_socket != NULL )
    108             {
    109                 zmq_close(m_socket);
    110                 m_socket = NULL;
    111             }
    112         }
    113 
    114     private:
    115 
    116         //通信socket
    117         void *m_socket;
    118 
    119         //网络环境
    120         static void *Context;
    121 
    122     private:
    123 
    124         // 功能 :设置socket。
    125         // 参数 :zmqType表示ZMQ的模式,address表示socket绑定或连接地址。
    126         // 返回 :true表示设置成功,false表示设置失败。
    127         bool SetSocket(int zmqType,const std::string& address)
    128         {
    129             int result = -1;
    130             switch(zmqType)
    131             {
    132             case ZMQ_REP:
    133             case ZMQ_PUB:
    134                 result = zmq_bind(m_socket,address.c_str());
    135                 break;
    136             case ZMQ_REQ:
    137                 result = zmq_connect(m_socket,address.c_str());
    138                 break;
    139             case ZMQ_SUB:
    140                 result = zmq_connect(m_socket,address.c_str());
    141                 assert(result == 0);
    142                 result = zmq_setsockopt(m_socket,ZMQ_SUBSCRIBE,"",0);
    143                 break;
    144             default:
    145                 return false;
    146             }
    147             assert( result == 0 );
    148             return true;
    149         }
    150 
    151         // 功能 :发送完消息后,释放消息资源。
    152         // 参数 :function为函数地址,hint指向要释放资源的对象。
    153         // 返回 :无。
    154         static void Release(void *function, void *hint)
    155         {
    156             Msgpack *msgpack = (Msgpack*)hint;
    157             if( msgpack != NULL )
    158             {
    159                 delete msgpack;
    160                 msgpack = NULL;
    161             }
    162         }
    163     };
    164 
    165     //整个程序共用一个context
    166     void *Tool::Network::Context = zmq_ctx_new();
    167 };

      说明:

      (1)由zmq_ctx_new创建出来的Context,整个应用程序共用一个就可以了,具体的通信是由zmq_socket创建的socket来完成的。上述代码中没有去释放Context指向的资源。

      (2)在zmq_msg_init_data函数的参数中,需要传入一个释放资源的函数地址,在ZMQ发送完消息后就调用这个函数来释放资源。如果没有传入这个参数,而且传入的信息是临时变量,那么接收方很有可能接收不到信息,甚至抛出异常。如果不传入这个参数,那么就要记得由自己去释放资源了。

      2.对MessagePack的简单封装:

      1 #include"BaseMessage.h"
      2 #include"ClientMessage.h"
      3 #include"ServerMessage.h"
      4 #include<zmq.h>
      5 #include<msgpack.hpp>
      6 
      7 namespace Tool
      8 {
      9     using namespace Message;
     10 
     11     //压包/解包工具类
     12     class Msgpack
     13     {
     14     public:
     15 
     16         // 功能 :构造函数。
     17         // 参数 :无。
     18         // 返回 :无。
     19         Msgpack(void) { }
     20 
     21         // 功能 :析构函数。
     22         // 参数 :无。
     23         // 返回 :无。
     24         ~Msgpack(void) { }
     25 
     26         // 功能 :压包数据。
     27         // 参数 :要压包的数据。
     28         // 返回 :true表示压包成功。
     29         template<typename T>
     30         bool Pack(const T& t)
     31         {
     32             try
     33             {
     34                 Release();
     35                 msgpack::pack(m_sbuf,t);
     36                 return true;
     37             }
     38             catch(...)
     39             {
     40                 std::cout << "Msgpack压包数据失败。" << std::endl;
     41                 return false;
     42             }
     43         }
     44 
     45         // 功能 :解包数据。
     46         // 参数 :zmq消息体。
     47         // 返回 :返回指向基类消息的指针。
     48         BaseMessage* Unpack(zmq_msg_t& msg)
     49         {
     50             try
     51             {
     52                 int size = zmq_msg_size(&msg);
     53                 if( size > 0 )
     54                 {
     55                     Release();
     56                     m_sbuf.write((char*)zmq_msg_data(&msg),size);
     57                     size_t offset = 0;
     58                     msgpack::zone z;
     59                     msgpack::object obj;
     60                     msgpack::unpack(m_sbuf.data(),m_sbuf.size(),&offset,&z,&obj);
     61                     return GetMessage(obj);
     62                 }
     63             }
     64             catch(...)
     65             {
     66                 //吃掉异常
     67             }
     68             return NULL;
     69         }
     70 
     71         // 功能 :获取压包/解包工具。
     72         // 参数 :无。
     73         // 返回 :压包/解包工具。
     74         inline msgpack::sbuffer& GetSbuf()
     75         {
     76             return m_sbuf;
     77         }
     78 
     79     private:
     80 
     81         //压包/解包工具
     82         msgpack::sbuffer m_sbuf;
     83 
     84     private:
     85 
     86         // 功能 :释放上一次的数据资源。
     87         // 参数 :无。
     88         // 返回 :无。
     89         void Release()
     90         {
     91             m_sbuf.clear();
     92             m_sbuf.release();
     93         }
     94         
     95         // 功能 :获取消息。
     96         // 参数 :用于转换的msgpack::object。
     97         // 返回 :指向消息基类的指针。
     98         BaseMessage* GetMessage(const msgpack::object& obj)
     99         {
    100             BaseMessage bmessage;
    101             obj.convert(&bmessage);
    102             switch(bmessage.Type)
    103             {
    104             case 1024:
    105                 return Convert<ClientMessage>(obj);
    106             case 2048:
    107                 return Convert<ServerMessage>(obj);
    108             default:
    109                 return NULL;
    110             }
    111         }
    112 
    113         // 功能 :将压包后的数据转换为具体的类。
    114         // 参数 :用于转换的msgpack::object。
    115         // 返回 :指向T的指针。
    116         template<typename T>
    117         T* Convert(const msgpack::object& obj)
    118         {
    119             T *t = new T();
    120             obj.convert(t);
    121             return t;
    122         }
    123     };
    124 };

      说明:

      压包时将zmq_msg_t消息体压包到msgpack::sbuffer,然后就可以关闭这个消息体了。要将解包后的数据转换成具体的某一个类,需要知道这个类是什么类,这里有三种方法:

      (1)可以先发送一个消息告知接收者即将收到什么消息,然后接收者将消息解包后转换成对应的类。这种方式需要额外的一次通信,不建议使用。

      (2)所有的消息都继承自一个基类,这个基类存储有消息类型的字段。解包后,先将数据转换为基类,然后根据类型再转换为具体的派生类。这种方式需要多转换一次,上面的代码也正是采用这种方式。

      (3)压包时先压包一个消息类,然后再压包一个标识这个消息是什么类型的标识类,即压包两次。解包时,先解包标识类,得知消息类的具体类型,然后再解包消息类,即解包两次,转换两次。与(2)相比,除了要做更多的压包、解包工作外,这里还需要对解包的偏移量进行计算,否则容易出错。

      3.使用到的消息类:

    namespace Message
    {
        //消息基类
        class  BaseMessage
        {
        public:
    
            MSGPACK_DEFINE(Type);
    
            //消息类型
            int Type;
    
            //默认构造函数
            BaseMessage()
            {
                Type = 0;
            }
        };
    
        //来自客户端的消息
        class ClientMessage : public BaseMessage
        {
        public:
    
            MSGPACK_DEFINE(Type,Information);
    
            //信息
            std::string Information;
    
            //默认构造函数
            ClientMessage()
            {
                Type = 1024;
            }
        };
    
        //来自服务端的消息
        class ServerMessage : public BaseMessage
        {
        public:
    
            MSGPACK_DEFINE(Type,Information);
    
            //信息
            std::vector<std::string> Information;
    
            //默认构造函数
            ServerMessage()
            {
                Type = 2048;
            }
        };
    }; 

      说明:

      (1)MSPACK_DEFINE标识了一个类的哪些成员可以进行压包/解包。派生类中的MSGPACK_DEFINE还需要写上基类的成员,否则无法使用对MessagePack封装说明的第二个方法。

      (2)C++版本的MessagePack压/解包的数据成员,只能是一个类、结构或者联合体,不能使用指针(包括boost库的智能指针)、数组,枚举值也不适用。因此,BaseMessage使用int值来标识派生类属于哪个类型。C#版本的MessagePack可以对枚举值进行压包。

      4.Client的示例代码:

     1 int _tmain(int argc, _TCHAR* argv[])
     2 {
     3     Network network;
     4     bool result = network.Init(ZMQ_REQ,"tcp://192.168.10.179:8888");
     5     if(result)
     6     {
     7         ClientMessage cmessage;
     8         cmessage.Information = "I come form Client.";
     9 
    10         Msgpack msgpack;
    11         result = msgpack.Pack<ClientMessage>(cmessage);
    12         if(result)
    13         {
    14             result = network.SendMessageW(&msgpack,false);
    15             if(result)
    16             {
    17                 zmq_msg_t *msg = network.ReceiveMessage();
    18                 if( msg != NULL )
    19                 {
    20                     BaseMessage *bmessage = msgpack.Unpack(*msg);
    21                     network.CloseMsg(msg);
    22                     if( bmessage != NULL && bmessage->Type == 2048 )
    23                     {
    24                         ServerMessage *smessage = static_cast<ServerMessage*>(bmessage);
    25                         if( smessage != NULL && smessage->Information.size() > 0 )
    26                         {
    27                             std::cout << smessage->Information[0] << std::endl;
    28                         }
    29                         delete smessage;
    30                         smessage = NULL;
    31                         bmessage = NULL;
    32                     }
    33                 }
    34             }
    35         }
    36     }
    37 
    38     system("pause");
    39     return 0;
    40 }

      5.Server的示例代码:

     1 int _tmain(int argc, _TCHAR* argv[])
     2 {
     3     Network responder;
     4     bool result = responder.Init(ZMQ_REP,"tcp://192.168.10.179:8888");
     5     if(result)
     6     {
     7         Network publisher;
     8         result = publisher.Init(ZMQ_PUB,"tcp://192.168.10.179:9999");
     9         if(result)
    10         {
    11             Msgpack msgpack;
    12             while(true)
    13             {
    14                 zmq_msg_t *msg = responder.ReceiveMessage();
    15                 BaseMessage *bmessage = msgpack.Unpack(*msg);
    16                 responder.CloseMsg(msg);
    17 
    18                 ServerMessage smessage;
    19                 smessage.Information.push_back("I come from Server.");
    20                 msgpack.Pack<ServerMessage>(smessage);
    21                 result = responder.SendMessageW(&msgpack,false);
    22 
    23                 if( result )
    24                 {
    25                     if( bmessage != NULL && bmessage->Type == 1024 )
    26                     {
    27                         ClientMessage *cmessage = static_cast<ClientMessage*>(bmessage);
    28                         if( cmessage != NULL )
    29                         {
    30                             std::cout << cmessage->Information << std::endl;
    31                             for( int counter = 0 ; counter < 100 ; counter++ )
    32                             {
    33                                 publisher.SendMessageW(&msgpack,false);
    34                             }
    35                         }
    36                         delete cmessage;
    37                         cmessage = NULL;
    38                         bmessage = NULL;
    39                     }
    40                 }
    41             }
    42         }
    43     }
    44 
    45     return 0;
    46 }

      6.Agent的示例代码:

    int _tmain(int argc, _TCHAR* argv[])
    {
        Network network;
        bool result = network.Init(ZMQ_SUB,"tcp://192.168.10.179:9999");
        if(result)
        {
            zmq_msg_t *msg = network.ReceiveMessage();
            if( msg != NULL )
            {
                Msgpack msgpack;
                BaseMessage *bmessage = msgpack.Unpack(*msg);
                network.CloseMsg(msg);
                if( bmessage->Type == 2048 )
                {
                    ServerMessage *smessage = static_cast<ServerMessage*>(bmessage);
                    if( smessage->Information.size() > 0 )
                    {
                        std::cout << smessage->Information[0] << std::endl;
                    }
                    delete smessage;
                    smessage = NULL;
                    bmessage = NULL;
                }
            }
        }
    
        system("pause");
        return 0;
    }

      7.启动这三个程序,Client将要发送的消息压包后发给Server,Server接收到消息后反馈一个信息给Client,然后循环发布消息给Agent,Agent不需要回复Server。最后着重说明两点:

      (1)ZMQ创建的socket发送数据和接收数据要处在同一条线程。Server接收到Client的数据后,不能通过开一条线程来给Client反馈信息,必须要在接收数据的线程中反馈信息。

      (2)ZMQ并不要求发送者和接收者有一定的启动顺序,但在Server中如果只发布一次消息,那么Agent很有可能收不到信息。不管是Agent先启动,还是Server先启动,Agent都有可能收不到信息。在Server的代码中,通过循环发布一百次,来让Agent收到信息。至于实际应用中,可以结合请求-响应模式来保证订阅消息者都收到了发布者的消息。

     参考资料:

    ZMQ:http://zguide.zeromq.org/page:all

    MessagePack:http://wiki.msgpack.org/pages/viewpage.action?pageId=1081387#QuickStartforC%2B%2B-ImplementationStatus

  • 相关阅读:
    将base64格式的字符串生成文件
    断点继传
    判断文件后缀
    递归遍历指定目录,获取该目录下最大的文件信息
    web项目部署后动态编译无法找到依赖的jar包
    Java程序动态编译Java源文件
    JS中substr与substring的区别
    java.lang.NoClassDefFoundError: com/sun/tools/javac/processing/JavacProcessingEnvironment
    python 基础学习笔记(4)--字典 和 集合
    python 基础学习笔记(3)--列表与元组
  • 原文地址:https://www.cnblogs.com/zwq1011/p/3050450.html
Copyright © 2011-2022 走看看