zoukankan      html  css  js  c++  java
  • trinitycore 魔兽服务器源码分析(二) 网络

    书接上文 继续分析Socket.h SocketMgr.h

    template<class T>
    class Socket : public std::enable_shared_from_this<T>

    根据智能指针的使用规则 类中有使用本类自己的指针 必须继承自enable_shared_from_this<> 防止自引用 不能释放的BUG

    class Socket封装了asio中的socket类 获取远端ip 端口等功能, 并且额外提供异步读写的功能

    类中的两个原子变量 _closed _closing标记该socket的关闭开启状态

    bool Update()函数根据socket是否是同步异步标记进行写入队列的处理。 同步则进行处理 异步则暂缓

    void AsyncRead()  void AsyncReadWithCallback(void (T::*callback)(boost::system::error_code, std::size_t))

    则采取异步读取socket 调用默认函数ReadHandlerInternal() 或者输入函数T::*callback()

    由于AsyncReadWithCallback 函数中bind 需要 T类的指针 所以才有开头的继承std::enable_shared_from_this<T>

    但是使用比较怪异  std::enable_shared_from_this<>用法一般是继承自己本身 

    class self :: public std::enable_shared_from_this<self>{
    public:

      void test(){

      // only for test 
      std::bind(&self ::testshared_from_this());

      }
    }

    异步写write类似  ,由bool AsyncProcessQueue()函数发起

    使用asio的async_write_some函数异步读取连接内容 并调用回调函数WriteHandler()或者WriteHandlerWrapper()

    不过需要结合MessageBuffer 一起跟进流程

    类代码如下

      1 /*
      2  * Copyright (C) 2008-2017 TrinityCore <http://www.trinitycore.org/>
      3  *
      4  * This program is free software; you can redistribute it and/or modify it
      5  * under the terms of the GNU General Public License as published by the
      6  * Free Software Foundation; either version 2 of the License, or (at your
      7  * option) any later version.
      8  *
      9  * This program is distributed in the hope that it will be useful, but WITHOUT
     10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     11  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
     12  * more details.
     13  *
     14  * You should have received a copy of the GNU General Public License along
     15  * with this program. If not, see <http://www.gnu.org/licenses/>.
     16  */
     17 
     18 #ifndef __SOCKET_H__
     19 #define __SOCKET_H__
     20 
     21 #include "MessageBuffer.h"
     22 #include "Log.h"
     23 #include <atomic>
     24 #include <queue>
     25 #include <memory>
     26 #include <functional>
     27 #include <type_traits>
     28 #include <boost/asio/ip/tcp.hpp>
     29 
     30 using boost::asio::ip::tcp;
     31 
     32 #define READ_BLOCK_SIZE 4096
     33 #ifdef BOOST_ASIO_HAS_IOCP
     34 #define TC_SOCKET_USE_IOCP
     35 #endif
     36 
     37 template<class T>
     38 class Socket : public std::enable_shared_from_this<T>
     39 {
     40 public:
     41     explicit Socket(tcp::socket&& socket) : _socket(std::move(socket)), _remoteAddress(_socket.remote_endpoint().address()),
     42         _remotePort(_socket.remote_endpoint().port()), _readBuffer(), _closed(false), _closing(false), _isWritingAsync(false)
     43     {
     44         _readBuffer.Resize(READ_BLOCK_SIZE);
     45     }
     46 
     47     virtual ~Socket()
     48     {
     49         _closed = true;
     50         boost::system::error_code error;
     51         _socket.close(error);
     52     }
     53 
     54     virtual void Start() = 0;
     55 
     56     virtual bool Update()
     57     {
     58         if (_closed)
     59             return false;
     60 
     61 #ifndef TC_SOCKET_USE_IOCP
     62         if (_isWritingAsync || (_writeQueue.empty() && !_closing))
     63             return true;
     64 
     65         for (; HandleQueue();)
     66             ;
     67 #endif
     68 
     69         return true;
     70     }
     71 
     72     boost::asio::ip::address GetRemoteIpAddress() const
     73     {
     74         return _remoteAddress;
     75     }
     76 
     77     uint16 GetRemotePort() const
     78     {
     79         return _remotePort;
     80     }
     81 
     82     void AsyncRead()
     83     {
     84         if (!IsOpen())
     85             return;
     86 
     87         _readBuffer.Normalize();
     88         _readBuffer.EnsureFreeSpace();
     89         _socket.async_read_some(boost::asio::buffer(_readBuffer.GetWritePointer(), _readBuffer.GetRemainingSpace()),
     90             std::bind(&Socket<T>::ReadHandlerInternal, this->shared_from_this(), std::placeholders::_1, std::placeholders::_2));
     91     }
     92 
     93     void AsyncReadWithCallback(void (T::*callback)(boost::system::error_code, std::size_t))
     94     {
     95         if (!IsOpen())
     96             return;
     97 
     98         _readBuffer.Normalize();
     99         _readBuffer.EnsureFreeSpace();
    100         _socket.async_read_some(boost::asio::buffer(_readBuffer.GetWritePointer(), _readBuffer.GetRemainingSpace()),
    101             std::bind(callback, this->shared_from_this(), std::placeholders::_1, std::placeholders::_2));
    102     }
    103 
    104     void QueuePacket(MessageBuffer&& buffer)
    105     {
    106         _writeQueue.push(std::move(buffer));
    107 
    108 #ifdef TC_SOCKET_USE_IOCP
    109         AsyncProcessQueue();
    110 #endif
    111     }
    112 
    113     bool IsOpen() const { return !_closed && !_closing; }
    114 
    115     void CloseSocket()
    116     {
    117         if (_closed.exchange(true))
    118             return;
    119 
    120         boost::system::error_code shutdownError;
    121         _socket.shutdown(boost::asio::socket_base::shutdown_send, shutdownError);
    122         if (shutdownError)
    123             TC_LOG_DEBUG("network", "Socket::CloseSocket: %s errored when shutting down socket: %i (%s)", GetRemoteIpAddress().to_string().c_str(),
    124                 shutdownError.value(), shutdownError.message().c_str());
    125 
    126         OnClose();
    127     }
    128 
    129     /// Marks the socket for closing after write buffer becomes empty
    130     void DelayedCloseSocket() { _closing = true; }
    131 
    132     MessageBuffer& GetReadBuffer() { return _readBuffer; }
    133 
    134 protected:
    135     virtual void OnClose() { }
    136 
    137     virtual void ReadHandler() = 0;
    138 
    139     bool AsyncProcessQueue()
    140     {
    141         if (_isWritingAsync)
    142             return false;
    143 
    144         _isWritingAsync = true;
    145 
    146 #ifdef TC_SOCKET_USE_IOCP
    147         MessageBuffer& buffer = _writeQueue.front();
    148         _socket.async_write_some(boost::asio::buffer(buffer.GetReadPointer(), buffer.GetActiveSize()), std::bind(&Socket<T>::WriteHandler,
    149             this->shared_from_this(), std::placeholders::_1, std::placeholders::_2));
    150 #else
    151         _socket.async_write_some(boost::asio::null_buffers(), std::bind(&Socket<T>::WriteHandlerWrapper,
    152             this->shared_from_this(), std::placeholders::_1, std::placeholders::_2));
    153 #endif
    154 
    155         return false;
    156     }
    157 
    158     void SetNoDelay(bool enable)
    159     {
    160         boost::system::error_code err;
    161         _socket.set_option(tcp::no_delay(enable), err);
    162         if (err)
    163             TC_LOG_DEBUG("network", "Socket::SetNoDelay: failed to set_option(boost::asio::ip::tcp::no_delay) for %s - %d (%s)",
    164                 GetRemoteIpAddress().to_string().c_str(), err.value(), err.message().c_str());
    165     }
    166 
    167 private:
    168     void ReadHandlerInternal(boost::system::error_code error, size_t transferredBytes)
    169     {
    170         if (error)
    171         {
    172             CloseSocket();
    173             return;
    174         }
    175 
    176         _readBuffer.WriteCompleted(transferredBytes);
    177         ReadHandler();
    178     }
    179 
    180 #ifdef TC_SOCKET_USE_IOCP
    181 
    182     void WriteHandler(boost::system::error_code error, std::size_t transferedBytes)
    183     {
    184         if (!error)
    185         {
    186             _isWritingAsync = false;
    187             _writeQueue.front().ReadCompleted(transferedBytes);
    188             if (!_writeQueue.front().GetActiveSize())
    189                 _writeQueue.pop();
    190 
    191             if (!_writeQueue.empty())
    192                 AsyncProcessQueue();
    193             else if (_closing)
    194                 CloseSocket();
    195         }
    196         else
    197             CloseSocket();
    198     }
    199 
    200 #else
    201 
    202     void WriteHandlerWrapper(boost::system::error_code /*error*/, std::size_t /*transferedBytes*/)
    203     {
    204         _isWritingAsync = false;
    205         HandleQueue();
    206     }
    207 
    208     bool HandleQueue()
    209     {
    210         if (_writeQueue.empty())
    211             return false;
    212 
    213         MessageBuffer& queuedMessage = _writeQueue.front();
    214 
    215         std::size_t bytesToSend = queuedMessage.GetActiveSize();
    216 
    217         boost::system::error_code error;
    218         std::size_t bytesSent = _socket.write_some(boost::asio::buffer(queuedMessage.GetReadPointer(), bytesToSend), error);
    219 
    220         if (error)
    221         {
    222             if (error == boost::asio::error::would_block || error == boost::asio::error::try_again)
    223                 return AsyncProcessQueue();
    224 
    225             _writeQueue.pop();
    226             if (_closing && _writeQueue.empty())
    227                 CloseSocket();
    228             return false;
    229         }
    230         else if (bytesSent == 0)
    231         {
    232             _writeQueue.pop();
    233             if (_closing && _writeQueue.empty())
    234                 CloseSocket();
    235             return false;
    236         }
    237         else if (bytesSent < bytesToSend) // now n > 0
    238         {
    239             queuedMessage.ReadCompleted(bytesSent);
    240             return AsyncProcessQueue();
    241         }
    242 
    243         _writeQueue.pop();
    244         if (_closing && _writeQueue.empty())
    245             CloseSocket();
    246         return !_writeQueue.empty();
    247     }
    248 
    249 #endif
    250 
    251     tcp::socket _socket;
    252 
    253     boost::asio::ip::address _remoteAddress;
    254     uint16 _remotePort;
    255 
    256     MessageBuffer _readBuffer;
    257     std::queue<MessageBuffer> _writeQueue;
    258 
    259     std::atomic<bool> _closed;
    260     std::atomic<bool> _closing;
    261 
    262     bool _isWritingAsync;
    263 };
    264 
    265 #endif // __SOCKET_H__
    View Code

     //======================================================

    template<class SocketType>
    class SocketMgr

    将之前的Socket NetworkThread AsyncAcceptor

    整合了起来

    virtual bool StartNetwork(boost::asio::io_service& service, std::string const& bindIp, uint16 port, int threadCount)函数

    开启threadCount个NetworkThread 

    创建一个AsyncAcceptor 异步ACCEPT连接

     uint32 SelectThreadWithMinConnections() 函数会返回连接数目最少的NetworkThread 的线程索引

    std::pair<tcp::socket*, uint32> GetSocketForAccept()则返回连接数目最少的线程索引和 该线程用于异步连接Socket指针

    其余的start stop 就没什么了

    值得关注的是virtual void OnSocketOpen(tcp::socket&& sock, uint32 threadIndex)

    当继承SocketMgr的服务器在accept的时候会调用该函数

    函数功能是运行accept的Socket的run函数

    并且讲Socket加入到NetworkThread 的Socket容器中(AddSocket函数)

    整个类的代码如下

      1 /*
      2  * Copyright (C) 2008-2017 TrinityCore <http://www.trinitycore.org/>
      3  *
      4  * This program is free software; you can redistribute it and/or modify it
      5  * under the terms of the GNU General Public License as published by the
      6  * Free Software Foundation; either version 2 of the License, or (at your
      7  * option) any later version.
      8  *
      9  * This program is distributed in the hope that it will be useful, but WITHOUT
     10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     11  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
     12  * more details.
     13  *
     14  * You should have received a copy of the GNU General Public License along
     15  * with this program. If not, see <http://www.gnu.org/licenses/>.
     16  */
     17 
     18 #ifndef SocketMgr_h__
     19 #define SocketMgr_h__
     20 
     21 #include "AsyncAcceptor.h"
     22 #include "Errors.h"
     23 #include "NetworkThread.h"
     24 #include <boost/asio/ip/tcp.hpp>
     25 #include <memory>
     26 
     27 using boost::asio::ip::tcp;
     28 
     29 template<class SocketType>
     30 class SocketMgr
     31 {
     32 public:
     33     virtual ~SocketMgr()
     34     {
     35         ASSERT(!_threads && !_acceptor && !_threadCount, "StopNetwork must be called prior to SocketMgr destruction");
     36     }
     37 
     38     virtual bool StartNetwork(boost::asio::io_service& service, std::string const& bindIp, uint16 port, int threadCount)
     39     {
     40         ASSERT(threadCount > 0);
     41 
     42         AsyncAcceptor* acceptor = nullptr;
     43         try
     44         {
     45             acceptor = new AsyncAcceptor(service, bindIp, port);
     46         }
     47         catch (boost::system::system_error const& err)
     48         {
     49             TC_LOG_ERROR("network", "Exception caught in SocketMgr.StartNetwork (%s:%u): %s", bindIp.c_str(), port, err.what());
     50             return false;
     51         }
     52 
     53         if (!acceptor->Bind())
     54         {
     55             TC_LOG_ERROR("network", "StartNetwork failed to bind socket acceptor");
     56             return false;
     57         }
     58 
     59         _acceptor = acceptor;
     60         _threadCount = threadCount;
     61         _threads = CreateThreads();
     62 
     63         ASSERT(_threads);
     64 
     65         for (int32 i = 0; i < _threadCount; ++i)
     66             _threads[i].Start();
     67 
     68         return true;
     69     }
     70 
     71     virtual void StopNetwork()
     72     {
     73         _acceptor->Close();
     74 
     75         if (_threadCount != 0)
     76             for (int32 i = 0; i < _threadCount; ++i)
     77                 _threads[i].Stop();
     78 
     79         Wait();
     80 
     81         delete _acceptor;
     82         _acceptor = nullptr;
     83         delete[] _threads;
     84         _threads = nullptr;
     85         _threadCount = 0;
     86     }
     87 
     88     void Wait()
     89     {
     90         if (_threadCount != 0)
     91             for (int32 i = 0; i < _threadCount; ++i)
     92                 _threads[i].Wait();
     93     }
     94 
     95     virtual void OnSocketOpen(tcp::socket&& sock, uint32 threadIndex)
     96     {
     97         try
     98         {
     99             std::shared_ptr<SocketType> newSocket = std::make_shared<SocketType>(std::move(sock));
    100             newSocket->Start();
    101 
    102             _threads[threadIndex].AddSocket(newSocket);
    103         }
    104         catch (boost::system::system_error const& err)
    105         {
    106             TC_LOG_WARN("network", "Failed to retrieve client's remote address %s", err.what());
    107         }
    108     }
    109 
    110     int32 GetNetworkThreadCount() const { return _threadCount; }
    111 
    112     uint32 SelectThreadWithMinConnections() const
    113     {
    114         uint32 min = 0;
    115 
    116         for (int32 i = 1; i < _threadCount; ++i)
    117             if (_threads[i].GetConnectionCount() < _threads[min].GetConnectionCount())
    118                 min = i;
    119 
    120         return min;
    121     }
    122 
    123     std::pair<tcp::socket*, uint32> GetSocketForAccept()
    124     {
    125         uint32 threadIndex = SelectThreadWithMinConnections();
    126         return std::make_pair(_threads[threadIndex].GetSocketForAccept(), threadIndex);
    127     }
    128 
    129 protected:
    130     SocketMgr() : _acceptor(nullptr), _threads(nullptr), _threadCount(0)
    131     {
    132     }
    133 
    134     virtual NetworkThread<SocketType>* CreateThreads() const = 0;
    135 
    136     AsyncAcceptor* _acceptor;
    137     NetworkThread<SocketType>* _threads;
    138     int32 _threadCount;
    139 };
    140 
    141 #endif // SocketMgr_h__
    View Code
    作 者: itdef
    欢迎转帖 请保持文本完整并注明出处
    技术博客 http://www.cnblogs.com/itdef/
    B站算法视频题解
    https://space.bilibili.com/18508846
    qq 151435887
    gitee https://gitee.com/def/
    欢迎c c++ 算法爱好者 windows驱动爱好者 服务器程序员沟通交流
    如果觉得不错,欢迎点赞,你的鼓励就是我的动力
    阿里打赏 微信打赏
  • 相关阅读:
    linux shell在while中用read从键盘输入
    ubuntu14.04折腾迅雷xware
    select与epoll分析
    ubuntu 14.04下练习lua
    C++中的重载、覆盖、隐藏
    删除ubuntu旧内核
    fcntl函数加文件锁
    系统中断与SA_RESTART
    linux使用共享内存通信的进程同步退出问题
    leetcode-easy-others-268 Missing Number
  • 原文地址:https://www.cnblogs.com/itdef/p/7729530.html
Copyright © 2011-2022 走看看