TrinityCore 是一个开源的游戏服务器项目,许多玩家使用魔兽世界的数据来进行测试,并用它来架设魔兽私服。
官方网址 https://www.trinitycore.org/
类似的还有mangos 和 kbengine 不过mangos使用庞大的ACE网络框架
kbengine使用自写网络库 两者均使用了多语言进行开发
而 TrinityCore 主要使用 C++,代码比较好读,所以就开启本篇这个代码阅读的坑。
阅读代码需要具备 C++11 中 std::function、std::shared_ptr 等智能指针的相关知识,
以及了解阅读过boost asio网络库的文档和示例代码的相关知识
先从网络看起
大致看了下 trinitycore使用boost asio作为网络库
大致就是这么几个代码
class AsyncAcceptor 顾名思义 是异步accept
构造函数 简单的初始化类中变量 无他
// Constructor: attaches _acceptor and _socket to ioService, builds _endpoint from
// the parsed bindIp plus port, starts with _closed == false, and installs
// DefeaultSocketFactory (bound to this instance) as the initial socket factory.
// Nothing is opened, bound, or listened on here.
// NOTE(review): from_string(bindIp) is called without an error_code argument, so
// presumably an unparsable address throws — confirm against the Boost.Asio docs.
AsyncAcceptor(boost::asio::io_service& ioService, std::string const& bindIp, uint16 port) :
_acceptor(ioService), _endpoint(boost::asio::ip::address::from_string(bindIp), port),
_socket(ioService), _closed(false), _socketFactory(std::bind(&AsyncAcceptor::DefeaultSocketFactory, this))
{
}
void AsyncAcceptWithCallback() 函数
1 使用 _socketFactory 产生 socket 指针和线程索引,并赋值给局部变量 socket 与 threadIndex
tcp::socket* socket;
std::tie(socket, threadIndex) = _socketFactory();
2 将作为模板参数传入的函数指针 acceptCallback 在 accept 成功时调用
typedef void(*AcceptCallback)(tcp::socket&& newSocket, uint32 threadIndex);
acceptCallback(std::move(*socket), threadIndex);
并再次进入 AsyncAcceptWithCallback函数等待下次accept;
if (!_closed)
this->AsyncAcceptWithCallback<acceptCallback>();
bool Bind() 函数按 _endpoint 的协议打开类中的 acceptor,将其绑定(bind)到 _endpoint 指定的地址和端口,并开始监听(listen);任何一步失败都会记录日志并返回 false
template<class T>
void AsyncAcceptor::AsyncAccept()函数
不设置回调函数,而是用 accept 到的 socket 构造一个类型 T 的 shared_ptr,并调用其 Start();
并且再次进入AsyncAccept()等待下次accept
1 /* 2 * Copyright (C) 2008-2017 TrinityCore <http://www.trinitycore.org/> 3 * 4 * This program is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License as published by the 6 * Free Software Foundation; either version 2 of the License, or (at your 7 * option) any later version. 8 * 9 * This program is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for 12 * more details. 13 * 14 * You should have received a copy of the GNU General Public License along 15 * with this program. If not, see <http://www.gnu.org/licenses/>. 16 */ 17 18 #ifndef __ASYNCACCEPT_H_ 19 #define __ASYNCACCEPT_H_ 20 21 #include "Log.h" 22 #include <boost/asio/ip/tcp.hpp> 23 #include <boost/asio/ip/address.hpp> 24 #include <functional> 25 #include <atomic> 26 27 using boost::asio::ip::tcp; 28 29 class AsyncAcceptor 30 { 31 public: 32 typedef void(*AcceptCallback)(tcp::socket&& newSocket, uint32 threadIndex); 33 34 AsyncAcceptor(boost::asio::io_service& ioService, std::string const& bindIp, uint16 port) : 35 _acceptor(ioService), _endpoint(boost::asio::ip::address::from_string(bindIp), port), 36 _socket(ioService), _closed(false), _socketFactory(std::bind(&AsyncAcceptor::DefeaultSocketFactory, this)) 37 { 38 } 39 40 template<class T> 41 void AsyncAccept(); 42 43 template<AcceptCallback acceptCallback> 44 void AsyncAcceptWithCallback() 45 { 46 tcp::socket* socket; 47 uint32 threadIndex; 48 std::tie(socket, threadIndex) = _socketFactory(); 49 _acceptor.async_accept(*socket, [this, socket, threadIndex](boost::system::error_code error) 50 { 51 if (!error) 52 { 53 try 54 { 55 socket->non_blocking(true); 56 57 acceptCallback(std::move(*socket), threadIndex); 58 } 59 catch (boost::system::system_error const& err) 60 { 61 TC_LOG_INFO("network", "Failed to initialize client's socket %s", 
err.what()); 62 } 63 } 64 65 if (!_closed) 66 this->AsyncAcceptWithCallback<acceptCallback>(); 67 }); 68 } 69 70 bool Bind() 71 { 72 boost::system::error_code errorCode; 73 _acceptor.open(_endpoint.protocol(), errorCode); 74 if (errorCode) 75 { 76 TC_LOG_INFO("network", "Failed to open acceptor %s", errorCode.message().c_str()); 77 return false; 78 } 79 80 _acceptor.bind(_endpoint, errorCode); 81 if (errorCode) 82 { 83 TC_LOG_INFO("network", "Could not bind to %s:%u %s", _endpoint.address().to_string().c_str(), _endpoint.port(), errorCode.message().c_str()); 84 return false; 85 } 86 87 _acceptor.listen(boost::asio::socket_base::max_connections, errorCode); 88 if (errorCode) 89 { 90 TC_LOG_INFO("network", "Failed to start listening on %s:%u %s", _endpoint.address().to_string().c_str(), _endpoint.port(), errorCode.message().c_str()); 91 return false; 92 } 93 94 return true; 95 } 96 97 void Close() 98 { 99 if (_closed.exchange(true)) 100 return; 101 102 boost::system::error_code err; 103 _acceptor.close(err); 104 } 105 106 void SetSocketFactory(std::function<std::pair<tcp::socket*, uint32>()> func) { _socketFactory = func; } 107 108 private: 109 std::pair<tcp::socket*, uint32> DefeaultSocketFactory() { return std::make_pair(&_socket, 0); } 110 111 tcp::acceptor _acceptor; 112 tcp::endpoint _endpoint; 113 tcp::socket _socket; 114 std::atomic<bool> _closed; 115 std::function<std::pair<tcp::socket*, uint32>()> _socketFactory; 116 }; 117 118 template<class T> 119 void AsyncAcceptor::AsyncAccept() 120 { 121 _acceptor.async_accept(_socket, [this](boost::system::error_code error) 122 { 123 if (!error) 124 { 125 try 126 { 127 // this-> is required here to fix an segmentation fault in gcc 4.7.2 - reason is lambdas in a templated class 128 std::make_shared<T>(std::move(this->_socket))->Start(); 129 } 130 catch (boost::system::system_error const& err) 131 { 132 TC_LOG_INFO("network", "Failed to retrieve client's remote address %s", err.what()); 133 } 134 } 135 136 // lets slap 
some more this-> on this so we can fix this bug with gcc 4.7.2 throwing internals in yo face 137 if (!_closed) 138 this->AsyncAccept<T>(); 139 }); 140 } 141 142 #endif /* __ASYNCACCEPT_H_ */
//============================================================================
template<class SocketType>
class NetworkThread
类使用多线程执行socket的管理
那么类的成员变量及作用如后
typedef std::vector<std::shared_ptr<SocketType>> SocketContainer; //socket容器进行socket指针的存储管理
std::atomic<int32> _connections; //原子计数 多线程下记录连接数目
std::atomic<bool> _stopped; //原子bool型flag 标记此线程是否停止
std::thread* _thread; //线程指针
SocketContainer _sockets; //socket容器进行socket指针的存储管理
std::mutex _newSocketsLock; //多线程互斥变量
SocketContainer _newSockets; //另一个socket容器
//boost 设置常规变量
boost::asio::io_service _io_service;
tcp::socket _acceptSocket;
boost::asio::deadline_timer _updateTimer;
从 void Run()函数入手
void Run()的功能如下
定时异步执行 Update函数 ,运行_io_service
_updateTimer.expires_from_now(boost::posix_time::milliseconds(10));
_updateTimer.async_wait(std::bind(&NetworkThread<SocketType>::Update, this));
_io_service.run();
void Update()函数
在加锁情况下 将_newSockets容器socket指针添加到_sockets ==》 AddNewSockets();
并移除那些update失败的socket==》_sockets.erase(。。。。。。
再次定时异步调用update
该函数的作用应该是定时清除无效的 socket,具体效果还需要在后续代码中看看 template<typename SocketType> 中 SocketType 的 Update 函数的具体作用
整个类代码如下
1 /* 2 * Copyright (C) 2008-2017 TrinityCore <http://www.trinitycore.org/> 3 * 4 * This program is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License as published by the 6 * Free Software Foundation; either version 2 of the License, or (at your 7 * option) any later version. 8 * 9 * This program is distributed in the hope that it will be useful, but WITHOUT 10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for 12 * more details. 13 * 14 * You should have received a copy of the GNU General Public License along 15 * with this program. If not, see <http://www.gnu.org/licenses/>. 16 */ 17 18 #ifndef NetworkThread_h__ 19 #define NetworkThread_h__ 20 21 #include "Define.h" 22 #include "Errors.h" 23 #include "Log.h" 24 #include "Timer.h" 25 #include <boost/asio/ip/tcp.hpp> 26 #include <boost/asio/deadline_timer.hpp> 27 #include <atomic> 28 #include <chrono> 29 #include <memory> 30 #include <mutex> 31 #include <set> 32 #include <thread> 33 34 using boost::asio::ip::tcp; 35 36 template<class SocketType> 37 class NetworkThread 38 { 39 public: 40 NetworkThread() : _connections(0), _stopped(false), _thread(nullptr), 41 _acceptSocket(_io_service), _updateTimer(_io_service) 42 { 43 } 44 45 virtual ~NetworkThread() 46 { 47 Stop(); 48 if (_thread) 49 { 50 Wait(); 51 delete _thread; 52 } 53 } 54 55 void Stop() 56 { 57 _stopped = true; 58 _io_service.stop(); 59 } 60 61 bool Start() 62 { 63 if (_thread) 64 return false; 65 66 _thread = new std::thread(&NetworkThread::Run, this); 67 return true; 68 } 69 70 void Wait() 71 { 72 ASSERT(_thread); 73 74 _thread->join(); 75 delete _thread; 76 _thread = nullptr; 77 } 78 79 int32 GetConnectionCount() const 80 { 81 return _connections; 82 } 83 84 virtual void AddSocket(std::shared_ptr<SocketType> sock) 85 { 86 std::lock_guard<std::mutex> lock(_newSocketsLock); 87 88 ++_connections; 89 
_newSockets.push_back(sock); 90 SocketAdded(sock); 91 } 92 93 tcp::socket* GetSocketForAccept() { return &_acceptSocket; } 94 95 protected: 96 virtual void SocketAdded(std::shared_ptr<SocketType> /*sock*/) { } 97 virtual void SocketRemoved(std::shared_ptr<SocketType> /*sock*/) { } 98 99 void AddNewSockets() 100 { 101 std::lock_guard<std::mutex> lock(_newSocketsLock); 102 103 if (_newSockets.empty()) 104 return; 105 106 for (std::shared_ptr<SocketType> sock : _newSockets) 107 { 108 if (!sock->IsOpen()) 109 { 110 SocketRemoved(sock); 111 --_connections; 112 } 113 else 114 _sockets.push_back(sock); 115 } 116 117 _newSockets.clear(); 118 } 119 120 void Run() 121 { 122 TC_LOG_DEBUG("misc", "Network Thread Starting"); 123 124 _updateTimer.expires_from_now(boost::posix_time::milliseconds(10)); 125 _updateTimer.async_wait(std::bind(&NetworkThread<SocketType>::Update, this)); 126 _io_service.run(); 127 128 TC_LOG_DEBUG("misc", "Network Thread exits"); 129 _newSockets.clear(); 130 _sockets.clear(); 131 } 132 133 void Update() 134 { 135 if (_stopped) 136 return; 137 138 _updateTimer.expires_from_now(boost::posix_time::milliseconds(10)); 139 _updateTimer.async_wait(std::bind(&NetworkThread<SocketType>::Update, this)); 140 141 AddNewSockets(); 142 143 _sockets.erase(std::remove_if(_sockets.begin(), _sockets.end(), [this](std::shared_ptr<SocketType> sock) 144 { 145 if (!sock->Update()) 146 { 147 if (sock->IsOpen()) 148 sock->CloseSocket(); 149 150 this->SocketRemoved(sock); 151 152 --this->_connections; 153 return true; 154 } 155 156 return false; 157 }), _sockets.end()); 158 } 159 160 private: 161 typedef std::vector<std::shared_ptr<SocketType>> SocketContainer; 162 163 std::atomic<int32> _connections; 164 std::atomic<bool> _stopped; 165 166 std::thread* _thread; 167 168 SocketContainer _sockets; 169 170 std::mutex _newSocketsLock; 171 SocketContainer _newSockets; 172 173 boost::asio::io_service _io_service; 174 tcp::socket _acceptSocket; 175 boost::asio::deadline_timer 
_updateTimer; 176 }; 177 178 #endif // NetworkThread_h__