zoukankan      html  css  js  c++  java
  • 基于多进程的网络聊天程序

    參考:linux高性能server编程。作者:游双

    程序简单介绍:该程序用了共享内存来实现进程间的同步,因为仅仅是同一时候读取共享内存。所以没实用到锁。该程序的功能是server监听网络连接,当有一个client连接时,server创建一个子进程处理该连接。

    每一个子进程仅仅负责自己的client以及和父进程通信。当子进程从client读取数据后,把数据放到共享内存上,每一个子进程在共享内存上有自己的一段空间。因此不会出现同一时候写。

    放上去后通知父进程,说:共享内存上有新数据到达了,然后父进程通知其它子进程,去到该位置读取数据,把数据发送到自己的client。实现了群聊的效果。该程序对于多进程编程的刚開始学习的人是个不错的样例,写下来是为了让自己熟悉一下。


    server代码:编译的时候须要加上 -lrt选项

    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <assert.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <errno.h>
    #include <string.h>
    #include <fcntl.h>
    #include <stdlib.h>
    #include <sys/epoll.h>
    #include <signal.h>
    #include <sys/wait.h>
    #include <sys/mman.h>
    #include <sys/stat.h>
    #include <fcntl.h>
    
    #define USER_LIMIT 5
    #define BUFFER_SIZE 1024
    #define FD_LIMIT 65535
    #define MAX_EVENT_NUMBER 1024
    #define PROCESS_LIMIT 65536
    
    /* 处理一个客户端连接的必要数据 */
    struct client_data			
    {
    	sockaddr_in address;
    	int connfd;				/* 客户端的fd  */
    	pid_t pid;				/* 处理这个连接的子进程的pid */
    	int pipefd[2];			/* 和父进程通信用的管道 */
    };
    
    int sig_pipefd[2];//当有信号发生时。用于父进程自己的通信
    char* share_mem;
    int user_count = 0; //当前客户的数量
    client_data* users = 0 ; 
    int* sub_process = 0;
    static const char* shm_name = "/my_share_memory";
    int maxevents = 100;
    bool stop_child = false;
    
    void setnonblock(int fd)
    {
    	int flag = fcntl(fd,F_GETFL);
    	assert(flag != -1);
    	fcntl(fd,F_SETFL,flag | O_NONBLOCK);
    }
    void addfd(int epollfd,int fd)
    {
    	epoll_event ee;
    	ee.data.fd = fd;
    	ee.events = EPOLLIN | EPOLLET;
    	epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ee);
    	setnonblock(fd);
    }
    void sig_handler(int sig)
    {
    	int save_errno = errno;
    	int msg = sig;
    	send(sig_pipefd[1],(char*)&msg,1,0);
    	errno = save_errno;//恢复错误值
    }
    void child_sig_handler(int sig)
    {
    	stop_child = true;
    }
    void addsig(int sig,void (*handler)(int),bool restart = true)
    {
    	struct sigaction sa;
    	memset(&sa,'',sizeof(sa));
    	sa.sa_handler = handler;
    	if(restart)sa.sa_flags |= SA_RESTART;
    	sigfillset(&sa.sa_mask);//作用?
    	assert(sigaction(sig,&sa,NULL) != -1);
    }
    /* 子进程的处理函数。idx表示该子进程处理的客户端连接的编号,users表示全部客户端连接数据的数组,share_mem表示共享内存的起始地址 */
    int run_child(int idx,client_data* users,char* share_mem)
    {
    	int connfd = users[idx].connfd;
    	int pipefd = users[idx].pipefd[1];
    	int epollfd = epoll_create(100);//子进程的事件处理函数
    	assert(epollfd != -1);
    	addfd(epollfd,connfd);//与客户端通信
    	addfd(epollfd,pipefd);//与父进程通信
    	addsig(SIGTERM,child_sig_handler,false);
    	epoll_event events[maxevents];
    
    	int ret;
    	while(!stop_child)
    	{
    		int number = epoll_wait(epollfd,events,maxevents,-1);
    		if(number < 0 && errno != EINTR)
    		{
    			printf("epoll error
    ");
    			break;
    		}
    		int i;
    		for(i = 0;i < number;i++)
    		{
    			int sockfd = events[i].data.fd;
    			if(sockfd == connfd && (events[i].events & EPOLLIN))//客户端发来数据
    			{
    				memset(share_mem+idx*BUFFER_SIZE,'',BUFFER_SIZE);
    				/* 将客户端数据读取到相应的读缓存中。该读缓存是共享内存的一段 */
    				ret = recv(sockfd,share_mem+idx*BUFFER_SIZE,BUFFER_SIZE-1,0);
    				if(ret < 0 && errno != EAGAIN)
    				{
    					printf("recv error
    ");
    					stop_child =  true;
    				}
    				else if(ret == 0)
    				{
    					printf("client close
    ");
    					stop_child = true;
    				}
    				else 
    				{
    					send(pipefd,(char*)&idx,sizeof(idx),0);//告诉父进程,“我”收到数据了
    				}
    			}
    			/* 父进程通知"我"将第client个客户端的数据发送到我负责的客户端 */
    			else if(sockfd == pipefd && (events[i].events & EPOLLIN))
    			{
    				int client = 0;
    				ret = recv(sockfd,(char*)&client,sizeof(client),0);
    				if(ret < 0 && errno != EAGAIN)stop_child = true;
    				else if(ret == 0) stop_child = true;
    				else
    				{
    					send(connfd,share_mem+client*BUFFER_SIZE,BUFFER_SIZE,0);
    				}
    			}
    		}
    	}
    	close(connfd);
    	close(pipefd);
    	close(epollfd);
    	return 0;
    }
    int main(int argc,char* argv[])
    {
    	if(argc != 3)
    	{
    		printf("usage %s server_ip server_port 
    ",basename(argv[0]));
    		return -1;
    	}
    	sockaddr_in server;
    	server.sin_family = AF_INET;
    	inet_pton(AF_INET,argv[1],&server.sin_addr);
    	server.sin_port = htons(atoi(argv[2]));
    	int listenfd = socket(AF_INET,SOCK_STREAM,0);
    	assert(listenfd != -1);
    	int opt = 1;
    	int ret = setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
    	assert(ret == 0);
    	ret = bind(listenfd,(const sockaddr*)&server,sizeof(server));
    	assert(ret != -1);
    	ret = listen(listenfd,100);
    	assert(ret != -1);
    
    	/* 初始化连接池 */
    	user_count = 0;
    	users = new client_data[USER_LIMIT+1];
    	sub_process = new int[PROCESS_LIMIT];
    	int i;
    	for(i = 0; i < PROCESS_LIMIT;++i)
    	{
    		sub_process[i] = -1;
    	}
    
    	/* epoll的初始化 */
    	int epollfd = epoll_create(100);
    	assert(epollfd != -1);
    	addfd(epollfd,listenfd);//监听网络连接port
    
    	ret = socketpair(AF_UNIX,SOCK_STREAM,0,sig_pipefd);//当有信号发生时,用于父进程自己的通信
    	assert(ret != -1);
    	setnonblock(sig_pipefd[1]);//UNIX域套接字的0号port用于信号处理函数
    	addfd(epollfd,sig_pipefd[0]);//主进程监听UNIX域套接字的1号port
    
    	/*  设置信号处理函数 */ 
    	addsig(SIGCHLD,sig_handler);
    	addsig(SIGPIPE,SIG_IGN);
    	addsig(SIGINT,sig_handler);
    	addsig(SIGTERM,sig_handler);
    
    	/* 创建共享内存,用于全部客户socket连接的读缓存 */
    	int shmfd = shm_open(shm_name,O_CREAT|O_RDWR,0666);
    	assert(shmfd != -1);
    	ret = ftruncate(shmfd,USER_LIMIT*BUFFER_SIZE);//设置shmfd的大小
    	assert(ret != -1);
    	share_mem = (char*)mmap(NULL,USER_LIMIT*BUFFER_SIZE,PROT_READ|PROT_WRITE,MAP_SHARED,shmfd,0);
    	assert(share_mem != MAP_FAILED);
    	close(shmfd);
    
    	/* 进入epoll事件循环 */
    	bool stop_server = false;
    	bool terminate  = false;
    	epoll_event events[maxevents];
    	while(!stop_server)
    	{
    		int number = epoll_wait(epollfd,events,maxevents,-1);
    		if(number < 0 && errno != EINTR)
    		{
    			printf("epoll error
    ");
    			break;
    		}
    		for(i = 0;i < number;i++)
    		{
    			int sockfd = events[i].data.fd;
    			/* 新的客户连接 */
    			if(sockfd == listenfd)
    			{
    				sockaddr_in client;
    				socklen_t clilen = sizeof(client);
    				int connfd =  accept(listenfd,(struct sockaddr*)&client,&clilen);
    				if(connfd < 0)
    				{
    					printf("accept error
    ");
    					continue;
    				}
    				if(user_count >= USER_LIMIT)
    				{
    					const char* info = "to many users
    ";
    					printf("%s
    ",info);
    					send(connfd,info,strlen(info),0);
    					close(connfd);
    					continue;
    				}
    
    				/* 保存第user_count 个客户连接的相关数据  */
    				users[user_count].address = client;
    				users[user_count].connfd = connfd;
    				ret = socketpair(AF_UNIX,SOCK_STREAM,0,users[user_count].pipefd);
    				assert( ret != -1);
    
    				pid_t pid = fork();
    				if(pid < 0)
    				{
    					close(connfd);
    					continue;
    				}
    				if(pid == 0)//子进程
    				{
    					close(sig_pipefd[0]);
    					close(sig_pipefd[1]);
    					close(users[user_count].pipefd[0]);//子进程关闭0port
    					close(listenfd);
    					close(epollfd);
    					run_child(user_count,users,share_mem);//子进程的处理函数
    					munmap((void*)share_mem,USER_LIMIT*BUFFER_SIZE);
    					exit(0);
    				}
    				else //父进程
    				{
    					close(users[user_count].pipefd[1]);//父进程关闭1port
    					close(connfd);
    					addfd(epollfd,users[user_count].pipefd[0]);
    					users[user_count].pid = pid;
    					sub_process[pid] = user_count;
    					user_count ++;
    				}
    			}
    			/* 处理信号事件 */
    			else if(sockfd ==sig_pipefd[0] &&  events[i].events & EPOLLIN)
    			{
    				int sig;
    				char signals[1024];
    				ret = recv(sockfd,signals,sizeof(signals),0);
    				if(ret < 0 && ret != EAGAIN)
    				{
    					printf("recv error
    ");
    					continue;
    				}
    				if(ret == 0)continue;
    				for(i = 0; i < ret; ++ i)
    				{
    					switch(signals[i])
    					{
    						case SIGCHLD : //子进程关闭
    						{
    							pid_t pid;
    							int status;
    							while((pid = waitpid(-1,&status,WNOHANG)) > 0)
    							{
    								/* 用子进程的pid获取被关闭的客户端连接的编号 */
    								int del_user = sub_process[pid];
    								sub_process[del_user] = -1;
    								/* 清楚第del_user个客户连接使用的相关数据 */
    								epoll_ctl(epollfd,EPOLL_CTL_DEL,users[del_user].pipefd[0],0);
    								close(users[del_user].pipefd[0]);
    								/* 把最后一个客户连接的信息移动到该位置,用于保证0~user_count-1直接的连接都是活着的 */
    								users[del_user] = users[--user_count];
    								sub_process[users[del_user].pid] = del_user;
    							}
    							if(terminate && user_count == 0) stop_server = true;
    							break;
    						}
    						case SIGINT :
    						case SIGTERM : //结束服务器进程
    						{
    							printf("kill all the child now
    ");
    							for(i = 0 ;i < user_count;++i)
    							{
    								pid_t pid = users[i].pid;
    								kill(pid,SIGTERM);
    							}
    							terminate = true;//此处不是stop_sever是为了等待全部子进程结束后再结束
    							break;
    						}
    						default : break;
    					}
    				}
    			}
    			/* 某个子进程向父进程写入了数据 */
    			else if(events[i].events & EPOLLIN)
    			{
    				int child;
    				ret = recv(sockfd,(char*)&child,sizeof(child),0);
    				if(ret < 0 && errno != EAGAIN) continue;
    				else if(ret == 0)continue;
    				printf("read data from child accross pipe
    ");
    				for(i = 0 ;i < user_count;i++)
    				{
    					if(i != child)
    					{
    						printf("send data to child accross pipe
    ");
    						send(users[i].pipefd[0],(char*)&child,sizeof(child),0);
    					}
    				}
    			}
    		}
    	}
    	close(listenfd);
    	close(epollfd);
    	close(sig_pipefd[0]);
    	close(sig_pipefd[1]);
    	shm_unlink(shm_name);
    	delete[] users;
    	delete[] sub_process;
    	return 0;
    }


    client代码(比較简单。就没有给出凝视):
    #define _GNU_SOURCE 1
    #include <sys/types.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <assert.h>
    #include <stdio.h>
    #include <unistd.h>
    #include <string.h>
    #include <stdlib.h>
    #include <poll.h>
    #include <fcntl.h>
    
    #define BUFFER_SIZE 64
    
    int main( int argc, char* argv[] )
    {
        if( argc <= 2 )
        {
            printf( "usage: %s ip_address port_number
    ", basename( argv[0] ) );
            return 1;
        }
        const char* ip = argv[1];
        int port = atoi( argv[2] );
    
        struct sockaddr_in server_address;
        bzero( &server_address, sizeof( server_address ) );
        server_address.sin_family = AF_INET;
        inet_pton( AF_INET, ip, &server_address.sin_addr );
        server_address.sin_port = htons( port );
    
        int sockfd = socket( PF_INET, SOCK_STREAM, 0 );
        assert( sockfd >= 0 );
        if ( connect( sockfd, ( struct sockaddr* )&server_address, sizeof( server_address ) ) < 0 )
        {
            printf( "connection failed
    " );
            close( sockfd );
            return 1;
        }
    
        pollfd fds[2];
        fds[0].fd = 0;
        fds[0].events = POLLIN;
        fds[0].revents = 0;
        fds[1].fd = sockfd;
        fds[1].events = POLLIN | POLLRDHUP;
        fds[1].revents = 0;
        char read_buf[BUFFER_SIZE];
        int pipefd[2];
        int ret = pipe( pipefd );
        assert( ret != -1 );
    
        while( 1 )
        {
            ret = poll( fds, 2, -1 );
            if( ret < 0 )
            {
                printf( "poll failure
    " );
                break;
            }
    
            if( fds[1].revents & POLLRDHUP )
            {
                printf( "server close the connection
    " );
                break;
            }
            else if( fds[1].revents & POLLIN )
            {
                memset( read_buf, '', BUFFER_SIZE );
                int len = recv( fds[1].fd, read_buf, BUFFER_SIZE-1, 0 );
    			int i;
    		    for(i = 0;i<len;i++)printf("%c",read_buf[i]);
            }
    
            if( fds[0].revents & POLLIN )
            {
                ret = splice( 0, NULL, pipefd[1], NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE );
                ret = splice( pipefd[0], NULL, sockfd, NULL, 32768, SPLICE_F_MORE | SPLICE_F_MOVE );
            }
        }
        
        close( sockfd );
        return 0;
    }


  • 相关阅读:
    iphone界面详解
    Spring jdbcTemplate.queryForInt(sql)的奇怪问题,呵呵
    BCP 高效批量导入
    eclipse中javascript显示为乱码的解决办法
    spring jdbcTemplate返回RS
    Spring IOC DI 形象理解
    MOSS 2007 文档库事件处理
    showModalDialog和showModelessDialog使用心得
    XMLHTTP.open权限不够的解决
    体现JAVA中的面向对象思想,接口(抽象类)的用处 :饲养员给动物喂食物
  • 原文地址:https://www.cnblogs.com/mthoutai/p/6808296.html
Copyright © 2011-2022 走看看