-
功能:目前只支持对资源的访问.
-
使用的模型:多线程加epoll,与传统的一个连接请求一个线程处理不同的是,这个模型只为那些需要服务的连接请求调用线程进行处理,
-
整个模型的大致流程
- 创建一个线程持对象,将每一个线程池设为脱离线程,这样,在线程结束后,可以自动回收资源,每一个调用线程都在等一个信号,这个线程池有一个工作队列,每往里面加一个连接请求,就让信号量 加一,得到这个信号量的线程就开始处理连接请求。
- 创建一个epoll句柄,先将server socket fd加入到epoll中,进行监听,当server socket fd处于可读状态时,表明有一个连接请求过来(或者是已经连接,然后发了一个请求报文),就调用accept,获取连接的client的fd,将它也加入epoll中,然后初始化一个连接对象。
- 监听epoll中的每一个文件描述符。如果client fd可读(处于EPOLLIN状态),表明客户端发送了一个请求报文,就将这个请求加入到线程池对象的工作队列中,等待处理。
- 如果client fd 可写(处于EPOLLOUT状态),就继续发送数据,(一般处于这个状态,是应为发送的数据大于内核中的写缓冲区,需要分多次发送,所以会有EPOLLOUT这个状态)。
+如果client fd处于异常状态,就关闭连接。
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <cassert>
#include <sys/epoll.h>
#include <semaphore.h>
#include "locker.h"
#include "threadpool.h"
#include "http_conn.h"
#define MAX_FD 65536
#define MAX_EVENT_NUMBER 10000
extern int addfd(int epollfd,int fd,bool one_shot);
extern int removefd(int epollfd,int fd);
void addsig( int sig, void( handler )(int), bool restart = true )
{
struct sigaction sa;
memset( &sa, ' ', sizeof( sa ) );
sa.sa_handler = handler;
if( restart )
{
sa.sa_flags |= SA_RESTART;
}
sigfillset( &sa.sa_mask );
assert( sigaction( sig, &sa, NULL ) != -1 );
}
void show_error( int connfd, const char* info )
{
printf( "%s", info );
send( connfd, info, strlen( info ), 0 ); //向socket中写入出错信息
close( connfd );
}
int main( int argc, char* argv[] )
{
if( argc <= 2 )
{
printf( "usage: %s ip_address port_number
", basename( argv[0] ) );
return 1;
}
const char* ip = argv[1];
int port = atoi( argv[2] );
addsig( SIGPIPE, SIG_IGN ); //忽略SIGPIPE信号
threadpool< http_conn >* pool = NULL;
try
{
pool = new threadpool< http_conn >;//创建一个线程池对象
}
catch( ... )
{
return 1;
}
http_conn* users = new http_conn[ MAX_FD ]; //定义用户的连接数量
assert( users );
int user_count = 0;
int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
assert( listenfd >= 0 );
struct linger tmp = { 1, 0 };//close()立刻返回,但不会发送未发送完成的数据,而是通过一个REST包强制的关闭socket描述符,即强制退出。
setsockopt( listenfd, SOL_SOCKET, SO_LINGER, &tmp, sizeof( tmp ) );
int ret = 0;
struct sockaddr_in address;
bzero( &address, sizeof( address ) );
address.sin_family = AF_INET;
inet_pton( AF_INET, ip, &address.sin_addr );
address.sin_port = htons( port );
ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
assert( ret >= 0 );
ret = listen( listenfd, 5 );
assert( ret >= 0 );
epoll_event events[ MAX_EVENT_NUMBER ];
int epollfd = epoll_create( 5 );//参数只是起提示作用
assert( epollfd != -1 );
addfd( epollfd, listenfd, false );//把服务器端fd添加到epollfd中
http_conn::m_epollfd = epollfd;//把epoll文件句柄赋值给http_conn类中的静态变量m_epollfd
while( true )
{
int number = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );//等待服务器端fd是否就绪
if ( ( number < 0 ) && ( errno != EINTR ) )
{
printf( "epoll failure
" );
break;
}
for ( int i = 0; i < number; i++ )
{
int sockfd = events[i].data.fd;
if( sockfd == listenfd )//如果服务器端fd就绪表明有人链接
{
struct sockaddr_in client_address;
socklen_t client_addrlength = sizeof( client_address );
int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );//获取客户端fd
if ( connfd < 0 )
{
printf( "errno is: %d
", errno );
continue;
}
if( http_conn::m_user_count >= MAX_FD )//如果连接数以满,就拒绝连接
{
show_error( connfd, "Internal server busy" );
continue;
}
users[connfd].init( connfd, client_address );//初始化一个连接。会将连接用户的fd添加到epoll_fd中
}
else if( events[i].events & ( EPOLLRDHUP | EPOLLHUP | EPOLLERR ) )//如果监听的fd中有人断开连接,就关闭这个链接
{
users[sockfd].close_conn();
}
else if( events[i].events & EPOLLIN ) //如果监听的fd可以读
{
if( users[sockfd].read() )
{
pool->append( users + sockfd );
}
else
{
users[sockfd].close_conn(); //如果读出错,就关闭连接
}
}
else if( events[i].events & EPOLLOUT ) //如果监听的fd可以写
{
if( !users[sockfd].write() )
{
users[sockfd].close_conn();
}
}
else
{}
}
}
close( epollfd );
close( listenfd );
delete [] users;
delete pool;
return 0;
}