做程序员也三年了,工作以来一直都在写游戏逻辑。刚好最近闲下来,难得有时间自己研究一些新的东西,因此赶紧看看网络方面的内容,并总结一下——好记性不如烂键盘。
其实总的来说,网络也就那么回事,比我原本想象中要简单很多,只有简单的几个接口。
一开始关于大数据,我还想复杂了,原本以为需要自己分包,再合包,而且我一开始也是这么做的,都已经做好了,再问别人才发现,这是没有必要的。(伤心!!)
TCP 连接本质上就是一个字节流:发送的时候可以分多次 send,接收时也可以分多次 recv,因为系统有收发缓存机制。但字节流本身不保留消息边界,所以需要用下面的消息头来界定每一条消息。
首先,需要做的只是给每条消息加上消息头,定义好包的大小及消息类型等其他业务相关的数据。
第二步, 发送消息时,先加上包头,后面跟数据。
第三步,接收消息时,先接收包头,再决定后面接收多大的数据。
以下直接贴代码:
客户端:
SocketClient.h
#ifndef __NET_SOCKET_CLIENT_H__ #define __NET_SOCKET_CLIENT_H__ #ifdef WIN32 #include <WinSock2.h> #else #include <sys/socket.h> #include <netinet/in.h> #include <sys/types.h> #include <sys/ioctl.h> #include <arpa/inet.h> #include <unistd.h> #ifndef typedef_socket_ #define typedef_socket_ typedef int SOCKET; #endif //typedef unsigned int SOCKET; #define INVALID_SOCKET -1 #define SOCKET_ERROR -1 #endif #include "ByteArray.h" #include <thread> #include <mutex> #include <list> #include <string> #define MsgPack_ptr std::shared_ptr<MsgPack> class SocketClient { private: SOCKET m_sock; std::shared_ptr<std::thread> m_thread; bool m_connected; std::mutex m_mutex; std::list<MsgPack*> m_requests; std::list<MsgPack_ptr> m_responseList; bool m_running; std::string m_callback; int32_t m_recvTimeout; public: SocketClient(); ~SocketClient(); bool _connect_timeout(std::string ip, unsigned short port, int timeout); bool _connect_block(std::string ip, unsigned short port); bool connect(std::string ip, unsigned short port, int timeout); int send(const char* buf, int len); int recv(char* buf, int len); int close(); void startRecvThread(); bool isConnected() const { return m_connected; } void setRecvTimeout(int32_t ms){ m_recvTimeout = ms; } bool sendRequest(MsgPack* reqest); MsgPack* recvMessage(); std::list<MsgPack_ptr>& getResponseList() { return m_responseList; } void lockMutex(); void unlockMutex(); protected: void runRecv(); }; #endif
SocketClient.cpp
#include "SocketClient.h"
#include <cerrno>
#include <iostream>
#include "bytearray.h"
#include "msg.h"
//#include "../Utils/Utils.h"
#include "cocos2d.h"
USING_NS_CC;

#define NETLOG CCLOG

#ifdef WIN32
#pragma comment(lib, "wsock32")

// Initialises Winsock once for the process and releases it at exit.
class SocketIniter
{
public:
    SocketIniter()
    {
        WSADATA wsaData;
        WORD version = MAKEWORD(2, 0);
        int rt = WSAStartup(version, &wsaData);
        if (rt)
        {
            std::cerr << "Initilize winsock error !" << std::endl;
        }
    }
    ~SocketIniter()
    {
        WSACleanup();
    }
};
static SocketIniter s_socketIniter;
#endif

namespace
{
// Seconds to wait for one non-blocking connection attempt to complete.
const int kConnectAttemptSeconds = 2;

// Avoid SIGPIPE killing the process when the peer has gone away (where the
// platform offers the flag).
#ifdef MSG_NOSIGNAL
const int kSendFlags = MSG_NOSIGNAL;
#else
const int kSendFlags = 0;
#endif

// Fills `out` with an IPv4 address; returns false when `ip` is not a valid
// dotted-quad string.
bool makeAddress(const std::string& ip, unsigned short port, sockaddr_in& out)
{
    out = sockaddr_in();
    out.sin_family = AF_INET;
    out.sin_addr.s_addr = inet_addr(ip.c_str());
    out.sin_port = htons(port);
    return out.sin_addr.s_addr != INADDR_NONE;
}

// Applies the socket options used by both connect paths.
void configureSocket(SOCKET s)
{
    int rcv_size = 1024 * 1024;
    setsockopt(s, SOL_SOCKET, SO_RCVBUF, reinterpret_cast<const char*>(&rcv_size), sizeof(rcv_size));
    // SO_KEEPALIVE takes an int-sized flag; a 1-byte bool is rejected on POSIX.
    int keepAlive = 1;
    setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast<const char*>(&keepAlive), sizeof(keepAlive));
}

