zoukankan      html  css  js  c++  java
  • [原]浅谈几种服务器端模型——反应堆模式(基于epoll的反应堆)

    引言:前面一章简单介绍了关于epoll 的使用方式,这一章介绍一下一个简单的反应堆模型,没有实现超时机制的管理。最主要的是要介绍一下关于异步事件反应堆的设计方式。

    反应堆的模型图在上一张可以看到,但是那个是盗来的一张图,twisted 的反应堆。今天给不熟悉这个部分的朋友介绍一下基于 epoll 的反应堆,过程类似于libevent.

    反应堆可以提供几个操作:

    (0)创建一个反应堆:

    mc_event_base_t * mc_base_new(void) ;
    

    返回一个操作句柄.  

    (1)为某一个需要监听的文件描述符加入回调函数,并注册事件类型。

    int mc_event_set( mc_event_t *ev , short revent , int fd , mc_ev_callback callback , void *args )  ;
    	/*
    	 * Initialize a event , add callback and event type
    	 * if the event exists , this function will change the mode of this event
    	 * and fd 
    	 */
    

     这里的 revent 由宏定义为几种类型:

      

    #define MC_EV_READ     0x0001
    #define MC_EV_WRITE    0x0002
    #define MC_EV_SIGNAL   0x0004
    #define MC_EV_TIMEOUT  0x0008
    #define MC_EV_LISTEN   0x0010
    

    相应的操作可以使用 | 运算来并几个需要监听的事件类型。

    事件类型定义如下:

    typedef struct mc_event_s
    {
    	
    	  struct mc_event_s   *next	   ;
    	 
    	 
    	  struct mc_event_s   *prev    ;
    	
    	 unsigned int min_heap_index  ;
    	 
    	 int ev_fd		;   // file des of event
    	 short revent	;   // event type
    	 
    	 struct timeval  ev_timeval   ; // event timeout time 
    	 mc_ev_callback callback ;// callback of this event 
    	 void  *args 			      ;
    	 int ev_flags 			      ;
    	 
    	 mc_event_base_t	*base	  ;
    }mc_event_t ;
    

    事件结构本身后面解释。 

    (2)将需要监听的并且已经初始化的事件加入反应堆。

    int mc_event_post( mc_event_t *ev , mc_event_base_t * base ) ;
    	/*
    	 * Post this event to event_base 
    	 * struct base has two queue , active queue and added queue
    	 * this function will post event to added queue , but not in active queue
    	 */
    

    将刚才注册了事件类型和回调函数的事件加入 base, 即将其看做一个反应堆。

    (3)最后提供了一个 dispatch 函数,反应堆开始循环,等待事件的发生。如果对应的 fd 上的事件发生,调用相应的回调函数。由第一步注册。

    int mc_dispatch( mc_event_base_t * base ) ;
        /*
         * start loop 
         * and dispatch event by 
         * mc_event_loop
         */
    

    反应堆支持在循环过程中,通过相应的回调函数再注册事件,类似于热加入,热移除。

    实现方式很简单,就是在第一个事件的回调函数上调用 mc_event_set()然后注册。再加入 base.

    base 的结构如下 :

    typedef struct mc_event_base_s
    {
    	void		 * 	added_list		;
    	void	     *	active_list	    ;
    	unsigned int 	event_num		;
    	unsigned int 	event_active_num;
    	
    	/*
    	 *mc_minheap	    minheap			;
    	 */
    	int				epoll_fd	    ;  //for epoll only 
    	int			    ev_base_stop	;
    	int				magic		    ;
    	struct timeval	event_time		;	
    }mc_event_base_t ;
    

    让我们来看一个简单的 demo

    /*_____________________test bellow ______________________*/
    #define mc_sock_fd	int
    
    
    #define DEFAULT_NET	AF_INET
    #define DEFAULT_DATA_GRAM 	SOCK_STREAM
    #define DEFAULT_PORT  		(1115)
    #define	DEFAULT_BACKLOG		(200)
    
    /* simple connection */
    struct _connection
    {
    	int fd 			  ;
    	mc_event_t  read  ;
    	mc_event_t  write ;
    	char buf[1024]    ;
    	mc_event_base_t * base ;
    };
    void setreuseaddr( mc_sock_fd fd )
    {
    	int yes = 1 ;
    	setsockopt( fd , SOL_SOCKET , SO_REUSEADDR , &yes , sizeof(int) );
    }
    int mc_socket()
    {
    	int retsock = socket(DEFAULT_NET,DEFAULT_DATA_GRAM,0) ;
    	if( retsock < 0  )
    	{
    		/* we should add some debug information here
    		fprintf(LOGPATH,"socket error\n");
    		*/
    		return -1 ;
    	}
    	return retsock ;
    }		
    
    int mc_bind(mc_sock_fd listenfd )
    {
    	struct sockaddr_in serveraddr ;
    	bzero(&serveraddr,sizeof(serveraddr));
    
    	serveraddr.sin_family = AF_INET ;
    	serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
    	serveraddr.sin_port = htons(DEFAULT_PORT);
    	return bind(listenfd,(struct sockaddr *)&serveraddr , sizeof(serveraddr ));
    }
    
    int mc_isten(mc_sock_fd listenfd)
    {
    	return listen(listenfd,DEFAULT_BACKLOG);
    }
    
    
    
    void handler_accept( int fd , short revent , void *args )
    {
    	struct sockaddr_in in_addr ;
    	size_t in_len ;
    	int s	;
    	int done = 0 ;
    	struct _connection * lc = (struct _connection *)args ;
    	
    	in_len = sizeof( in_addr );
    	mc_setnonblocking(fd) ;
    	while( !done )
    	{
    		s = accept( fd , (struct sockaddr *)&in_addr , &in_len );
    		if( s == -1 )
    		{
    			if( (errno == EAGAIN )|| (errno == EWOULDBLOCK ) )
    			{
    				break;
    			}
    			else
    			{
    				perror("accept");
    				break;
    			}
    		}
    		if( s == 0 )
    		{
    			fprintf(stderr,"Accept a connection on %d \n",fd );
    		}
    		done = 1 ;
    	}
    		mc_setnonblocking(s) ;
    		lc->fd = s ;
    		mc_event_set( &(lc->read) , MC_EV_READ , lc->fd , handler_read , lc );
    		
    		
    		mc_event_set( &(lc->write) , MC_EV_WRITE , lc->fd , handler_write , lc );
    		mc_event_post( &(lc->write) , lc->base );
    		 
    		
    }
    void handler_read( int fd , short revent , void *args )
    {
    	mc_setnonblocking(fd) ;
    	struct _connection * lc ;
    	lc  = (struct _connection *)args ;
    	read( fd , lc->buf , 1024 );
    	mc_event_set( &(lc->write) , MC_EV_WRITE , lc->fd , handler_write , lc );
    }
    
    void handler_write( int fd , short revent , void *args )
    {
    	mc_setnonblocking(fd) ;
    	struct _connection * lc ;
    	lc  = (struct _connection *)args ;
    	write( fd , lc->buf , 1024 );
    	mc_event_set( &(lc->read) , MC_EV_READ , lc->fd , handler_read , lc );
    }
    
    void cab( int fd , short revent , void *args )
    {
    	mc_setnonblocking(fd) ;
    	char buf[1024] = "xx00xx00xx00xx00\n";
    	write(fd,buf,1024);
    }
    int main()
    {
    	mc_event_t mev ;
    	mc_event_base_t  *base = mc_base_new() ;
    	struct _connection lc ;
    	lc.base = base ;
    	
    	int sockfd = mc_socket() ;
    	mc_bind(sockfd);
    	mc_isten(sockfd);
    	
    	mc_event_set( &(lc.read) , MC_EV_READ , sockfd , handler_accept , &lc );
    	mc_event_post( &(lc.read) , base );
    	mc_dispatch(base);
    	return 0;
    }
    

      

    首先:封装的几个套接口操作没有考虑错误处理,作为简单的实例。

    定义了一个 connection 结构,用于表示每一个到来的连接,这里的 struct _connection 中包含读写事件和一个缓冲区,还有指向反应堆的指针和对应注册的fd

    工作过程如下:(集中看  main函数)

    (1)创建一个反应堆。

    (2)实例化一个 connection

    (3)创建套接口,bind,listen 老生常谈,这里就不多说了

    (4)将这个监听套接口注册相应的回调函数,这里我们注册的是 handler_accept() 函数,回调函数类型都是  void *XXX(  int  , short , void *) ;

           当监听套接口发生可读事件时,第一次我们认为是相应的监听套接口得到了新的连接,所以,第一次调用的时候直接调用注册了的回调函数 handler_accept().

    在handler_accept() 函数中,我们为这个连接的读写事件添加了相应的回调函数,并把连接描述符(不是监听描述符)注册到这个上。下次这个套接口可读的时候调用handler_read(),可写的时候调用handler_write(). 如果需要改变状态或改变回调函数,只需要一个状态机或者别的方式来确定需要的回调函数是哪一个,在我们的handler_write() 和 handler_read()中可以改变回调函数,代码所示。

    PS:注意一点的是我们的事件是一个实例,不管是在connection结构中或是自己定义,都需要不断的向操作系统申请空间,如果采用对象池或者connection池的方式,可以减少服务器的负载。

    总结:反应堆模式最基本的操作就是:注册事件(为需要监听的fd加入回调函数)----->将事件加入反应堆------>开始事件循环------>事件发生,调用回调函数。

    异步操作的精髓就是在这里,而不是同步的等待每一个事件。下一章讲解这个反应堆的实现,越来越带感咯.

    文章属原创,转载请注明出处 联系作者: Email:zhangbolinux@sina.com QQ:513364476
  • 相关阅读:
    【Spark学习】Apache Spark部署之Amazon EC2
    【Spark学习】Apache Spark部署之Standalone Mode
    【Spark学习】Apache Spark部署之Spark on YARN
    【Spark学习】使用Maven创建Spark
    【Spark学习】Apache Spark for 第三方Hadoop分发版
    【Spark学习】Apache Spark配置
    【Spark学习】Apache Spark监控与测量
    【Spark学习】Apache Spark集群硬件配置要求
    【Spark学习】Apache Spark作业调度机制
    Navicat For Mysql快捷键
  • 原文地址:https://www.cnblogs.com/Bozh/p/2469990.html
Copyright © 2011-2022 走看看