转自:http://blog.csdn.net/wuxunfeng/article/details/5286540
平时对网络编程方面比较感兴趣,看了一些相关的资料,自己也尝试写过一些不同网络模型的服务程序。这次刚好有一个新的需求,需要开发一个转发服务器。之前开发的项目,网络通讯都是处理联机交易的,网络连接都是采用短连接,这次的服务端,采用长连接的方式。
1. 轮询和主动通知选择
公司有一个客户端产品(CLIENT),因为需要从多个客户的服务端获(SERVER)取信息,原有的设计是每个客户端通过SOCKET不断的轮询访问服务端获取信息。当CLIENT的数量不多时,轮询访问对服务端压力不大,但是当CLIENT的数量比较多时,这样的访问对服务器的压力就比较大,而且很低效。为了降低CLIENT对服务端的压力和提高信息的获取效率,采用让SERVER主动通知CLIENT的方式。不可能让每个客户SERVER都开发一个通知的服务端,所以需要一个和SERVER通讯并且把消息通知给CLIENT的转发服务端(简称TransmitServer).系统整体架构如下:
SERVER --------------> RECV -------> TransmitServer—————>CLIENT
SERVER把消息发给RECV, RECV通过消息队列发给TransmitServer,TransmitServer再把消息通知给相应的CLIENT。
2.短连接和长连接选择
因为是通知消息的方式,所以如果使用短连接的话,让TransmitServer去和ClENT主动建立连接是不可行的,那样就要让TransmitServer知道所有CLIENT的通讯地址,这是一种很笨的方式,所以不采用。当然如果使用短连接轮询的方式,让CLIENT去访问TransmitServer,TransmitServer访问SERVER,对SERVER的压力也能得到解决.
3. 进程管理方式和I/O复用的选择
网络连接处理的模型有很多,按照进程的管理方式,我分为2类,进程池和多进程(还包括线程),以及每种方式还可以选择是否应该使用I/O复用。
这里说的进程池和多进程是,进程池是预先启动多个子进程,并且可以管理进程;多进程是指到主进程阻塞于ACCEPT处理连接请求,由子进程 单独负责每个套接口连接。
处理短连接连接,因为客户端频繁的连接服务器和断开连接,服务端的主要性能开销应该是在进程切换上,基于性能考虑采用进程池的方式会比多进程好.如果连接并发量不大,没有性能上的问题,多进程的程序会比进程池简单很多。
如果服务端处理的是长连接。如果让进程池或者多进程中一个子进程只处理一个连接,系统的主要性能开销主要取决于进程的数量,在进程的数量到一定数量的时候,会对服务器造成比较大的压力。所以在处理长连接时,一般才用I/O多路复用的方式,LINUX上I/O多路复用,有SELECT、EPOLL等。
使用I/O多路复用一方面可以让单个进程同时多个连接,可以提高并发连接数。另一方面,还可以可以让CLIENT和SERVER的通讯更加灵活,例如使用I/O多路复用,让CLIENT和SERVER可以很容易的异步通讯。
4. 线程和进程的选择
使用I/O多路复用,让单个进程可以处理多个连接,如果进程池只有一个进程, 同一个服务进程里的连接之间可以很方便的通讯,但是如果是多个子进程,那么子进程之间就不能直接通讯,通常要要消息队列、共享内存、管道等。当服务端需要处理连接之间的交互,而且性能上需要多个进程,那么使用线程池代替进程池应该是一种比较好的方式。线程和进程相比较,线程的优点是性能开销更小,因为在同一个进程空间里,线程之间的通讯很容易;因为线程共享进程空间数据,因此线程在处理的时候比进程更容易出错,线程池的方式简化了通讯方式,但是为了线程安全,这方面的复杂性就增加了。所以如果网络连接之间并不需要很多交互,每个连接处理都是独立的,那么应该选择进程。这个原则不但对网络处理,对于别的处理也是如此。
综合上面几点服务端网络模型的选择,主要是根据业务选择使用长连接或者短连接,根据连接方式。长连接一般使用线程池+I/O复用,例如网络游戏、聊天室等。短连接一般是进程池或者多进程,因为短连接不需要连接之间的交互,每个连接都是独立的,所以使用进程更加合适。
参考书目:
W.Richard Stevens <<UNIX网络编程(第一卷)>>
W.Richard Stevens <<UNIX高级环境编程>>
DEMO示例:
#include <stdio.h> #include <math.h> #include <stdlib.h> #include <string.h> #include <errno.h> #include <memory.h> #include <sys/timeb.h> #include <time.h> #include <sys/time.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/msg.h> #include <sys/sem.h> #include <sys/un.h> #include <sys/socket.h> #include <netinet/in.h> #include <signal.h> #include <netdb.h> #define SETSIZE 3000 #define MAXLINE 1000 #define LISTENQ 2000 #define CHILDMAX 0 #define NMERIDLEN 6 #define NUSRIDLEN 40 #define NHANDHEADLEN 12 #define NTESTPACKETLEN 1 #define NPACKETHEADLEN 4 #define NMAXPACKETLEN 256 #define TICKTIME 200000 #define TCP_CLOSE -101 #define TCP_PACKET_AVILD -102 int children[ CHILDMAX ]; int ChildNum; typedef struct STRU_LINK_LIST{ char saMerId[ NMERIDLEN + 1 ]; char saUsrId[ NUSRIDLEN + 1 ]; int socket; } STRULINKLIST; STRULINKLIST client[ SETSIZE ]; typedef struct STRU_HAND_PACKET{ char saMerId[ NMERIDLEN+1 ]; char saUsrId[ NUSRIDLEN+1 ]; char saHead[ NHANDHEADLEN+1 ]; }STRUHANDPACKET; typedef struct STRU_TEST_PACKET{ char saPacket[ NTESTPACKETLEN+1 ]; }STRUTESTPACKET; typedef struct STRU_NOTIFY_PACKET{ char saMerId[ NMERIDLEN + 1 ]; char saUsrId[ NUSRIDLEN + 1 ]; }STRUNOTIFYPACKET; void sig_do( int signo ); void DeleteLink( STRULINKLIST* link, fd_set* allset ); void ProcessCmd( char* line , int len , STRULINKLIST* link, fd_set* allset, int maxi ); int pack_packet( char cmd, void* stru, char* packet ); void get_packet( char cmd, char* packet, void *stru ) ; int main(int argc, char **argv) { int i, maxi, maxfd, listenfd, connfd, sockfd; fd_set rset, allset, conset; socklen_t clilen; struct sockaddr_in cliaddr, servaddr; int nready; ssize_t n; char line[MAXLINE]; int pid; struct timeval timeval; listenfd = socket( AF_INET , SOCK_STREAM , IPPROTO_TCP ) ; bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(9999); if( bind(listenfd, (struct sockaddr * ) &servaddr, sizeof(servaddr)) < 0 ) { printf( "bind error/n" ); exit(-1); } listen(listenfd, 2000); signal( SIGINT, sig_do ); ProcessSelect( listenfd ); } Tick( ) { usleep( TICKTIME ); } void ProcessSelect( int listen_fd) { int i, maxi, maxfd, listenfd, connfd, sockfd; fd_set rset, allset,conset; socklen_t clilen; char line[MAXLINE]; char saTmpBuf[ 128 ]; int nready; ssize_t n, nLen; struct sockaddr_in cliaddr; struct timeval timeval; timeval.tv_sec = 0; timeval.tv_usec = 0; listenfd = listen_fd; maxfd = -1; maxi = -1; for (i = 0; i < SETSIZE; i++) client[i].socket = -1; FD_ZERO(&allset); FD_ZERO(&conset); FD_SET(listenfd, &conset); for ( ; ; ) { Tick( ); rset = conset; /* structure assignment */ nready = select(listenfd+1, &rset, NULL, NULL, &timeval); /*accept新的连接*/ nready = select(listenfd+1, &rset, NULL, NULL, &timeval); /*accept新的连接*/ if (FD_ISSET(listenfd, &rset)) { /* new client connection */ clilen = sizeof(cliaddr); connfd = accept(listenfd, (struct sockaddr *) &cliaddr, &clilen); if( connfd > 0 ) { ErrorLog(ERROR, "pid:%d accept new client: %s, port %d socket[%d]", getpid( ), inet_ntoa(&cliaddr.sin_addr), ntohs(cliaddr.sin_port), connfd); for (i = 0; i < SETSIZE; i++) if (client[i].socket < 0) { /* 保存新的连接*/ client[i].socket = connfd; break; } if (i == SETSIZE) { close( connfd ); ErrorLog(ERROR, "link number max"); } else { /* 添加新的连接到集合 */ FD_SET(connfd, &allset); if (connfd > maxfd) maxfd = connfd; /* for select */ if (connfd > maxfd) maxfd = connfd; /* for select */ /*记录client的最大下标*/ if (i > maxi) maxi = i; } } else ErrorLog( ERROR, "accept error " ); /* if (--nready <= 0) continue; */ } rset = allset; nready = select(maxfd+1, &rset, NULL, NULL, &timeval); for (i = 0; i <= maxi; i++) { memset( line, 0, sizeof( line )); if ( (sockfd = client[i].socket) < 0) continue; if (FD_ISSET(sockfd, &rset)) { if( (nLen = RecvPacket( sockfd, line )) <= 0 ) { DeleteLink( &client[i], &allset ); } else { /*处理客户端的请求命令*/ ProcessCmd( line, nLen, &client[i], &allset, maxi ); /*处理客户端的请求命令*/ ProcessCmd( line, nLen, &client[i], &allset, maxi ); } if (--nready <= 0) break; } } /*读取消息队列数据*/ /*发送网站数据*/ /* for( i = 0; i < 2; i++ ) { memset( line, 0, sizeof( line )); sprintf( line, "N%06dWUDY",i); nLen = strlen(line); ProcessCmd( line, nLen, NULL , &allset, maxi); } */ /*发送测试包*/ /* memset( line, 0, sizeof( line )); strcpy( line, "T"); ProcessCmd( line, 1, NULL , &allset, maxi); */ /*转发数据*/ } } int make_child( ) { int pid; pid = fork( ); if( pid < 0 ) { printf( "fork error/n" ); }else if( pid == 0 ) { /* do child*/ } return pid; } int RecvPacket(int sockfd, char* packet) { int n, nPacketLen; char saTmpBuf[ NPACKETHEADLEN+1 ]; if ( (n = read(sockfd, saTmpBuf, NPACKETHEADLEN )) == 0) { return TCP_CLOSE; } else { // process cmd for client nPacketLen = atoi( saTmpBuf ); if( (n = read( sockfd, packet, nPacketLen )) <= 0 ) { return TCP_CLOSE; } if( n < nPacketLen ) { ErrorLog( ERROR,"报文长度不符" ); return TCP_PACKET_AVILD; } } return n; } void DeleteLink( STRULINKLIST* link, fd_set* allset ) { close(link->socket); FD_CLR(link->socket, allset); link->socket = -1; } void HandLink( STRULINKLIST* link, STRUHANDPACKET handPacket) { memset( link->saMerId, 0, sizeof( link->saMerId )); memset( link->saUsrId, 0, sizeof( link->saUsrId )); memcpy( link->saMerId, handPacket.saMerId, NMERIDLEN ); memcpy( link->saUsrId, handPacket.saUsrId, NMERIDLEN ); } void ProcessCmd( char* line , int len , STRULINKLIST* link, fd_set* pset, int maxi ) { char saTmpCmd[ 2 ]; char saPacket[ MAXLINE ]; int i; STRUHANDPACKET StruHandPacket; STRUTESTPACKET StruTestPacket; STRUNOTIFYPACKET StruNotifyPacket; memset( saTmpCmd, 0, sizeof( saTmpCmd )); memset( saPacket, 0, sizeof( saPacket )); memcpy( saTmpCmd, line, 1 ); memcpy( saPacket, line+1, len-1 ); switch( saTmpCmd[0] ) { case 'H': get_packet( saTmpCmd[0], saPacket, &StruHandPacket); HandLink( link, StruHandPacket ); break; case 'C': break; case 'T': get_packet(saTmpCmd[0], saPacket,&StruTestPacket ); memset( saPacket, 0, sizeof( saPacket )); len = pack_packet( saTmpCmd[0], &StruTestPacket, saPacket ); for( i = 0; i <= maxi; i++ ) { { int sockfd; int nLen; sockfd = client[i].socket; if( write( sockfd, saPacket, len ) < 0) { /*发送失败,关闭连接*/ DeleteLink( &client[i] , pset); continue; } } break; case 'N': get_packet(saTmpCmd[0], saPacket,&StruNotifyPacket ); memset( saPacket, 0, sizeof( saPacket )); len = pack_packet( saTmpCmd[0], &StruNotifyPacket, saPacket ); for( i = 0; i <= maxi; i++ ) { if( !strcmp( client[i].saMerId, StruNotifyPacket.saMerId ) && !strcmp( client[i].saUsrId, StruNotifyPacket.saUsrId) && client[i].socket != -1 ) { if( write( client[i].socket, saPacket, len ) < 0) { /*发送失败,关闭连接*/ /*发送失败,关闭连接*/ DeleteLink( &client[i], pset ); continue; } } } break; default: break; } } int pack_packet( char cmd, void* stru, char* packet ) { STRUNOTIFYPACKET StruNotifyPacket; STRUTESTPACKET StruTestPacket; int nLen; switch( cmd ) { case 'H': break; case 'C': break; case 'T': memset( &StruTestPacket, 0, sizeof( STRUTESTPACKET )); memcpy( &StruTestPacket,(STRUTESTPACKET*)stru , sizeof( STRUTESTPACKET ) ); nLen = 1 + strlen(StruTestPacket.saPacket ); sprintf( packet, "%04d%c%s%s", nLen,cmd, StruTestPacket.saPacket); break; case 'N': memset( &StruNotifyPacket, 0, sizeof( STRUNOTIFYPACKET )); memcpy( &StruNotifyPacket, (STRUNOTIFYPACKET*)stru, sizeof(STRUNOTIFYPACKET)); nLen = strlen("N") + strlen( StruNotifyPacket.saMerId ) + strlen( StruNotifyPacket.saUsrId ); sprintf( packet, "%04d%c%s%s", nLen,cmd, StruNotifyPacket.saMerId, StruNotifyPacket.saUsrId ); break; default: break; } return nLen+NPACKETHEADLEN; } void get_packet( char cmd, char* packet, void *stru ) { char saPacket[ NMAXPACKETLEN+1 ]; int nLen; nLen = strlen( packet ); switch( cmd ) { case 'H': memset( (STRUHANDPACKET*)stru, 0, sizeof( STRUHANDPACKET )); memcpy( ((STRUHANDPACKET*)stru)->saHead, packet, NHANDHEADLEN ); memcpy( ((STRUHANDPACKET*)stru)->saMerId, packet+NHANDHEADLEN, NMERIDLEN ); memcpy( ((STRUHANDPACKET*)stru)->saUsrId, packet+NHANDHEADLEN+NMERIDLEN, nLen-NHANDHEADLEN-NMERIDLEN); break; case 'C': break; case 'T': memset( (STRUTESTPACKET*)stru, 0, sizeof( STRUTESTPACKET )); memcpy( ((STRUTESTPACKET*)stru)->saPacket, packet, NTESTPACKETLEN ); break; case 'N': memset( (STRUNOTIFYPACKET*)stru, 0, sizeof( STRUNOTIFYPACKET )); memcpy( ((STRUNOTIFYPACKET*)stru)->saMerId, packet, NMERIDLEN ); memcpy( ((STRUNOTIFYPACKET*)stru)->saUsrId, packet+NMERIDLEN,nLen-NMERIDLEN ); break; default: break; } } void sig_do( int signo ) { int i; for( i = 0 ; i < ChildNum; i++ ) kill(children[i] , SIGTERM); while( wait(NULL) > 0 ) ; if( errno != ECHILD ) printf( "wait error/n" ); exit(0); }