// Switches the socket between blocking and non-blocking mode.
bool setNonBlocking(SOCKET s, bool enable)
{
#ifdef WIN32
    u_long mode = enable ? 1UL : 0UL;
    return ioctlsocket(s, FIONBIO, &mode) == 0;
#else
    int mode = enable ? 1 : 0;
    return ioctl(s, FIONBIO, &mode) == 0;
#endif
}

// Waits until a pending non-blocking connect finishes; true on success.
bool waitForConnect(SOCKET s, int seconds)
{
    struct timeval tv = { seconds, 0 };
    fd_set writefds;
    FD_ZERO(&writefds);
    FD_SET(s, &writefds);
    if (select(static_cast<int>(s) + 1, 0, &writefds, 0, &tv) <= 0)
        return false;

    int error = 0;
#ifdef WIN32
    int err_len = sizeof(error);
#else
    socklen_t err_len = sizeof(error);
#endif
    if (getsockopt(s, SOL_SOCKET, SO_ERROR, reinterpret_cast<char*>(&error), &err_len) != 0)
        return false;
    return error == 0;
}
} // namespace

SocketClient::SocketClient()
    : m_sock(INVALID_SOCKET)
    , m_connected(false)
    , m_running(true)
    , m_recvTimeout(5)
{
}

// Stops the receive loop, closes the socket and waits for the receive thread.
// Destroying a still-joinable std::thread calls std::terminate, so the join
// is mandatory.
// NOTE(review): a connect() attempt still running on its detached thread is
// not waited for; the object must outlive it.
SocketClient::~SocketClient()
{
    m_running = false;
    close(); // shutting the socket down is expected to wake a blocked recv
    if (m_thread && m_thread->joinable())
        m_thread->join();
}

// Marks the client disconnected, closes the socket if one is open and drops
// all queued responses. Safe to call repeatedly.
int SocketClient::close()
{
    m_connected = false;
    if (m_sock != INVALID_SOCKET)
    {
        ::shutdown(m_sock, 2);
#ifdef WIN32
        ::closesocket(m_sock);
#else
        ::close(m_sock);
#endif
        // Forget the descriptor so a later close() cannot hit a reused one.
        m_sock = INVALID_SOCKET;
    }

    std::lock_guard<std::mutex> lock(m_mutex);
    m_responseList.clear();
    return 0;
}

// Tries to connect, waiting up to kConnectAttemptSeconds per attempt and
// making roughly timeout / 2 attempts (at least one). Always queues a result
// message (id 1 = connected, id 2 = failed) on the response list.
// Returns true when connected.
bool SocketClient::_connect_timeout(std::string ip, unsigned short port, int timeout)
{
    if (m_connected)
    {
        return true;
    }

    sockaddr_in sock_addr;
    const bool addrOk = makeAddress(ip, port, sock_addr);

    int tryTimes = timeout / kConnectAttemptSeconds;
    if (tryTimes < 1)
        tryTimes = 1;

    bool ret = false;
    for (int i = 0; addrOk && !ret && i < tryTimes; ++i)
    {
        close();
        m_sock = socket(AF_INET, SOCK_STREAM, 0);
        if (m_sock == INVALID_SOCKET)
            continue;

        configureSocket(m_sock);
        // If this fails the connect below simply blocks until the OS gives up.
        setNonBlocking(m_sock, true);

        const int rt = ::connect(m_sock, reinterpret_cast<struct sockaddr*>(&sock_addr), sizeof(sock_addr));
        ret = (rt == 0) || waitForConnect(m_sock, kConnectAttemptSeconds);
    }

    if (ret)
    {
        setNonBlocking(m_sock, false);
        // No receive timeout is applied (SO_RCVTIMEO intentionally not set),
        // so m_recvTimeout currently has no effect.
    }
    else
    {
        // Do not keep a half-open socket around after a failed attempt.
        close();
    }

    m_connected = ret;
    m_requests.clear();
    if (ret)
        startRecvThread();

    MsgPack* msg = new MsgPack(this->isConnected() ? 1 : 2); //conn return
    NETLOG("connect msg: %d", msg->getMsgID());
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_responseList.push_back(MsgPack_ptr(msg));
    }

    /*
    cocos2d::Director::getInstance()->getScheduler()->performFunctionInCocosThread([&, this]{
        std::string sout;
        g_pLuaSystem->callLuaFunc(this->m_callback, this->isConnected()?1:0, sout);
    });
    */
    //int ret = CCLuaEngine::defaultEngine()->getLuaStack()->executeFunctionByHandler(_nStartReconnectHandler, 0);
    return ret;
}

