zoukankan      html  css  js  c++  java
  • 使用asio搭建服务器

    1. 背景介绍
    1.1 什么是asio
    
    使用了linux、asio、protobuf等大量开源项目,开发过程共也借鉴了其他一些开源项目。
    
    asio由Christopher M. Kohlhoff大牛从2003年着手开发,2006年申请加入C++ tr1,2008年3月份加入boost1.35.0,按照boost与C++标准库的发展惯例,预测很快会加入C++标准库中。其中的async调用方式已经作为非常重要的新特性,加入到C++0x标准库。
    
    1.2 asio的相关资料
    
    asio官方提供了及其详细的文档、例子、教程,没有必要再累赘地将其转述一遍。如果有朋友对英文有些吃力,网上也早有很多翻译版。这里提供一些官方的文档资料:
    
    非boost版本的asio —— http://think-async.com/
    
    与boost::asio的主要区别就是名字空间是boost::asio还是asio。
    boost::asio —— http://www.boost.org/doc/libs/1520/doc/html/boost_asio.html
    申请加入tr2时的申报材料 —— 猛击此处下载高清pdf
    proactor模式的首篇论文 —— 猛击此处下载高清pdf
    使用asio的大型开源网络项目sip —— https://svn.resiprocate.org/rep/resiprocate/main
    2. 源码参考
    由于代码使用了一点其他的工具,所以并没有想让读者能够编译通过。但是对从头开始搭建服务器的朋友来说,一定是一份非常有价值的参考。
    
    2.1 作为Client的模块
    
    这部分供作为Client去连接其他服务器时使用。给出的源码中有三个类:TcpConnection, BizConnection, Client. 其中
    
    TcpConnection 提供了与协议无关的tcp连接,异步操作的结果以虚函数方式供派生类使用
    BizConnection 继承自TcpConnection,使用具体的协议解析报文
    Client使用BizConnection,并提供了等待具体某条消息的wait_for、心跳、延迟等功能
    2.1.1 tcpconnection.h
    
    #ifndef TCPCONNECTION_H
    #define TCPCONNECTION_H
    
    /**
     * @author yurunsun@gmail.com
     */
    
    #include <asio.hpp>
    #include <asio/deadline_timer.hpp>
    #include <boost/shared_ptr.hpp>
    #include <boost/enable_shared_from_this.hpp>
    #include <boost/timer.hpp>
    #include <sstream>
    #include "safehandler.h"
    
    class TcpConnection
            : public boost::enable_shared_from_this<TcpConnection>
            , private boost::noncopyable
    {
    public:
        typedef std::vector<uint8_t> DataBuffer;
        typedef boost::shared_ptr<TcpConnection> TcpPtr;
        static TcpPtr create(asio::io_service& io_service, const string& name){return TcpPtr(new TcpConnection(io_service, name));}
        virtual ~TcpConnection();
        void start(const string& ip, const string& port);
        void start(unsigned ip, uint16_t port);
        void start(const string& ip, uint16_t port);
        void stop();
        bool isConnected() {return m_socket.is_open();}
    
        /// Getters and Setters
        void setName(const string& name) {m_name = name;}
        const string& getName() {return m_name;}
        void setHeadLength(uint32_t size) {m_headLength = size;}
        uint32_t getHeadLength() {return m_headLength;}
        void setConnectTimeoutSec(uint32_t sec) {m_connectTimeoutSec = sec;}
        uint32_t getConnectTimeoutSec() {return m_connectTimeoutSec;}
        const string& getip() {return m_ip;}
        uint16_t getport() {return m_port;}
        string getFarpointInfo() { stringstream ss; ss << m_name << " " << m_ip << ":" << m_port << " "; return ss.str(); }
    
    protected:
        explicit TcpConnection(asio::io_service& io_service, const string& name);
    
        /// Provide for derived class
        void connect(asio::ip::tcp::endpoint endpoint);
        void receiveHead();
        void receiveBody(uint32_t bodyLength);
        void send(const void *data, uint32_t length);
    
        /// Class override callbacks
        virtual void onConnectSuccess() { assert(false); }
        virtual void onConnectFailure(const asio::error_code& e) { (void)e; assert(false); }
        virtual void onReceiveHeadSuccess(DataBuffer& data) { (void)data; assert(false); }
        virtual void onReceiveBodySuccess(DataBuffer& data) { (void)data; assert(false); }
        virtual void onReceiveFailure(const asio::error_code& e) { (void)e; assert(false); }
        virtual void onSendSuccess() { assert(false); }
        virtual void onSendFailure(const asio::error_code& e) { (void)e; assert(false); }
        virtual void onTimeoutFailure(const asio::error_code& e) { (void)e; assert(false); }
        virtual void onCommonError(uint32_t ec, const string& em) { (void)ec; (void)em; assert(false); }
    
    private:
        void checkDeadline(const asio::error_code& e);
        void handleConnect(const asio::error_code &e);
        void handleReceiveHead(const asio::error_code& e);
        void handleReceiveBody(const asio::error_code& e);
        void handleSend(const asio::error_code& e);
    
        typedef TcpConnection this_type;
        asio::ip::tcp::socket m_socket;
        asio::deadline_timer m_deadline;
        bool m_stopped;
        DataBuffer m_readBuf;
        string m_name;
        boost::shared_ptr<Probe> m_probe;
        uint32_t m_headLength;
        uint32_t m_connectTimeoutSec;
        string m_ip;
        uint16_t m_port;
    };
    
    #endif // TCPCONNECTION_H
    2.1.2 tcpconnection.cpp
    
    #include "stdafx.h"
    #include "tcpconnection.h"
    
    using asio::ip::tcp;
    
    TcpConnection::TcpConnection(asio::io_service &io_service, const string& name)
        : m_socket(io_service)
        , m_deadline(io_service)
        , m_stopped(false)
        , m_name(name)
        , m_probe(new Probe)
        , m_headLength(10)
        , m_connectTimeoutSec(5)
        , m_ip("")
        , m_port(0)
    {
    }
    
    TcpConnection::~TcpConnection()
    {
        stop();
    }
    
    void TcpConnection::start(const string &ip, const string &port)
    {
        start(ip, atoi(port.c_str()));
    }
    
    void TcpConnection::start(unsigned ip, uint16_t port)
    {
        connect(tcp::endpoint(asio::ip::address_v4(ip), port));
    }
    
    void TcpConnection::start(const string &ip, uint16_t port)
    {
        asio::ip::address_v4 addr_v4 = asio::ip::address_v4::from_string(ip);
        connect(tcp::endpoint(addr_v4, port));
    }
    
    void TcpConnection::stop()
    {
        if (!m_stopped) {
            m_stopped = true;
            try {
                m_readBuf.clear();
                asio::error_code ignored;
                m_socket.shutdown(tcp::socket::shutdown_both, ignored);
                m_socket.close(ignored);
                m_deadline.cancel();
            } catch (const asio::system_error& err) {
                FATAL("asio::system_error em %s", err.what());
            }
        }
    }
    
    void TcpConnection::connect(tcp::endpoint endpoint)
    {
        m_stopped = false;
        m_ip = endpoint.address().to_string();
        m_port = endpoint.port();
        INFO("Trying connect %s:%u ...%s", STR(m_ip), m_port, STR(m_name));
    
        m_deadline.expires_from_now(boost::posix_time::seconds(m_connectTimeoutSec));
        m_socket.async_connect(endpoint, boost::bind(&TcpConnection::handleConnect, shared_from_this(), asio::placeholders::error));
        m_deadline.async_wait(SafeHandler1<this_type, const asio::error_code&>(&this_type::checkDeadline, this, m_probe));
    }
    
    void TcpConnection::receiveHead()
    {
        m_readBuf.resize(m_headLength);
        asio::async_read(m_socket, asio::buffer(&m_readBuf[0], m_headLength),
                         boost::bind(&TcpConnection::handleReceiveHead, shared_from_this(), asio::placeholders::error));
    }
    
    void TcpConnection::receiveBody(uint32_t bodyLength)
    {
        if (!m_stopped){
            if ((bodyLength <= MAX_BUFFER_SIZE) && (bodyLength > 0)) {
                m_readBuf.resize(bodyLength + m_headLength);
                asio::async_read(m_socket, asio::buffer(&m_readBuf[m_headLength], bodyLength),
                                 boost::bind(&TcpConnection::handleReceiveBody, shared_from_this(), asio::placeholders::error));
            } else {
                onCommonError(S_FATAL, "illegal bodyLength to call receiveBody");
            }
        } else {
            onCommonError(S_ERROR, "illegal to call receiveBody while tcp is not connected");
        }
    }
    
    void TcpConnection::send(const void* data, uint32_t length)
    {
        if (!m_stopped) {
            if (length <= MAX_BUFFER_SIZE) {
                asio::async_write(m_socket, asio::const_buffers_1(data, length),
                                  boost::bind(&TcpConnection::handleSend, shared_from_this(), asio::placeholders::error));
            } else {
                onCommonError(S_ERROR, "too big length to call send");
            }
        } else {
            onCommonError(S_ERROR, "illegal to call send while tcp is not connected");
        }
    }
    
    /**
     * @brief TcpConnection::checkDeadline
     * @param e
     * case1: m_stopped == true which means user canceled
     * case2: m_deadline.expires_at() <= asio::deadline_timer::traits_type::now()
     *          Check whether the deadline has passed. We compare the deadline against
                the current time since a new asynchronous operation may have moved the
                deadline before this actor had a chance to run.
     */
    void TcpConnection::checkDeadline(const asio::error_code& e)
    {
        if (!m_stopped) {
            if (m_deadline.expires_at() <= asio::deadline_timer::traits_type::now()) {
                onTimeoutFailure(e);
            }
        }
    }
    
    void TcpConnection::handleConnect(const asio::error_code &e)
    {
        if (!m_stopped) {
            if (!e) {
                m_deadline.cancel();
                onConnectSuccess();
            } else {
                onConnectFailure(e);
            }
        } else {
            INFO("%s %s %u user's canceled by stop()", STR(m_name), STR(m_ip), m_port);
        }
    }
    
    void TcpConnection::handleReceiveHead(const asio::error_code &e)
    {
        if (!m_stopped) {
            if (!e) {
                onReceiveHeadSuccess(m_readBuf);
            } else if (isConnected()){
                onReceiveFailure(e);
            }
        } else {
            INFO("%s %s %u user's canceled by stop()", STR(m_name), STR(m_ip), m_port);
        }
    }
    
    void TcpConnection::handleReceiveBody(const asio::error_code &e)
    {
        if (!m_stopped) {
            if (!e) {
                onReceiveBodySuccess(m_readBuf);
            } else if (isConnected()){
                onReceiveFailure(e);
            }
        } else {
            INFO("%s %s %u user's canceled by stop()", STR(m_name), STR(m_ip), m_port);
        }
    }
    
    void TcpConnection::handleSend(const asio::error_code &e)
    {
        if (!m_stopped) {
            if (!e) {
                //onSendSuccess();
            } else if (isConnected()){
                onSendFailure(e);
            }
        } else {
            INFO("%s %s %u user's canceled by stop()", STR(m_name), STR(m_ip), m_port);
        }
    }
    2.1.3 bizconnection.h
    
    #ifndef BIZCONNECTION_H
    #define BIZCONNECTION_H
    
    /**
     * @author yurunsun@gmail.com
     */
    
    #include <asio.hpp>
    #include <cstdio>
    #include <stdexcept>
    #include "sigslot/sigslot.h"
    
    #include "tcpconnection.h"
    
    class BizConnection
            : public TcpConnection
    {
    public:
        typedef boost::shared_ptr<BizConnection> BizPtr;
        static BizPtr create(asio::io_service& io_service, const string& name = string(""))
        {
            return BizPtr(new BizConnection(io_service, name));
        }
        void sendBizMsg(uint32_t uri, const BizPackage& pkg);
    
        sigslot::signal0<> BizConnected;
        sigslot::signal2<uint32_t, const string&> BizError;
        sigslot::signal0<> BizClosed;
        sigslot::signal1<BizPackage&> BizMsgArrived;
    
    protected:
        explicit BizConnection(asio::io_service& io_service, const string& name);
    
        /// Implement callbacks in base class
        virtual void onConnectSuccess();
        virtual void onConnectFailure(const asio::error_code& e);
        virtual void onReceiveHeadSuccess(DataBuffer& data);
        virtual void onReceiveBodySuccess(DataBuffer& data);
        virtual void onReceiveFailure(const asio::error_code& e);
        virtual void onSendSuccess();
        virtual void onSendFailure(const asio::error_code& e);
        virtual void onTimeoutFailure(const asio::error_code &e);
        virtual void onCommonError(uint32_t ec, const string &em);
    
        static void initNeedErrorSet();
        static std::set<uint32_t> m_needError;
    
    private:
        inline bool peekLength(void* data, uint32_t length, uint32_t& outputi32);
        void handleError(const string& from, uint32_t ec, const asio::error_code& e = asio::error_code());
    };
    
    inline bool BizConnection::peekLength(void* data, uint32_t length, uint32_t& outputi32)
    {
        if (length >= 4) {
            memcpy(&outputi32, data, sizeof(uint32_t));
            return true;
        } else {
            return false;
        }
    }
    
    #endif // BIZCONNECTION_H
    2.1.4 bizconnection.cpp
    
    #include "stdafx.h"
    #include "bizconnection.h"
    #include "tcpconnection.h"
    
    std::set<uint32_t> BizConnection::m_needError;
    
    BizConnection::BizConnection(asio::io_service &io_service, const string& name)
        : TcpConnection(io_service, name)
    {
    }
    
    void BizConnection::sendBizMsg(unsigned uri, const BizPackage &pkg)
    {
        /// TODO: usually this BizPackage contains the buffer of stream data to be sent to farpoint
        /// You should implement this by retriving buffer in BizPackage then call TcpConnection::send();
    }
    
    void BizConnection::onConnectSuccess()
    {
        receiveHead();
        BizConnected.emit();
    }
    
    void BizConnection::onConnectFailure(const asio::error_code &e)
    {
        if (e == asio::error::operation_aborted) {
            INFO("%s %s %u operation aborted... %s", STR(getName()), STR(getip()), getport(), STR(e.message()));
        }
        else if ((e == asio::error::already_connected) || (e == asio::error::already_open) || (e == asio::error::already_started)) {
            WARN("%s %s %u alread connected... %s", STR(getName()), STR(getip()), getport(), STR(e.message()));
        } else {
            handleError("onConnectFailure", S_FATAL, e);
        }
    }
    
    void BizConnection::onReceiveHeadSuccess(TcpConnection::DataBuffer &data)
    {
        uint32_t pkglen = 0;
        if (peekLength(data.data(), data.size(), pkglen)) {
            receiveBody(pkglen - getHeadLength());
        } else {
            handleError("peekLength", S_FATAL);
        }
    }
    
    void BizConnection::onReceiveBodySuccess(TcpConnection::DataBuffer &data)
    {
        /// This is simply an example, actually it's user's duty to unmarshal buffer to package.
        BizPackage msg;
        BizPackage.unserializeFrom(data);
        BizMsgArrived.emit(msg);
        receiveHead();
    }
    
    void BizConnection::onReceiveFailure(const asio::error_code &e)
    {
        if ((e == asio::error::operation_aborted)) {
            INFO("%s operation_aborted... %s", STR(getFarpointInfo()), STR(e.message()));
        } else {
            handleError("onReceiveFailure", S_FATAL, e);
        }
    }
    
    void BizConnection::onSendSuccess()
    {
        /// Leave it blank
    }
    
    void BizConnection::onSendFailure(const asio::error_code &e)
    {
        if ((e == asio::error::operation_aborted)) {
            INFO("%s operation_aborted... %s", STR(getFarpointInfo()), STR(e.message()));
        } else {
            handleError("onSendFailure", S_FATAL, e);
        }
    }
    
    void BizConnection::onTimeoutFailure(const asio::error_code &e)
    {
        if ((e == asio::error::operation_aborted)) {
            INFO("%s operation_aborted... %s", STR(getFarpointInfo()), STR(e.message()));
        } else {
            handleError("onTimeoutFailure", S_FATAL);
        }
    }
    
    void BizConnection::onCommonError(uint32_t ec, const string &em)
    {
        handleError(em, ec);
    }
    
    void BizConnection::handleError(const string& from, uint32_t ec, const asio::error_code &e/* = asio::error_code()*/)
    {
        stringstream ss;
        ss << getFarpointInfo() << " " << from;
        if (e) {
            ss << " asio " << e.value() << " " << e.message();
        }
        BizError.emit(ec, ss.str());
    }
    2.1.5 client.h
    
    #ifndef CLIENT_H
    #define CLIENT_H
    
    /**
     * @author yurunsun@gmail.com
     */
    
    #include "bizconnection.h"
    #include "handler.h"
    #include "safehandler.h"
    
    #include <asio.hpp>
    #include <boost/timer.hpp>
    
    class Client
            : public sigslot::has_slots<>
    {
    protected:
        BizConnection::BizPtr m_pBizConnection;
    
    public:
        typedef void (Client::*RequestPtr)(BizPackage&);
        typedef std::map<uint32_t, RequestPtr> RequestMap;
    
        typedef void (Client::*NotifyPtr)(BizPackage&);
        typedef std::map<uint32_t, NotifyPtr> NotifyMap;
    
        explicit Client(const string& name = string(""));
    
        /// 继承类需要实现的提供外部的方法
        virtual void startServer() = 0;
        virtual bool sendToServer(YProto &proto) = 0;
        virtual void stopServer() {clearWaitforTimer(); m_pBizConnection->stop();}
    
    protected:
        /// 继承类需要实现的初始化函数
        virtual void initRequestMap() {assert(false);}
        virtual void initNotifyMap() {assert(false);}
        virtual void initSignal();
    
        /// 继承类需要实现的钩子函数,用于处理网络事件
        virtual void onBizMsgArrived(core::Request& msg) = 0;
        virtual void onBizError(uint32_t ec, const string& em);
        virtual void onBizConnected() = 0;
    
        /// 继承类可以使用的工具方法
        /// 1. 心跳类
        void setKeepAliveSec(uint32_t sec) {m_keepAliveSec = sec;}
        uint32_t getKeepAliveSec() {return m_keepAliveSec;}
        void startKeepAlive();
        void keepAlive(const asio::error_code& e);
        virtual void onKeepAlive() {assert(false);}
        /// 2. 登陆状态类
        void setHasLogin(bool b) {m_hasLogin = b;}
        bool getHasLogin() {return m_hasLogin;}
        bool isOnline() { return (m_pBizConnection->isConnected() && m_hasLogin);}
        /// 3. 消息保存类
        template <typename Handler>
        bool savePendingCommand(Handler handler)
        {
            if(m_pendingCmd.size() < MaxPendingCommandCount) {
                m_pendingCmd.push_back(Command(handler));
                return true;
            }
            return false;
        }
        void sendPendingCommand()
        {
            if (isOnline()) {
                vector<Command>::iterator it = m_pendingCmd.begin();
                for(; it != m_pendingCmd.end(); ++it) {
                    (*it)();
                }
                m_pendingCmd.clear();
            }
        }
        /// 4. 延迟处理类
        typedef void (Client::*HoldonCallback)();
        void holdonSeconds(uint32_t sec, HoldonCallback func);
        void holdonHandler(HoldonCallback func, const asio::error_code &e);
    
        /// 5. waitfor 工具 处理异步消息超时
        typedef boost::shared_ptr<asio::deadline_timer> SharedTimerPtr;
        typedef boost::scoped_ptr<asio::deadline_timer> ScopedTimerPtr;
        typedef boost::shared_ptr<Probe> SharedProbe;
        typedef map<uint32_t, SharedTimerPtr> Uri2Timer;            /// 等待收到的包uri --> 这个时间timer
        void waitfor(uint32_t uri, uint32_t sec);                   /// 在发送req的时候调用,sec 秒数 uri 等待收到的uri
        void waitforTimeout(uint32_t uri, const asio::error_code& e); /// 所有waitfor超时都会自动回调这个函数
        virtual void onWaitforTimeout(uint32_t uri) {(void)uri; assert(false);}         /// 继承类覆盖这个钩子函数来进行错误处理
        void waitforReceived(uint32_t uri);                         /// 当响应函数handler被回调时,记得调用waitforReceived做清理工作
        void eraseWaitforTimer(uint32_t uri);
        void clearWaitforTimer();
    
        /// 继承类可以使用的工具成员:心跳 探针 请求阻塞
        typedef std::set<uint32_t> BlockReq;
        BlockReq m_block;
        SharedProbe m_probe;
        ScopedTimerPtr m_timer;
        uint32_t m_keepAliveSec;
        bool m_hasLogin;
        vector<Command> m_pendingCmd;
        static const uint32_t MaxPendingCommandCount = 20;
        ScopedTimerPtr m_holdonTimer;
        Uri2Timer m_uri2timer;
    };
    
    
    #define BIND_REQ(m, uri, callback) 
        m[static_cast<uint32_t>(uri)] = static_cast<RequestPtr>(callback);
    
    #define BIND_NOTIFY(m, uri, callback) 
        m[static_cast<uint32_t>(uri)] = static_cast<NotifyPtr>(callback);
    
    #endif // CLIENT_H
    2.1.6 client.cpp
    
    #include "stdafx.h"
    #include "client.h"
    
    Client::Client(const string& name)
        : m_pBizConnection(BizConnection::create(ioService::instance(), name))
        , m_probe(new Probe)
        , m_keepAliveSec(10)
        , m_hasLogin(false)
    {
    }
    
    void Client::initSignal()
    {
        m_pBizConnection->BizError.connect(this, &Client::onBizError);
        m_pBizConnection->BizMsgArrived.connect(this, &Client::onBizMsgArrived);
        m_pBizConnection->BizConnected.connect(this, &Client::onBizConnected);
    }
    
    void Client::onBizError(uint32_t ec, const string &em)
    {
        m_facade.serverError.emit(ec, em);
    }
    
    void Client::startKeepAlive()
    {
        m_timer.reset(new asio::deadline_timer(m_facade.io_service_ref));
        m_timer->expires_from_now(boost::posix_time::seconds(m_keepAliveSec));
        m_timer->async_wait(SafeHandler1<Client, const asio::error_code&>(&Client::keepAlive, this, m_probe));
    }
    
    void Client::keepAlive(const asio::error_code &e)
    {
        if (e != asio::error::operation_aborted) {
            FINE("%u send ping to %s %s:%u", m_facade.m_pInfo->uid, STR(m_pBizConnection->getName()), STR(m_pBizConnection->getip()), m_pBizConnection->getport());
            onKeepAlive();
            m_timer->expires_from_now(boost::posix_time::seconds(m_keepAliveSec ));
            m_timer->async_wait(SafeHandler1<Client, const asio::error_code&>(&Client::keepAlive, this, m_probe));
        }
    }
    
    void Client::holdonSeconds(uint32_t sec, HoldonCallback func)
    {
        m_holdonTimer.reset(new asio::deadline_timer(m_facade.io_service_ref));
        m_holdonTimer->expires_from_now(boost::posix_time::seconds(sec));
        SafeHandler1Bind1<Client, HoldonCallback, const asio::error_code&> h(&Client::holdonHandler, this, func, m_probe);
        m_holdonTimer->async_wait(h);
    }
    
    void Client::holdonHandler(HoldonCallback func, const asio::error_code &e)
    {
        if (!e) {
            if (m_holdonTimer != NULL)
                m_holdonTimer->cancel();
            (this->*func)();
        } else {
             WARN("error: %s", STR(e.message()));
        }
    }
    
    void Client::waitfor(uint32_t uri, uint32_t sec)
    {
        SharedTimerPtr t(new asio::deadline_timer(m_facade.io_service_ref));
        t->expires_from_now(boost::posix_time::seconds(sec));
        t->async_wait(SafeHandler1Bind1<Client, uint32_t, const asio::error_code&>(
                          &Client::waitforTimeout, this, uri, m_probe));
        m_uri2timer[uri] = t;
    }
    
    void Client::waitforTimeout(uint32_t uri, const asio::error_code &e)
    {
        if (e != asio::error::operation_aborted) {
            FATAL("%s waitfor uri %u timeout", STR(m_pBizConnection->getName()), uri);
            eraseWaitforTimer(uri);
            onWaitforTimeout(uri);
        }
    }
    
    void Client::waitforReceived(uint32_t uri)
    {
        eraseWaitforTimer(uri);
    }
    
    void Client::eraseWaitforTimer(uint32_t uri)
    {
        Uri2Timer::iterator it = m_uri2timer.find(uri);
        if (it != m_uri2timer.end()) {
            SharedTimerPtr& t = it->second;
            if (t) {
                asio::error_code e;
                t->cancel(e);
                t.reset();
            }
            m_uri2timer.erase(it);
        }
    }
    
    void Client::clearWaitforTimer()
    {
        Uri2Timer::iterator it = m_uri2timer.begin();
        for (; it != m_uri2timer.end(); ++it) {
            SharedTimerPtr& t = it->second;
            if (t) {
                asio::error_code e;
                t->cancel(e);
                t.reset();
            }
        }
        m_uri2timer.clear();
    }
    2.2 作为server模块
    
    作为server模块由于涉及公司的业务比较多,这里剥离出一个作为crossdomain服务器的部分,功能很简单:flash客户端通过socket请求crossdomain配置文件,server返回给定的字符串。这里使用了比较著名的pimpl模式,将实现完全隐藏在cpp文件中。
    
    2.2.1 crossdomain.h
    
    #ifndef CROSSDOMAIN_H
    #define CROSSDOMAIN_H
    
    #include <string>
    #include <boost/shared_ptr.hpp>
    #include <asio.hpp>
    
    /**
     * @author yurunsun@gmail.com
     */
    
    class CrossDomain
    {
    private:
        struct Server;
        boost::shared_ptr<Server> m_pserver;
        CrossDomain(asio::io_service& io_service, const std::string& local_port);
        static CrossDomain* s_instance;
    
    public:
        static void create(asio::io_service& io_service, const std::string& local_port)
        {
            s_instance = new CrossDomain(io_service, local_port);
        }
    
        static CrossDomain* instance();
        void start_server();
    };
    
    #endif // CROSSDOMAIN_H
    2.2.2 crossdomain.cpp
    
    #include "stdafx.h"
    #include "crossdomain.h"
    
    using asio::ip::tcp;
    using boost::uint8_t;
    CrossDomain* CrossDomain::s_instance = NULL;
    
    struct CrossDomainImpl : public boost::enable_shared_from_this<CrossDomainImpl>
    {
    public:
        static const unsigned MaxReadSize = 22;
        typedef boost::shared_ptr<CrossDomainImpl> CrossDomainImplPtr;
        static CrossDomainImplPtr create(asio::io_service& io_service) {
            return CrossDomainImplPtr(new CrossDomainImpl(io_service));
        }
    
        tcp::socket& get_socket() {
            return m_socket;
        }
    
        void start() {
            start_read_some();
        }
    
        ~CrossDomainImpl() {
            close();
        }
    
        void close() {
            if (m_socket.is_open()) {
                m_socket.close();
            }
        }
    
    private:
        CrossDomainImpl(asio::io_service& io_service)
            : m_socket(io_service)
        {
        }
    
        void start_read_some() {
            m_socket.async_read_some(asio::buffer(m_readbuf, MaxReadSize),
                boost::bind(&CrossDomainImpl::handle_read_some, shared_from_this(), asio::placeholders::error()));
        }
    
        void handle_read_some(const asio::error_code& err) {
            if (!err) {
                string str(m_readbuf);
                string reply("invalid");
                if (str == "<policy-file-request/>") {
                    reply = "anything you wanna send back to client...";
                }
                asio::async_write(m_socket, asio::buffer(ref),
                        boost::bind(&CrossDomainImpl::handle_write, shared_from_this(), asio::placeholders::error));
            }
        }
    
        void handle_write(const asio::error_code& error) {
            FINE("CrossDomain handle_write, gonna close");
            close();
        }
    
        tcp::socket m_socket;
        char m_readbuf[MaxReadSize];
    };
    
    struct CrossDomain::Server
    {
    private:
        CrossDomain *m_facade;
        tcp::acceptor m_acceptor;
        bool m_listened;
        string m_local_port;
    
    public:
        Server(asio::io_service& io_service, const string &local_port)
            : m_acceptor(io_service)
            , m_listened(false)
            , m_local_port(local_port)
        {
            // intend to leave it blank
        }
        ~Server() {
            if (m_acceptor.is_open()) {
                INFO("close server acceptor");
                m_acceptor.close();
            }
        }
    
        void start_server() {
            FINE("CrossDomain start_server....");
            if (!m_listened) {
                FINE("Try to listen...");
                try {
                    tcp::endpoint ep(tcp::endpoint(tcp::v4(), atoi(m_local_port.c_str())));
                    m_acceptor.open(ep.protocol());
                    m_acceptor.bind(ep);
                    m_acceptor.listen();
                } catch (const asio::system_error& ec) {
                    WARN("Port %s already in use! Fail to listen...", STR(m_local_port));
                    return;
                } catch (...) {
                    WARN("Unknown error while trying to listen...");
                    return;
                }
                m_listened = true;
                FINE("Listen port %s succesfully!", STR(m_local_port));
            }
    
            CrossDomainImpl::CrossDomainImplPtr new_server_impl = CrossDomainImpl::create(m_acceptor.get_io_service());
            m_acceptor.async_accept(new_server_impl->get_socket(),
                boost::bind(&Server::handle_accept, this, new_server_impl, asio::placeholders::error));
        }
    
    private:
        void handle_accept(CrossDomainImpl::CrossDomainImplPtr pserver_impl, const asio::error_code& err) {
            FINE("CrossDomain handle_accpet....");
            if (!err) {
                FINE("CrossDomain everything ok, start...");
                pserver_impl->start();    // start this server
                start_server();           // waiting for another Tuna Connection
            } else {
                pserver_impl->close();
            }
        }
    };
    
    CrossDomain::CrossDomain(asio::io_service &io_service, const std::string &local_port)
        : m_pserver(new Server(io_service, local_port))
    {
    }
    
    CrossDomain *CrossDomain::instance()
    {
        if (!s_instance) {
            return NULL;
        }
        return s_instance;
    }
    
    void CrossDomain::start_server()
    {
        m_pserver->start_server();
    }
    3. 使用asio的陷阱
    上边代码其实有几点漏洞:
    
    3.1 std::vector<uint_8>不适合作为buffer
    
    vector<uint8_t>不适合做buffer的原因是,sgi的内存分配器会以2倍的形式增长vector的内存,例如这个buffer要求100K,但当前vector的capability只有90K,那么sgi默认内存分配器会将vector的capability增长到180K。注意capability与size的区别。这就导致vector的内存占用依赖最大buffer的size,这是很危险的。
    
    推荐使用boost的circular_buffer作为buffer,能有效避免内存碎片、隐式内存泄露等问题。
    
    3.2 asio::const_buffer拷贝构造函数没有深拷贝
    
    const_buffer系列静态buffer只能从mutable_buffermerge过来,但是从const_buffer的拷贝构造函数源码能看到,他并不对buffer做深拷贝。所以试图将其放到队列或者容器中,期待产生buffer的拷贝,是错误的。
    
    3.3. async_write可能会拆包发送
    
    例如先调用async_write发送一个100K的大包,再马上调用async_write发送一个8字节的ping包,非常可能出现问题。async_write函数的实现是循环调用async_write_some,对于大包会将其拆分成几个小报文。如果此时收到用户一个新的async_write调用,非常可能将小包夹在大包的几个部分中间发送,导致接收端出现异常。
    
    解决的办法可以直接操作async_write_some,代替async_write。但更方便的办法是创建一个发送队列。实际上asio会准确地将发送成功的通知发送给用户,例如刚刚100K的打包,直到所有100K全部发送完成,才会调用handle回调。因此可以在发送时将报文入队列,回调函数里将报文出队列,发送下一个小报时判断队列是否为空,如果非空说明100K的包还没有发完。示例代码如下:
    
    ///发送时入队列
    void TcpConnection::send(const void* data, uint32_t length)
    {
        if (!m_stopped) {
            if (length <= MAX_BUFFER_SIZE) {
                const char* begin = (const char*)data;
                vector<char> vec(begin, begin + length);
    
                bool isLastComplete = m_bufQueue.empty();
                m_bufQueue.push_back(vec);
                /// 如果没有残余的包,就直接发送
                if (isLastComplete) {
                    vector<char>& b(m_bufQueue.front());
                    send(b);
                }
            } else {
                onCommonError(S_ERROR, "too big length to call send");
            }
        } else {
            onCommonError(S_ERROR, "illegal to call send while tcp is not connected");
        }
    }
    
    void TcpConnection::send(const std::vector<char>& vec)
    {
        if (!m_stopped) {
            asio::async_write(m_socket, asio::buffer(&vec[0], vec.size()), asio::transfer_all(), 
                              boost::bind(&TcpConnection::handleSend, shared_from_this(), asio::placeholders::error));
    
        } else {
            onCommonError(S_ERROR, "illegal to call send while tcp is not connected");
        }
    }
    
    ///回调函数将之前的buffer出队列,同时检查是否有后来的包
    void TcpConnection::handleSend(const asio::error_code &e)
    {
        if (!m_stopped) {
            if (!e) {
                m_bufQueue.pop_front();
                if (!m_bufQueue.empty()) {
                    std::vector<char>& b(m_bufQueue.front());
                    send(b);
                }
                //onSendSuccess();
            } else if (isConnected()){
                onSendFailure(e);
            }
        } else {
            INFO("%s %s %u user's canceled by stop()", STR(m_name), STR(m_ip), m_port);
        }
    }
    

  • 相关阅读:
    【linux基础】linux系统日志设置相关记录
    【linux基础】mount: unknown filesystem type 'exfat'
    [c++]float assign
    第6章 移动语义和enable_if:6.1 完美转发
    第5章 技巧性基础:5.7 模板模板参数
    第5章 技巧性基础:5.6 变量模板
    第5章 技巧性基础:5.4 原生数组和字符串字面量的模板
    第5章 技巧性基础:5.3 this->的使用
    第5章 技巧性基础:5.2 零初始化
    第5章 技巧性基础:5.1 关键字typename
  • 原文地址:https://www.cnblogs.com/hzcya1995/p/13318602.html
Copyright © 2011-2022 走看看