zoukankan      html  css  js  c++  java
  • 6.基于ZMQ的游戏网络层基础架构

    内网服务器之间的通信采用zmq来进行,和客户端的通信则采用boost的asio来进行。这里先来搭建zmq的基础结构。

    zmq相关的知识可以去zmq官方网站查询。

    这里使用zmq的push 和pull来进行通信。

    先放一张结构图:


    其中PushZmq是推管道, PullZmq是拉管道:

    对于Push的流程是:

    zmq_init()---->zmq_socket()---->zmq_connect()---->zmq_msg_init_size()---->zmq_msg_data()---->zmq_send()---->zmq_msg_close()---->zmq_close()---->zmq_term()

    具体见代码:

    PushZmq.h

    #ifndef __PUSH_ZMQ_H__
    #define __PUSH_ZMQ_H__
    
    #include <zmq.h>
    #include <string.h>
    #include <iostream>
    #include <glog/logging.h>
    
    using namespace std;
    
    /**
     * Sending end of a ZeroMQ PUSH/PULL pipe.
     *
     * The constructor creates a ZMQ_PUSH socket and connects it to the given
     * endpoint; the destructor closes the socket and terminates the context.
     */
    class PushZmq
    {
    public:
        /**
         * @param url         Endpoint to connect to, e.g. "tcp://127.0.0.1:5555".
         * @param zmqContext  ZeroMQ context to use. When NULL, a new context is
         *                    created with zmq_init(1).
         */
        PushZmq(const char* url, void* zmqContext = NULL);
        ~PushZmq();
    
        /**
         * Copies `length` bytes from `buffer` into a ZeroMQ message and sends it
         * in non-blocking mode.
         *
         * @return the zmq_send() return code on success, (size_t)-1 on failure.
         */
        size_t Send(const char* buffer, size_t length);
    private:
        // The object holds raw socket/context handles that the destructor
        // releases, so a copy would release them twice. Copying is therefore
        // forbidden: declared private and intentionally left undefined.
        PushZmq(const PushZmq&);
        PushZmq& operator=(const PushZmq&);
    
        std::string _strUrl;  // endpoint passed to zmq_connect()
        void* _ctx;           // ZeroMQ context (supplied by the caller or created here)
        void* _socket;        // ZMQ_PUSH socket; NULL when creation failed
    };
    
    #endif
    


    PushZmq.cpp

    #include "pushZmq.h"
    #include "proto/hello.pb.h"
    
    using namespace hello;
    
    int main()
    {
        FLAGS_minloglevel = google::INFO;
        google::InitGoogleLogging("");
        google::SetLogDestination(google::INFO, "../");
        google::SetLogFilenameExtension("log_");
        google::LogToStderr();
    
        string url = "tcp://127.0.0.1:5555";
        PushZmq* push = new PushZmq(url.c_str());
        //string sendContent = "Hello Pull.I am From Push!";
        PbMsgHello helloMsg;
        helloMsg.set_helloint(123456);
        helloMsg.set_hellostring("ni hao wang peng ");
    
        int length = helloMsg.ByteSize();
        char* buffer = (char*)malloc(length);
        
        helloMsg.SerializeToArray(buffer, length);
        push->Send(buffer, length);
        free(buffer);
        return 0;
    }
    
    /**
     * Creates a ZMQ_PUSH socket and connects it to `url`.
     *
     * @param url         Endpoint to connect to; NULL is treated as an empty string.
     * @param zmqContext  ZeroMQ context to use; when NULL a new one is created
     *                    with zmq_init(1).
     *
     * Failures are logged and leave the object with a NULL socket (or an
     * unconnected one if only zmq_connect failed); no exception is thrown.
     */
    PushZmq::PushZmq( const char* url, void* zmqContext /*= NULL*/ )
    :_strUrl(url ? url : "")
    ,_ctx(zmqContext)
    ,_socket(NULL)   // defined value on every early-return path below
    {
        if(!_ctx)
        {
            _ctx = zmq_init(1);
            if(!_ctx)
            {
                LOG(ERROR) << "error in zmq_init:" << zmq_strerror(errno);
                return;
            }
        }
    
        _socket = zmq_socket(_ctx, ZMQ_PUSH);
        if(!_socket)
        {
            LOG(ERROR) << "Error in zmq_socket:" << zmq_strerror(errno);
            return;
        }
    
        if(zmq_connect(_socket, _strUrl.c_str()) != 0)
        {
            LOG(ERROR) << "error in zmq_connect:" << zmq_strerror(errno);
        }
    }
    
    /**
     * Closes the socket and terminates the context, logging any failure.
     *
     * Handles that are NULL (construction failed part-way) are skipped.
     * NOTE(review): the context is terminated even when it was supplied by the
     * caller through the constructor - confirm that callers expect this.
     */
    PushZmq::~PushZmq()
    {
        if(_socket && zmq_close(_socket) != 0)
        {
            LOG(ERROR) << "error in zmq_close:" << zmq_strerror(errno);
        }
        if(_ctx && zmq_term(_ctx) != 0)
        {
            LOG(ERROR) << "error in zmq_term:" << zmq_strerror(errno);
        }
    }
    
    size_t PushZmq::Send( const char* buffer, size_t length )
    {
        zmq_msg_t msg;
        int rc = zmq_msg_init_size(&msg, length);
        memcpy((char*)zmq_msg_data(&msg), buffer, length);
    
        rc = zmq_send(_socket, &msg, ZMQ_NOBLOCK);
        if(rc < 0)
        {
            cout << "error in zmq_send:" << zmq_strerror(errno) << endl;
            zmq_msg_close(&msg);
            return -1;
        }
        zmq_msg_close(&msg);
    
        LOG(INFO) << "Send Hello success: rc=" << rc;  
        return rc;
    }


    对于Pull的流程是:

    zmq_init()--->zmq_socket()--->zmq_bind()--->zmq_poll()--->zmq_msg_init()--->zmq_recv()--->zmq_msg_data()--->zmq_msg_size()--->调用具体处理函数--->zmq_msg_close()--->zmq_close()--->zmq_term()

    PullZmq.h

    #ifndef __PULL_ZMQ_H__
    #define __PULL_ZMQ_H__
    
    #include <zmq.h>
    #include <iostream>
    #include <string.h>
    #include <glog/logging.h>
    #include <boost/bind.hpp>
    #include <boost/function.hpp>
    using namespace std;
    
    /**
     * Receiving end of a ZeroMQ PUSH/PULL pipe.
     *
     * The constructor creates a ZMQ_PULL socket and binds it to the given
     * endpoint; Run() polls the socket and hands every received message to the
     * registered callback; the destructor closes the socket and terminates
     * the context.
     */
    class PullZmq
    {
    public:
        /**
         * Message callback: receives the payload pointer and its size in bytes.
         * Returning false makes Run() leave its receive loop.
         */
        typedef boost::function<bool(const char*, size_t)> TypeOnMessage;
    
        /**
         * @param url            Endpoint to bind to, e.g. "tcp://*:5555".
         * @param onPipeMessage  Callback invoked for every received message.
         * @param zmqContext     ZeroMQ context to use; when NULL a new one is
         *                       created with zmq_init(1).
         */
        PullZmq(const char* url, TypeOnMessage onPipeMessage, void* zmqContext=NULL);
        ~PullZmq();
    
        /** Blocks in a poll/receive loop until the callback returns false. */
        void Run();
    private:
        // The object holds raw socket/context handles that the destructor
        // releases, so a copy would release them twice. Copying is therefore
        // forbidden: declared private and intentionally left undefined.
        PullZmq(const PullZmq&);
        PullZmq& operator=(const PullZmq&);
    
        void* _ctx;                // ZeroMQ context (supplied by the caller or created here)
        std::string _strUrl;       // endpoint given to the constructor
        void* _socket;             // ZMQ_PULL socket
        TypeOnMessage _onMessage;  // per-message callback
    };
    #endif


    PullZmq.cpp

    #include "PullZmq.h"
    #include "proto/hello.pb.h"
    using namespace hello;
    
    // Message handler used by the demo; defined after PullZmq::Run().
    bool TestOnMessage( const char* buffer, size_t length );
    
    /**
     * Demo receiver: binds a pull socket on tcp://*:5555 and passes every
     * incoming message to TestOnMessage until the handler returns false.
     */
    int main()
    {
        FLAGS_minloglevel = google::INFO;
        google::InitGoogleLogging("");
        google::SetLogDestination(google::INFO, "../");
        google::SetLogFilenameExtension("log_");
        google::LogToStderr();
    
        string url = "tcp://*:5555";
        // Stack object: when Run() returns, the destructor closes the socket and
        // terminates the context. The previous heap object was never deleted.
        PullZmq pull(url.c_str(),
            boost::bind(TestOnMessage, _1, _2));
    
        pull.Run();
    
        return 0;
    }
    
    /**
     * Creates a ZMQ_PULL socket and binds it to `url`.
     *
     * @param url            Endpoint to bind to; NULL is treated as an empty string.
     * @param onPipeMessage  Callback stored for use by Run().
     * @param zmqContext     ZeroMQ context to use; when NULL a new one is
     *                       created with zmq_init(1).
     *
     * Failures are logged and the constructor returns early; no exception is
     * thrown.
     */
    PullZmq::PullZmq( const char* url, TypeOnMessage onPipeMessage, void* zmqContext )
    :_ctx(zmqContext)
    ,_strUrl(url ? url : "")
    ,_socket(NULL)   // must be defined even when zmq_init fails: the destructor reads it
    ,_onMessage(onPipeMessage)
    {
        if(!_ctx)
        {
            _ctx = zmq_init(1);
            if(!_ctx)
            {
                LOG(ERROR) << "error in zmq_init:" << zmq_strerror(errno);
                return;
            }
        }
    
        _socket = zmq_socket(_ctx, ZMQ_PULL);
        if (!_socket)
        {
            LOG(ERROR) << "Error in zmq_socket:" << zmq_strerror(errno);
            return;
        }
    
        if(zmq_bind(_socket, _strUrl.c_str()) != 0)
        {
            LOG(ERROR) << "error in zmq_bind:" << zmq_strerror(errno);
        }
    }
    
    /**
     * Releases the ZeroMQ resources: closes the socket first, then terminates
     * the context. A failure of either call is logged and otherwise ignored.
     */
    PullZmq::~PullZmq()
    {
        const bool closeFailed = (zmq_close(_socket) != 0);
        if(closeFailed)
        {
            LOG(ERROR) << "error in zmq_close:" << zmq_strerror(errno);
        }
    
        const bool termFailed = (zmq_term(_ctx) != 0);
        if(termFailed)
        {
            LOG(ERROR) << "error in zmq_term:" << zmq_strerror(errno);
        }
    }
    
    void PullZmq::Run()
    {
        zmq_pollitem_t item;
        item.socket = _socket;
        item.events = ZMQ_POLLIN;
    
        long pollWaitTime = 1000;
        bool bLoop = true;
    
        while(bLoop)
        {
            int rc = zmq_poll(&item, 1, -1);
            if(rc < 0)
            {
                LOG(ERROR) << "error in zmq_poll:" << zmq_strerror(errno);
            }else if(rc ==0)
            {
                //LOG(ERROR) << "On Idle!";
            }else 
            {
                int msgCount = rc;
                while(msgCount--)
                {
                    zmq_msg_t msg;
                    rc = zmq_msg_init(&msg);
                    if (rc !=0 )
                    {
                        LOG(ERROR) << "error in zmq_msg_init:" << zmq_strerror(errno);
                        return;
                    }
    
                    rc = zmq_recv(_socket, &msg, 0);
                    if(rc != 0)
                    {
                        LOG(ERROR) << "error in zmq_recv:" << zmq_strerror(errno);
                        zmq_msg_close(&msg);
                        continue;
                    }
    
                    void* buffer = zmq_msg_data(&msg);
                    size_t len = zmq_msg_size(&msg);
                    bLoop = _onMessage((const char*)buffer, len);
                    zmq_msg_close(&msg);
                }
            }
        }
    
    }
    
    bool TestOnMessage( const char* buffer, size_t length )
    {
        LOG(INFO) << "TestOnMessage:";
        
        PbMsgHello helloMsg;
        helloMsg.ParseFromArray(buffer, length);
        LOG(INFO) << " helloInt = " << helloMsg.helloint()
            << " helloString = " << helloMsg.hellostring();
    
        //string content;
        //content.append(buffer);
        //LOG(INFO) << "buffer = " << content << " length = " << length;
    
        return true;
    }


    对应Makefile为:

    # Builds the two demo programs: pullZmq (receiver) and pushZmq (sender).
    # all, pull, push and clean are command names, not files.
    .PHONY:	all pull push clean
    
    all:	pull push
    
    # Protobuf-generated code shared by both programs.
    hello.o:	proto/hello.pb.cc
    	g++ -c -o hello.o proto/hello.pb.cc
    
    # Convenience aliases; the real targets are the produced binaries, so they
    # are relinked only when a prerequisite changes.
    pull:	pullZmq
    
    push:	pushZmq
    
    pullZmq:	hello.o PullZmq.cpp
    	g++ -o pullZmq hello.o PullZmq.cpp -lzmq -lglog -lboost_filesystem -lprotobuf
    
    pushZmq:	hello.o PushZmq.cpp
    	g++ -o pushZmq hello.o PushZmq.cpp -lzmq -lglog -lboost_filesystem -lprotobuf
    
    
    clean:
    	rm -rf *.o
    	rm -rf pullZmq
    	rm -rf pushZmq
    


    上文的cpp中使用了Protobuf,因此需要引入Protobuf的支持,对应的proto文件

    hello.proto为:

    // Schema of the demo message sent by the push program and parsed by the
    // pull program. The generated C++ types live in namespace hello.
    package hello;
    message PbMsgHello
    {
    	// Text payload; the C++ code accesses it via set_hellostring()/hellostring().
    	required string helloString = 1;
        // Integer payload; the C++ code accesses it via set_helloint()/helloint().
        required int32 helloInt =2;
    }
    


    运行以上cpp,可以实现:在push端包装一个Protobuf的Message,序列化之后Push到Pull端;Pull端接收到消息后进行解析,并读取Message中的内容。

    结果如下:

    pull端:


    Push端:



    可见在Push端组装的 int 和string 在pull端成功解析。


    下一步应该进行Message的包装,以及ProtoBuffer的反射解析。即根据类型来自动生成解析所需的Message类型。


    1-6章节对应源码下载:http://download.csdn.net/detail/jcracker/6267125

  • 相关阅读:
    HDU 4081 Qin Shi Huang's National Road System
    POJ 2075 Tangled in Cables 最小生成树
    HDU 2487 Ugly window
    UVA 11426 GCD Extrme (Ⅲ)
    POJ_1220_Nmber Sequence
    Fibonacci数列对任何数取模都是一个周期数列
    POJ_3321_APPLE_TREE
    webpack配置---设置快捷打包和浏览器自动刷新
    sublime中css输入分号后自动提示的烦恼
    MongoDB的基本使用
  • 原文地址:https://www.cnblogs.com/suncoolcat/p/3323112.html
Copyright © 2011-2022 走看看