// Starts _connect_timeout on a detached thread and returns immediately; the
// outcome arrives as a message on the response list. Always returns true.
bool SocketClient::connect(std::string ip, unsigned short port, int timeout)
{
    auto t = std::thread(&SocketClient::_connect_timeout, this, ip, port, timeout);
    t.detach();
    return true;
}

// Connects with a plain blocking connect(). Returns true when connected.
bool SocketClient::_connect_block(std::string ip, unsigned short port)
{
    if (m_connected)
    {
        return true;
    }

    close(); // also clears m_responseList

    sockaddr_in svraddr;
    if (!makeAddress(ip, port, svraddr))
        return false;

    m_sock = socket(AF_INET, SOCK_STREAM, 0);
    if (m_sock == INVALID_SOCKET)
        return false;

    configureSocket(m_sock);

    const int rt = ::connect(m_sock, reinterpret_cast<struct sockaddr*>(&svraddr), sizeof(svraddr));
    if (rt == SOCKET_ERROR)
    {
        close(); // release the socket instead of leaking it
        return false;
    }

    m_requests.clear();
    m_connected = true;
    startRecvThread();
    return true;
}

/*
static const std::string key = "P%2BViyZLtO^gRT2";
bool encryptByAES(std::string in, std::string& out, bool isEncrypt)
{
    std::string iv;
    iv.resize(AES_BLOCK_SIZE, 0);
    AES_KEY aesKey;
    int enc(AES_ENCRYPT);
    if (isEncrypt)
    {
        AES_set_encrypt_key((const unsigned char*)key.c_str(), AES_BLOCK_SIZE * 8, &aesKey);
    }
    else
    {
        AES_set_decrypt_key((const unsigned char*)key.c_str(), AES_BLOCK_SIZE * 8, &aesKey);
        enc = AES_DECRYPT;
    }
    short inLen = in.length();
    short block = (inLen + AES_BLOCK_SIZE - 1) / AES_BLOCK_SIZE;
    in.resize(AES_BLOCK_SIZE * block);
    out.resize(inLen);
    short begin = 0;
    try
    {
        while (true)
        {
            int len = inLen - begin >= AES_BLOCK_SIZE ? AES_BLOCK_SIZE : inLen - begin;
            AES_cbc_encrypt((unsigned char*)&inStr[begin], (unsigned char*)&outStr[begin], AES_BLOCK_SIZE, &aesKey, (unsigned char*)&iv[0], enc);
            begin += AES_BLOCK_SIZE;
            if (begin >= inLen)
            {
                break;
            }
        }
    }
    catch (...)
    {
        SYLAR_LOG_ERROR(g_logger) << "Bad aes buffer";
        return false;
    }
    out.resize(inLen);
    return true;
}*/

// Writes all `len` bytes, looping over partial sends.
// Returns the number of bytes sent, or -1 (and marks the client disconnected)
// on failure.
int SocketClient::send(const char* buf, int len)
{
    if (!m_connected)
    {
        return -1;
    }

    int count = 0;
    while (count < len)
    {
        const int bytes = static_cast<int>(::send(m_sock, buf + count, len - count, kSendFlags));
        if (bytes <= 0)
        {
#ifndef WIN32
            if (bytes < 0 && errno == EINTR)
                continue; // interrupted by a signal, not a real failure
#endif
            m_connected = false;
            NETLOG("socket send false");
            return -1;
        }
        count += bytes;
    }
    return count;
}

