1. 基础socket库
socket.h:
/** * 网络套接字库 */ #ifndef Socket_h #define Socket_h #include <stdio.h> #include <string> #ifdef WIN32 // windows #include <winsock.h> typedef int socklen_t; #else // linux, MacOS #include <sys/socket.h> #include <netinet/in.h> #include <netdb.h> #include <fcntl.h> #include <unistd.h> #include <sys/stat.h> #include <sys/types.h> #include <arpa/inet.h> #include <errno.h> #define INVALID_SOCKET -1 #define SOCKET_ERROR -1 typedef int SOCKET; #endif #define SOCKET_BACKLOG 100 namespace avalon { int socket_error(); int socket_init(); int socket_clean(); void socket_debug(const char* message, ...); class Socket { public: static Socket* create(SOCKET socket_fd); static Socket* create(int family, int type, int protocal = IPPROTO_IP); public: Socket(SOCKET socket_fd); Socket(int family, int type, int protocal = IPPROTO_IP); Socket& operator = (SOCKET socket_fd); virtual ~Socket(); bool connect(const char* host, unsigned short port); bool bind(unsigned short port); bool listen(int backlog = SOCKET_BACKLOG); Socket* accept(char* client_host = nullptr); ssize_t send(const char* buffer, size_t len, int flag = 0); ssize_t recv(char* buffer, size_t len, int flag = 0); int close(); SOCKET getSocketFD(); void set_blocking(const bool blocking); private: SOCKET _socket_fd; int _family; int _type; int _protocal; }; } #endif
socket.cpp
/** * 网络套接字库 */ #include "AvalonSocket.h" #ifdef WIN32 #pragma comment(lib, "wsock32") #endif #define SOCKET_DEBUG_LEVEL 0 namespace avalon { int socket_error() { int error = 0; #ifdef WIN32 error = WSAGetLastError(); #else error = errno; #endif printf("Avalon socket error: %d %s ", error, strerror(error)); return error; } void socket_debug(const char* message, ...) { char buf[1024] = ""; va_list args; va_start(args, message); vsnprintf(buf, 1024, message, args); va_end(args); std::string error = "Avalon sokcet: "; error.append(buf); error.append(" "); printf(error.c_str()); if (SOCKET_DEBUG_LEVEL) { int error_no = socket_error(); if (error_no != -1) { throw error_no; } } } int socket_init() { #ifdef WIN32 WSADATA wsadata; WORD version = MAKEWORD(2, 0); int ret = WSAStartup(version,&wsadata); if (ret) { socket_debug("Initilize winsock error"); return -1; } #endif return 0; } int socket_clean() { #ifdef WIN32 return WSACleanup(); #endif return 0; } Socket* Socket::create(SOCKET socket_fd) { if (socket_fd < 0) { socket_debug("socket_fd(%d) is invailed.", socket_fd); return nullptr; } else { Socket* socket = new Socket(socket_fd); if (socket) { return socket; } else { socket_debug("Create avalon socket failed."); return nullptr; } } } Socket* Socket::create(int family, int type, int protocal) { Socket* socket = new Socket(family, type, protocal); if (socket) { if (socket->getSocketFD() == INVALID_SOCKET) { delete socket; socket_debug("Create socket failed."); return nullptr; } socket_debug("Create socket(%d) successfully.", socket->getSocketFD()); return socket; } else { socket_debug("Create avalon socket failed."); return nullptr; } } Socket::Socket(SOCKET socket_fd) : _family(AF_INET) , _type(SOCK_STREAM) , _protocal(IPPROTO_IP) { _socket_fd = socket_fd; } Socket::Socket(int family, int type, int protocal) : _family(AF_INET) , _type(SOCK_STREAM) , _protocal(IPPROTO_IP) { _socket_fd = socket(family, type, protocal); if (_socket_fd != INVALID_SOCKET) { _family = family; _type = type; _protocal = protocal; } } Socket& Socket::operator = (SOCKET socket_fd) { _socket_fd = socket_fd; return *this; } Socket::~Socket() { if (_socket_fd != -1) { this->close(); } } bool Socket::connect(const char* host, unsigned short port) { struct sockaddr_in remote_addr; remote_addr.sin_family = _family; remote_addr.sin_port = htons(port); inet_pton(_family, host, &remote_addr.sin_addr); if (errno == EAFNOSUPPORT) return false; int ret = ::connect(_socket_fd, (struct sockaddr*)(&remote_addr), sizeof(remote_addr)); if (ret == SOCKET_ERROR) { socket_debug("Connect %s:%d failed.", host, port); socket_error(); return false; } socket_debug("Connect %s:%d successfully.", host, port); return true; } bool Socket::bind(unsigned short port) { int opt = 1; if (setsockopt(_socket_fd, SOL_SOCKET, SO_REUSEADDR, (char*)&opt, sizeof(opt)) < 0) return false; struct sockaddr_in remote_addr; remote_addr.sin_family = _family; remote_addr.sin_addr.s_addr = INADDR_ANY; remote_addr.sin_port = htons(port); int ret = ::bind(_socket_fd, (struct sockaddr*)(&remote_addr), sizeof(remote_addr)); if (ret == SOCKET_ERROR) { socket_debug("Socket(%d) bind port(%d) failed.", _socket_fd, port); return false; } socket_debug("Socket(%d) bind port(%d) successfully.", _socket_fd, port); return true; } bool Socket::listen(int backlog) { int ret = ::listen(_socket_fd, backlog); if (ret == SOCKET_ERROR) { socket_debug("Socket(%d) listen failed.", _socket_fd); return false; } socket_debug("Socket(%d) Listen successfully.", _socket_fd); return true; } Socket* Socket::accept(char* client_host) { struct sockaddr_in com_socket; socklen_t len = sizeof(com_socket); SOCKET ret = -1; do { ret = ::accept(_socket_fd, (struct sockaddr*)(&com_socket), &len); if (ret == SOCKET_ERROR) { if (errno == EINTR) continue; else { socket_debug("Socket(%d) accept failed.", _socket_fd); socket_error(); return nullptr; } } else break; } while (true); avalon::Socket* socket = avalon::Socket::create(ret); if (client_host) { sprintf(client_host, "%s", inet_ntoa(com_socket.sin_addr)); } socket_debug("Socket(%d) accept successfully, client socket: %d ip: %s", _socket_fd, socket->getSocketFD(), inet_ntoa(com_socket.sin_addr)); return socket; } ssize_t Socket::send(const char* buffer, size_t len, int flag) { ssize_t count = 0; while (count < len) { ssize_t bytes = ::send(_socket_fd, buffer + count, len - count, flag); count += bytes; if (bytes == -1 || bytes == 0) { socket_error(); break; } } return count; } ssize_t Socket::recv(char* buffer, size_t len, int flag) { return ::recv(_socket_fd, buffer, len, flag); } ssize_t Socket::write(const char* buffer, size_t len) { ssize_t count = 0; while (count < len) { ssize_t bytes = ::write(_socket_fd, buffer + count, len - count); count += bytes; if (bytes == -1 || bytes == 0) { socket_error(); break; } } return count; } ssize_t Socket::read(char* buffer, size_t len) { return ::read(_socket_fd, buffer, len); } void Socket::set_blocking(const bool blocking) { int opts; opts = fcntl(_socket_fd, F_GETFL); if (opts < 0) return; if (!blocking) opts = (opts | O_NONBLOCK); else opts = (opts & ~O_NONBLOCK); fcntl(_socket_fd, F_SETFL, opts); } int Socket::close() { int ret = -1; #ifdef WIN32 ret = closesocket(_socket_fd); #else ret = ::close(_socket_fd); #endif if (ret == SOCKET_ERROR) { socket_debug("Socket(%d) close failed.", _socket_fd); } _socket_fd = -1; return ret; } SOCKET Socket::getSocketFD() { return _socket_fd; } }
2. 多线程的模型:
在accept成功之后,为每个通信socket创建新的进程和线程,单独用于处理服务器和客户端的通信。但是系统都会有创建进程数量的限制,在linux下,创建的线程也叫轻量级进程,所以即时创建的是线程也会受到系统的限制,通常这个默认限制是2048个,而且进程或者线程数量过多,也会导致进程或者线程切换的开销:
客户端:
avalon::Socket* socket = avalon::Socket::create(AF_INET, SOCK_STREAM); if (socket) { if (!socket->connect("127.0.0.1", 6666)) continue; char buf[1024] = ""; sprintf(buf, "%d I am a client socket!", i); ssize_t bytes = socket->send(buf, strlen(buf), 0); char recvBuf[1024]; while (true) { memset(recvBuf, 0, 1024); bytes = socket->recv(recvBuf, 1024); if (bytes > 0) { printf("%d recv data from remote: %d %s ", i, bytes, recvBuf); } else if (bytes == 0) { printf("remote socket %d cloese. ", socket->getSocketFD()); break; } else { int error = avalon::socket_error(); printf("%d socket error: %d %s ", i, error, strerror(error)); break; } } }
服务端:
void communiction_handler(avalon::Socket* socket) { char buffer[1024]; while (true) { if (!socket) continue; printf("thread %ld ", std::this_thread::get_id()); ssize_t bytes = socket->recv(buffer, 1024, 0); if (bytes > 0) { buffer[bytes] = '