// Reads exactly `len` bytes, looping over short reads.
// Returns `len` on success; on failure marks the client disconnected and
// returns 0 (peer closed) or a negative value (error), so the receive loop
// stops polling a dead socket.
int SocketClient::recv(char* buf, int len)
{
    if (!m_connected)
    {
        return -1;
    }

    int count = 0;
    while (count < len)
    {
        const int bytes = static_cast<int>(::recv(m_sock, buf + count, len - count, MSG_WAITALL));
        if (bytes > 0)
        {
            count += bytes;
            continue;
        }
#ifndef WIN32
        if (bytes < 0 && errno == EINTR)
            continue; // interrupted by a signal, not a real failure
#endif
        m_connected = false;
        NETLOG("socket recv false");
        return bytes;
    }
    return count;
}

// Creates the receive thread once; later calls are no-ops because the thread
// keeps running across reconnects. The thread is joined in the destructor.
void SocketClient::startRecvThread()
{
    if (m_thread)
    {
        //CCLOG("Recv Thread is Running");
        return;
    }
    m_thread.reset(new std::thread(&SocketClient::runRecv, this));
}

// Sends the packed bytes of `reqest`; true when at least one byte was sent.
bool SocketClient::sendRequest(MsgPack* reqest)
{
    if (!reqest)
        return false;
    const int len = reqest->getLen();
    return send(reqest->getData(), len) > 0;
}

//char recbuff[4096];
// Reads one message: the fixed-size header first, then header.nPkgSize bytes
// of payload. Returns a heap-allocated MsgPack owned by the caller, or 0 when
// the message could not be read.
MsgPack* SocketClient::recvMessage()
{
    MsgHeader header;
    if (recv((char *)&header, sizeof(MsgHeader)) != static_cast<int>(sizeof(MsgHeader)))
        return 0;

    if (header.nPkgSize < 0)
    {
        // The stream cannot be resynchronised after a corrupt header.
        // NOTE(review): no upper bound is enforced because the protocol's
        // maximum payload size is not visible here.
        NETLOG("recv invalid package size %d", header.nPkgSize);
        m_connected = false;
        return 0;
    }

    MsgPack* buff = new MsgPack(header);
    if (header.nPkgSize > 0)
    {
        const int lenLeft = recv(buff->getData(), header.nPkgSize);
        NETLOG("-----recv-end = %d", lenLeft);
        if (lenLeft != header.nPkgSize)
        {
            delete buff;
            return 0;
        }
    }

    //msg read begin
    buff->setCur(0);
    return buff;
}

// Receive-thread body: while connected, reads messages and appends them to
// the response list; while disconnected, idles until reconnected or until
// m_running is cleared.
void SocketClient::runRecv()
{
    while (m_running)
    {
#ifdef WIN32
        Sleep(1);
#else
        usleep(1);
#endif
        try
        {
            while (m_connected)
            {
                MsgPack* msg = recvMessage();
                if (!msg)
                    continue;

                NETLOG("is in runRecv msgid:%d msgLen:%d ", msg->getMsgID(), msg->getMax());
                MsgHeader &header = msg->getHeader();
                if (header.nPkgTotall > 1)
                {
                    NETLOG("%d / %d", header.nPkgIndex, header.nPkgTotall);
                }

                std::lock_guard<std::mutex> lock(m_mutex);
                m_responseList.push_back(MsgPack_ptr(msg));
            }
        }
        catch (std::bad_alloc& e)
        {
            NETLOG("recv-exception %s", e.what());
        }
        catch (...)
        {
            NETLOG("recv-exception unkown.");
        }
    }
    NETLOG("exit thread runRecv.");
}

void SocketClient::lockMutex()
{
    m_mutex.lock();
}

void SocketClient::unlockMutex()
{
    m_mutex.unlock();
}
以下是我定义的消息头。当然,其中有些字段其实是没必要的:
/* Fixed-size header that precedes every message on the wire.
 * The receiver reads this header first and then reads nPkgSize bytes of
 * payload. The struct is sent as raw memory by the visible code, so both
 * peers are presumably expected to share the same byte order. */
typedef struct MsgHeader
{
    int32_t nId;        /* message ID */
    int32_t nPkgTotall; /* total number of packets in this message */
    int32_t nPkgIndex;  /* index of the current packet */
    int32_t nPkgSize;   /* payload size of the current packet, in bytes */
} MsgHeader;

/* Builds a header from the given field values. Implemented elsewhere;
 * NOTE(review): ownership of the returned pointer is not visible here. */
MsgHeader* createMsgHeader(int32_t msgId, int32_t pkgTotal, int32_t pkgIndex, int32_t pkgSz);
服务端:
NetServer.h
#pragma once //服务器 #define _MAX_HEAD 64 //最大消息头大小 #define _MAX_MSGSIZE 8 * 1024 // 暂定一个消息最大为16k #define BLOCKSECONDS 3 // INIT函数阻塞时间 #define INBUFSIZE (64*1024) //? 具体尺寸根据剖面报告调整 接收数据的缓存 #define OUTBUFSIZE (4*1024-64) //? 具体尺寸根据剖面报告调整。 发送数据的缓存,当不超过8K时,FLUSH只需要SEND一次 //Server.cpp #include <iostream> #include <winsock2.h> #pragma comment(lib, "ws2_32.lib") DWORD WINAPI ClientThread(LPVOID lpParameter); DWORD WINAPI server_main(PVOID pParam); void CALLBACK startServer(HWND hWnd, UINT nMsg, UINT nTimerid, DWORD dwTime); bool SendMsg(int msgId ,void* pBuf, int nSize, SOCKET &m_sockClient);
NetServer.cpp
//Server.cpp #include "stdafx.h" #include "Server.h" #include "WinServerDlg.h" #include <fstream> #include <vector> #include "msg.h" #include "Business.h" using namespace std; #define PORT 9001 #define IP_ADDRESS "127.0.0.1" bool GetLocalIP(char* ip) { //1.初始化wsa WSADATA wsaData; int ret = WSAStartup(MAKEWORD(2, 2), &wsaData); if (ret != 0) { return false; } //2.获取主机名 char hostname[256]; ret = gethostname(hostname, sizeof(hostname)); if (ret == SOCKET_ERROR) { return false; } //3.获取主机ip HOSTENT* host = gethostbyname(hostname); if (host == NULL) { return false; } //4.转化为char*并拷贝返回 strcpy(ip, inet_ntoa(*(in_addr*)*host->h_addr_list)); return true; } DWORD WINAPI ClientThread(LPVOID lpParameter) { SOCKET CientSocket = (SOCKET)lpParameter; int Ret = 0; char RecvBuffer[_MAX_MSGSIZE]; int headLen = sizeof(MsgHeader); //包队列,出现多包时,合包用 MsgBigCache* vecMsg[100]; memset(vecMsg, 0, sizeof(vecMsg)); while (true) { memset(RecvBuffer, 0x00, sizeof(RecvBuffer)); Ret = recv(CientSocket, RecvBuffer, _MAX_MSGSIZE, 0); if (Ret == 0 || Ret == SOCKET_ERROR) { Log("客户端退出!"); break; } MsgHeader *header = reinterpret_cast<MsgHeader*> (RecvBuffer); if (header->nPkgTotall > 1) { CString str; str.Format("此消息共%d ,当前%d", header->nPkgTotall, header->nPkgIndex); Log(str); } // Log("接收到客户信息为:%s", RecvBuffer+headLen); if (1 == header->nPkgTotall) { progressMsg(RecvBuffer + headLen, CientSocket, header); } else if (header->nPkgTotall > 1000 || header->nPkgTotall < 0) { return 0; } else { MsgBigCache *msg = new MsgBigCache(); char *buff = new char[header->nPkgSize]; memset(buff, 0, header->nPkgSize); memcpy(buff, RecvBuffer + headLen, header->nPkgSize); msg->header = *header; msg->buff = buff; vecMsg[header->nPkgIndex] = msg; bool allRec = true; for (int i = 1; i <= header->nPkgTotall; i++) { if (!vecMsg[i]) { allRec = false; break; } } if (allRec) { ofstream f("1.png" , ios_base::binary); for (int i = 1; i <= header->nPkgTotall; i++) { f.write((char *)vecMsg[i]->buff, vecMsg[i]->header.nPkgSize); 
delete vecMsg[i]->buff; delete vecMsg[i]; } memset(vecMsg,0,sizeof(vecMsg)); } } } return 0; } DWORD WINAPI server_main(PVOID pParam) { WSADATA Ws; SOCKET ServerSocket, ClientSocket; struct sockaddr_in LocalAddr, ClientAddr; int Ret = 0; int AddrLen = 0; HANDLE hThread = NULL; char buffIp[128]; char buffPort[20]; ifstream f("config.ini"); if (f.fail()) { GetLocalIP(buffIp); strcpy(buffPort, "9001"); } else { f.getline(buffIp, sizeof(buffIp)); f.getline(buffPort, sizeof(buffPort)); } int port = atoi(buffPort); //Init Windows Socket if (WSAStartup(MAKEWORD(2, 2), &Ws) != 0) { Log("Init Windows Socket Failed::%d" , GetLastError()); return -1; } //Create Socket ServerSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (ServerSocket == INVALID_SOCKET) { Log ("Create Socket Failed::%d" ,GetLastError()); return -1; } LocalAddr.sin_family = AF_INET; LocalAddr.sin_addr.s_addr = inet_addr(buffIp); LocalAddr.sin_port = htons(port); memset(LocalAddr.sin_zero, 0x00, 8); //Bind Socket Ret = ::bind(ServerSocket, (struct sockaddr*)&LocalAddr, sizeof(LocalAddr)); if (Ret != 0) { Log("Bind Socket Failed::%d", GetLastError()); return -1; } //listen Ret = listen(ServerSocket, 10); if (Ret != 0) { Log("listen Socket Failed::", GetLastError()); return -1; } Log("服务端启动成功!"); Log(CString("ip:") + buffIp); Log(CString("port:") + buffPort); while (true) { AddrLen = sizeof(ClientAddr); ClientSocket = accept(ServerSocket, (struct sockaddr*)&ClientAddr, &AddrLen); if (ClientSocket == INVALID_SOCKET) { Log ("Accept Failed::%d" , GetLastError()); break; } CString str; str.Format("客户端连接:: %s : %d", inet_ntoa(ClientAddr.sin_addr), ClientAddr.sin_port); Log(str); sendConnSuc(ClientSocket); hThread = CreateThread(NULL, 0, ClientThread, (LPVOID)ClientSocket, 0, NULL); if (hThread == NULL) { Log("Create Thread Failed!"); break; } CloseHandle(hThread); } closesocket(ServerSocket); closesocket(ClientSocket); WSACleanup(); return 0; } bool g_start = false; void CALLBACK startServer(HWND hWnd, UINT 
nMsg, UINT nTimerid, DWORD dwTime) { if (g_start) return; // server_main(0); auto hThread = CreateThread(NULL, 0, server_main, 0, 0, NULL); // if (hThread == NULL) // { // Log("Create Thread Failed!"); // } // else g_start = true; KillTimer(hWnd, nTimerid); } ////////////////////////////////////////发消息给客户端 bool hasError() { #ifdef WIN32 int err = WSAGetLastError(); if (err != WSAEWOULDBLOCK) { #else int err = errno; if (err != EINPROGRESS && err != EAGAIN) { #endif return true; } return false; } char m_bufOutput[_MAX_MSGSIZE]; int m_nOutbufLen; bool SendMsg(int msgid, void* pBuf, int nSize, SOCKET &m_sockClient) { if (pBuf == 0 || nSize <= 0) { return false; } if (m_sockClient == INVALID_SOCKET) { return false; } char *contentBuff = (char *)pBuf; MsgHeader header; header.nId = msgid; header.nPkgIndex = 1; header.nPkgTotall = 1;// ceil(1.0 * nSize / OUTBUFSIZE); int headLen = sizeof(header); header.nPkgSize = nSize; m_nOutbufLen = nSize + headLen; CString str; str.Format("回复消息%d !大小 %d 个字节!", header.nId, header.nPkgSize); Log(str); // 发送一段数据 int outsize; outsize = send(m_sockClient, (char*)&header, headLen, 0); if (outsize <= 0) { if (hasError()) { Log("发送失败,中断!"); return false; } } outsize = send(m_sockClient, (char*)pBuf, nSize, 0); if (outsize <= 0) { if (hasError()) { Log("发送失败,中断!"); return false; } } return true; }