zoukankan      html  css  js  c++  java
  • Linux网络编程——进程池实现过程详解(2)

    进程池功能升级:

    • send_recv_syn同步机制
    • 自定义设置函数recvCycle,确保双方收发机制正常
    • 客户端中显示下载进度 time/slice两种方法
    • 设置异常情况
      • 客户端在下载中突然断开,原先的服务端会一直死循环打印
      • 服务器突然断开,客户端全部死循环
      • 服务器断开后,再次执行同一端口会出现异常
    • 服务器要升级,通知客户端有序退出
      • 如果业务不重要,直接暴力kill
      • 如果业务重要,需要在子进程完成任务后退出
        • sigprocmask屏蔽信号加保护+sigprocmask解除保护
        • 同步退出机制
    • 服务端因其他因素挂掉后自动重启的设计方法

    1、 send_recv_syn 同步机制

    设置发送端1000,接收端1500,但接收端因为网络原因,不能保证每个包都是接收到1500字节。

    tcp_client.c

    #include <func.h>
    #define N 1048576
    int main(int argc,char* argv[])
    {
    	ARGS_CHECK(argc,3);
    	int socketFd;
    	socketFd=socket(AF_INET,SOCK_STREAM,0);
    	ERROR_CHECK(socketFd,-1,"socket");
    	struct sockaddr_in ser;
    	bzero(&ser,sizeof(ser));
    	ser.sin_family=AF_INET;
    	ser.sin_port=htons(atoi(argv[2]));
    	ser.sin_addr.s_addr=inet_addr(argv[1]);//点分十进制转为32位的网络字节序
    	int ret;
    	ret=connect(socketFd,(struct sockaddr*)&ser,sizeof(ser));
    	ERROR_CHECK(ret,-1,"connect");
    	printf("connect success
    ");
    	int i;
    	int total=0;
        char buf[1000]={0};
    	for(i=0;i<N;i++)
    	{
    		ret=send(socketFd,buf,sizeof(buf),MSG_DONTWAIT);
    		if(-1==ret)
    		{
    			//printf("errno=%d
    ",errno);
    			printf("total=%d
    ",total);
    			return -1;
    		}
    		total += ret;
            printf("ret = %d
    ", ret);
    	}
        printf("total send = %d
    ", total);
    	close(socketFd);
    }
    

    tcp_server.c

    #include <func.h>
    
    int main(int argc,char* argv[])
    {
        ARGS_CHECK(argc,3);
        int socketFd;
        socketFd=socket(AF_INET,SOCK_STREAM,0);
        ERROR_CHECK(socketFd,-1,"socket");
        struct sockaddr_in ser;
        bzero(&ser,sizeof(ser));
        ser.sin_family=AF_INET;
        ser.sin_port=htons(atoi(argv[2]));
        ser.sin_addr.s_addr=inet_addr(argv[1]);//点分十进制转为32位的网络字节序
        int ret;
        ret=bind(socketFd,(struct sockaddr*)&ser,sizeof(ser));
        ERROR_CHECK(ret,-1,"bind");
        listen(socketFd,10);//缓冲区的大小,一瞬间能够放入的客户端连接信息
        int new_fd;
        struct sockaddr_in client;
        bzero(&client,sizeof(client));
        int addrlen=sizeof(client);
        new_fd=accept(socketFd,(struct sockaddr*)&client,&addrlen);
        ERROR_CHECK(new_fd,-1,"accept");
        printf("client ip=%s,port=%d
    ",inet_ntoa(client.sin_addr),ntohs(client.sin_port));
        char buf[1500]={0};
        int total = 0;
        while(1){
            ret = recv(new_fd, buf, sizeof(buf), 0);
            total = total + ret;
            printf("ret = %d
    ", ret);
        };
        printf("total recv = %d
    ", total);
        close(new_fd);
        close(socketFd);
    }
    

    2、自定义设置函数recvCycle改进问题

    问题:在传输大文件的时候会造成服务器端断开,原因是客户端发送buf1000个字节,客户端由于网络问题不一定会收到这么多,最终导致服务器端瘫痪。

    #include "function.h"
    
    int recvCycle(int newFd, void* p, int len){
        int total = 0;
        int ret;
        char *pStart = (char*)p;   
        while(total < len){
            ret = recv(newFd, pStart + total, len - total, 0);
            total = total + ret;
        }
        return 0;
    }
    

    3、客户端中显示下载进度 time/slice两种方法

    time型

    #include "function.h"
    
    int main(int argc,char* argv[])
    {
    	ARGS_CHECK(argc,3);
    	int socketFd;
    	socketFd=socket(AF_INET,SOCK_STREAM,0);
    	ERROR_CHECK(socketFd,-1,"socket");
    	struct sockaddr_in ser;
    	bzero(&ser,sizeof(ser));
    	ser.sin_family=AF_INET;
    	ser.sin_port=htons(atoi(argv[2]));
    	ser.sin_addr.s_addr=inet_addr(argv[1]);//点分十进制转为32位的网络字节序
    	int ret;
    	ret=connect(socketFd,(struct sockaddr*)&ser,sizeof(ser));
    	ERROR_CHECK(ret,-1,"connect");
    	printf("connect success
    ");
    	int dataLen;
    	char buf[1000]={0};
        //接收文件名
    	recvCycle(socketFd,&dataLen,4);
    	recvCycle(socketFd,buf,dataLen);
    	int fd;
    	fd=open(buf,O_CREAT|O_WRONLY,0666);
    	ERROR_CHECK(fd,-1,"open");
        //接文件大小
        off_t fileSize = 0;//文件大小off_t 长整型
    	recvCycle(socketFd,&dataLen,4);
    	recvCycle(socketFd,&fileSize,dataLen);
        time_t start, now;
    	//接受文件内容
        start = now = time(NULL);
        int downLoadSize = 0;
        while(1)
    	{
    		recvCycle(socketFd,&dataLen,4);
    		if(dataLen>0)
    		{
    			recvCycle(socketFd,buf,dataLen);
    			write(fd,buf,dataLen);
                downLoadSize+=dataLen;
                time(&now);
                if(now-start>=1){
                    printf("
    %5.2f%%", (float)downLoadSize / fileSize * 100);
                    fflush(stdout);
                    start = now;
                }
    		}else{
                printf("
    100%%         
    ");
    			break;
    		}
    	}
    	close(fd);
    	close(socketFd);
    	return 0;
    }
    

    slice型

    #include "function.h"
    
    int main(int argc,char* argv[])
    {
    	ARGS_CHECK(argc,3);
    	int socketFd;
    	socketFd=socket(AF_INET,SOCK_STREAM,0);
    	ERROR_CHECK(socketFd,-1,"socket");
    	struct sockaddr_in ser;
    	bzero(&ser,sizeof(ser));
    	ser.sin_family=AF_INET;
    	ser.sin_port=htons(atoi(argv[2]));
    	ser.sin_addr.s_addr=inet_addr(argv[1]);//点分十进制转为32位的网络字节序
    	int ret;
    	ret=connect(socketFd,(struct sockaddr*)&ser,sizeof(ser));
    	ERROR_CHECK(ret,-1,"connect");
    	printf("connect success
    ");
    	int dataLen;
    	char buf[1000]={0};
        //接收文件名
    	recvCycle(socketFd,&dataLen,4);
    	recvCycle(socketFd,buf,dataLen);
    	int fd;
    	fd=open(buf,O_CREAT|O_WRONLY,0666);
    	ERROR_CHECK(fd,-1,"open");
        //接文件大小
        off_t fileSize = 0, oldSize = 0, sliceSize;//文件大小off_t 长整型
    	off_t downLoadSize = 0;
        recvCycle(socketFd,&dataLen,4);
    	recvCycle(socketFd,&fileSize,dataLen);
    	//接受文件内容
        sliceSize = fileSize / 10000;
        while(1)
    	{
    		recvCycle(socketFd,&dataLen,4);
    		if(dataLen>0)
    		{
    			recvCycle(socketFd,buf,dataLen);
    			write(fd, buf, dataLen);
                downLoadSize+=dataLen;
                if(downLoadSize - oldSize > sliceSize){
                    printf("
    %5.2f%%", (float)downLoadSize / fileSize * 100);
                    fflush(stdout);
                    oldSize = downLoadSize;
                }
    		}else{
                printf("
    100%%         
    ");
    			break;
    		}
    	}
    	close(fd);
    	close(socketFd);
    	return 0;
    }
    

    4、设置异常情况

    (1)客户端在下载中突然断开,原先的服务端会一直死循环打印

    父进程处于S和R状态来回切换,因为子进程断开后处于Z状态(僵尸状态),而fd未关闭会一直使epoll_wait的返回值为1。

    原因:

    (a)epoll_wait是每次监控到僵尸子进程为空闲。

    (b)父进程和子进程分别掌握管道的一端,当子进程(客户端)关闭后,管道会被标记为一直可读,于是会main函数会一直在while(1)中不退出,read返回值为0,并一直打印 "child is not busy"。

    方法:

    修改tran_n.c,设置当send返回值为-1时及时退出。

    tran_n.c

    #include "function.h"
    
    int tranFile(int newFd){
        Train_t train;
        int ret;
        //发送文件名
        train.dataLen = strlen(FILENAME);
        strcpy(train.buf, FILENAME);
        int fd = open(FILENAME, O_RDONLY);
        ret = send(newFd, &train, 4 + train.dataLen, 0);
        ERROR_CHECK(ret, -1, "send");
        //发文件大小
        struct stat buf;
        fstat(fd, &buf);
        train.dataLen = sizeof(buf.st_size);
        memcpy(train.buf, &buf.st_size, train.dataLen);
        ret = send(newFd, &train, 4 + train.dataLen, 0);
        ERROR_CHECK(ret, -1, "send");
        //发文件内容 
        while((train.dataLen = read(fd, train.buf, sizeof(train.buf)))){
            ret = send(newFd, &train, 4 + train.dataLen, 0);
            ERROR_CHECK(ret, -1, "send");
        }
        //发送结束标志
        ret = send(newFd, &train, 4, 0);
        ERROR_CHECK(ret, -1, "send");
        return 0;
    }
    

    修改后:

    (2)服务器突然断开,客户端全部死循环

    原因:recv返回值一直为0,total的值一直小于len。

    方法:需要在函数后判断返回值是否为0,如果为0直接break返回-1

    修改后:

    (3)在改进(2)问题后,服务器断开后,再次执行./process_pool_server 192.168.3.160 2000 5会出现如下异常:

    原因:TCP断开连接后,恢复同一个端口需要一定时间,此时如果直接再重连同个端口就会出现上面的错误

    方法:端口复用。在tcpInit中修改代码,设置setsockopt参数reuse即可。

    #include "function.h"
    
    int tcpInit(int *pSocketFd, char *ip, char *port){
    	int socketFd;
    	socketFd = socket(AF_INET, SOCK_STREAM, 0);
    	ERROR_CHECK(socketFd, -1, "socket");
    	int ret, reuse = 1;
        ret = setsockopt(socketFd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(int));
    	ERROR_CHECK(ret, -1, "setsockopt");
        struct sockaddr_in ser;
    	bzero(&ser, sizeof(ser));
    	ser.sin_family = AF_INET;
    	ser.sin_port = htons(atoi(port));
    	ser.sin_addr.s_addr = inet_addr(ip);//点分十进制转为32位的网络字节序
    	ret = bind(socketFd, (struct sockaddr*)&ser, sizeof(ser));
    	ERROR_CHECK(ret, -1, "bind");
    	listen(socketFd, 10);//缓冲区的大小,一瞬间能够放入的客户端连接信息
    	*pSocketFd = socketFd;
    	return 0;
    }
    

    5、服务器要升级,通知客户端有序退出

    方法:异步拉起同步方法,设置一个管道,在同一进程中既可读又可写,当信号产生时写管道exitFds[1],并让exitFds[0]加入epoll监控events的集合,监控是否有管道中的读描述符是否可读(exitFds[0])。

    (a)如果业务不重要,直接暴力kill

    main.c

    #include "function.h"
    
    int exitFds[2]; //全局变量,用于信号处理的管道,同一管道的目的是在同一进程中既可读又可写
    
    void sigFunc(int sigNum){
        write(exitFds[1], &sigNum, 1);//写一个字节
    }
    
    int main(int argc, char* argv[]){
    	ARGS_CHECK(argc, 4);
        pipe(exitFds);
        signal(SIGUSR1, sigFunc);
    	int ret;
    	int childNum = atoi(argv[3]);
    	Process_Data *pChild = (Process_Data*)calloc(childNum, sizeof(Process_Data));
    	makeChild(pChild, childNum);//创建子进程
    	int socketFd;
    	tcpInit(&socketFd, argv[1], argv[2]);//建立TCP连接
    	int epfd;
    	epfd = epoll_create(1);//创建一个句柄,占用一个文件描述符,参数表示需要监控的数目
    	struct epoll_event event, *evs;
    	evs = (struct epoll_event*)calloc(childNum + 2, sizeof(struct epoll_event));
    	event.events = EPOLLIN;
    	event.data.fd = socketFd;
    	ret = epoll_ctl(epfd, EPOLL_CTL_ADD, socketFd, &event);//监听socketFd
    	ERROR_CHECK(ret, -1, "epoll_ctl");
        event.data.fd = exitFds[0];
        ret = epoll_ctl(epfd, EPOLL_CTL_ADD, exitFds[0], &event);
    	ERROR_CHECK(ret, -1, "epoll_ctl");
    	int i, j;
    	for(i = 0; i < childNum ; i++){
    		event.data.fd = pChild[i].fd;
    		ret = epoll_ctl(epfd, EPOLL_CTL_ADD, pChild[i].fd, &event);//将要监控的子进程的fd加入
    		ERROR_CHECK(ret, -1, "epoll_ctl");
    	}
    	int readyFdNum; 
    	int newFd;
    	//int count = 0;
        while(1){
    		readyFdNum = epoll_wait(epfd, evs, childNum + 2, -1);
            //printf("count = %d, readyFdNum = %d
    ",count++, readyFdNum);
            //epoll_wait等待事件的产生,参数evs用来从内核得到事件的集合
            //用childNum + 1告知内核这个events有多大
    		for(i = 0; i < readyFdNum; i++){
                //有客户端连入
    			if(evs[i].events == EPOLLIN && evs[i].data.fd == socketFd){
    				//断开后不会进入该循环
                    newFd = accept(socketFd, NULL, NULL);//不保存远程主机信息
    				for(j = 0; j < childNum; j++){
    					if(!pChild[j].busy){ //找到非忙碌的子进程,发任务(文件描述符)
    				        sendFd(pChild[j].fd, newFd);
                            pChild[j].busy = 1;
    						printf("%d child is busy
    ", pChild[j].pid);
    						break;
    					}
    				}	
    				close(newFd);   //限制客户端只下载一次后就关闭,否则newFd的引用计数为2,即父进程和子进程都可以读取数据
    			}
                if(evs[i].events == EPOLLIN && evs[i].data.fd == exitFds[0]){
                    printf("start exit
    ");
                    close(socketFd);
                    //两种方式:1、暴力Kill 2、同步退出机制
                    //暴力kill
                    for(j = 0;j < childNum; j++){
                        kill(pChild[j].pid, 9);
                    }
                    for(j = 0;j < childNum; j++){
                        wait(NULL);
                    }
                    return 0;
                }
    			for(j = 0; j < childNum; j++){
    				if(evs[i].data.fd == pChild[j].fd){  //遍历所有子进程的fd
                        //判断就绪描述符是哪个子进程的管道对端,说明子进程已完成任务,就将对应子进程标记为非忙碌,并读出管道内容。
    					//printf("%d %d
    ",evs[i].data.fd, pChild[j].fd);
                        read(pChild[j].fd, &ret, 1);//对端写一个字节这边读一个字节标记已完成任务,如果数据不读出,则会一直可读状态
                 //       if(0 == ret2){
                 //           return -1;
                 //       }
                        pChild[j].busy = 0;
                        //printf("ret2 = %d
    ", ret2);
    					printf("%d child is not busy
    ", pChild[j].pid);
    				}
    			}
               // printf("%d
    ", count++);
               // sleep(3);
    		}
    	}
    	return 0;
    }
    
    

    用10号信号Kill父进程及父进程状况:

    子进程状况:

    (b)如果业务重要,需要在子进程完成任务后再退出

    方法一:sigprocmask屏蔽信号加保护 ..........................sigprocmask解除保护

    方法二:同步退出机制,设置一个exitFlag,某个子进程完成当前任务后下次接收一个指定fd和exitFlag,若exitFlag为1则继续执行任务,如果exitFlag为0则有序退出,最大退出时间是所有子进程执行完当前任务的剩余最大时长。

    效果如下:

    具体代码见最后完整版。

    6、服务端因其他因素挂掉后自动重启的设计方法

    方法:父进程监控子进程,在子进程非正常退出后父进程重新启用子进程

    这里用三个名字来代表各个亲缘关系:爷进程、父进程、5个子进程

    while(fork()) //爷进程进入循环
    {
       int status;
       wait(&status); //获取退出码
       if(WIFEXITED(status)) //如果父进程是正常退出
       {
          printf("child exit normal
    ");
          exit(0);//爷进程退出
       }
       //父进程非正常退出,重新回while循环创建父进程
    }
    

    效果如下:

    当process_pool_server执行后,会出现1+1+5个进程,这时候给父进程发送信号通知其退出(不是给爷进程发信号)

    function.h

    #include <errno.h>
    #include <sys/epoll.h>
    #include <netdb.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <pthread.h>
    #include <setjmp.h>
    #include <signal.h>
    #include <sys/msg.h>
    #include <strings.h>
    #include <sys/sem.h>
    #include <sys/ipc.h>
    #include <sys/shm.h>
    #include <sys/wait.h>
    #include <syslog.h>
    #include <sys/select.h>
    #include <sys/time.h>
    #include <sys/mman.h>
    #include <pwd.h>
    #include <grp.h>
    #include <time.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <sys/stat.h>
    #include <unistd.h>
    #include <sys/types.h>
    #include <dirent.h>
    #include <fcntl.h>
    #define ARGS_CHECK(argc,val) {if(argc!=val) 
    	{printf("error args
    ");return -1;}}
    #define ERROR_CHECK(ret,retval,funcName) {if(ret==retval) 
    	{printf("LINE %d Function ERROR ", __LINE__);fflush(stdout);perror(funcName);return -1;}}
    #define THREAD_ERROR_CHECK(ret,funcName) {if(ret!=0) 
    	{printf("%s:%s
    ",funcName,strerror(ret));return -1;}}
    //管理每一个子进程的数据结构
    typedef struct{
    	pid_t pid;//子进程的pid
    	int fd;//子进程的管道对端
    	short busy;//子进程是否忙碌,0代表非忙碌,1代表忙碌
    }Process_Data;
    
    typedef struct{
        int dataLen;
        char buf[1000];
    }Train_t;
    #define FILENAME "file"
    int makeChild(Process_Data*,int);
    int tcpInit(int*, char*, char*);
    int childHandle(int);
    int sendFd(int, int, int);
    int recvFd(int, int*, int*);
    int tranFile(int);
    

    main.c

    #include "function.h"
    
    int exitFds[2]; //全局变量,用于信号处理的管道,同一管道的目的是在同一进程中既可读又可写
    
    void sigFunc(int sigNum){
        write(exitFds[1], &sigNum, 1);//写一个字节
    }
    
    int main(int argc, char* argv[]){
        while(fork()) //爷进程进入循环
        {
            int status;
            wait(&status); //获取退出码
            if(WIFEXITED(status)) //如果父进程是正常退出
            {
                printf("child exit normal
    ");
                exit(0);//爷进程退出
    
            }
            //父进程非正常退出,重新回while循环创建父进程
        }
        ARGS_CHECK(argc, 4);
        pipe(exitFds);
        signal(SIGUSR1, sigFunc);
        int ret;
        int childNum = atoi(argv[3]);
        Process_Data *pChild = (Process_Data*)calloc(childNum, sizeof(Process_Data));
        makeChild(pChild, childNum);//创建子进程
        int socketFd;
        tcpInit(&socketFd, argv[1], argv[2]);//建立TCP连接
        int epfd;
        epfd = epoll_create(1);//创建一个句柄,占用一个文件描述符,参数表示需要监控的数目
        struct epoll_event event, *evs;
        evs = (struct epoll_event*)calloc(childNum + 2, sizeof(struct epoll_event));
        event.events = EPOLLIN;
        event.data.fd = socketFd;
        ret = epoll_ctl(epfd, EPOLL_CTL_ADD, socketFd, &event);//监听socketFd
        ERROR_CHECK(ret, -1, "epoll_ctl");
        event.data.fd = exitFds[0];
        ret = epoll_ctl(epfd, EPOLL_CTL_ADD, exitFds[0], &event);
        ERROR_CHECK(ret, -1, "epoll_ctl");
        int i, j;
        for(i = 0; i < childNum ; i++){
            event.data.fd = pChild[i].fd;
            ret = epoll_ctl(epfd, EPOLL_CTL_ADD, pChild[i].fd, &event);//将要监控的子进程的fd加入
            ERROR_CHECK(ret, -1, "epoll_ctl");
        }
        int readyFdNum; 
        int newFd;
        //int count = 0;
        while(1){
            readyFdNum = epoll_wait(epfd, evs, childNum + 2, -1);
            //printf("count = %d, readyFdNum = %d
    ",count++, readyFdNum);
            //epoll_wait等待事件的产生,参数evs用来从内核得到事件的集合
            //用childNum + 1告知内核这个events有多大
            for(i = 0; i < readyFdNum; i++){
                //有客户端连入
                if(evs[i].events == EPOLLIN && evs[i].data.fd == socketFd){
                    //断开后不会进入该循环
                    newFd = accept(socketFd, NULL, NULL);//不保存远程主机信息
                    for(j = 0; j < childNum; j++){
                        if(!pChild[j].busy){ //找到非忙碌的子进程,发任务(文件描述符)
                            sendFd(pChild[j].fd, newFd, 1);
                            pChild[j].busy = 1;
                            printf("%d child is busy
    ", pChild[j].pid);
                            break;
                        }
                    }	
                    close(newFd);   //限制客户端只下载一次后就关闭,否则newFd的引用计数为2,即父进程和子进程都可以读取数据
                }
                if(evs[i].events == EPOLLIN && evs[i].data.fd == exitFds[0]){
                    printf("start exit
    ");
                    close(socketFd);
                    //两种方式:1、暴力Kill 2、同步退出机制
                    //暴力kill
                    for(j = 0;j < childNum; j++){
                        //kill(pChild[j].pid, 9);
                        sendFd(pChild[j].fd, 0, 0);//给所有的子进程发送0号描述符(没啥用),exitFlag为0
                    }
                    for(j = 0;j < childNum; j++){
                        wait(NULL);
                    }
                    return 0;
                }
                for(j = 0; j < childNum; j++){
                    if(evs[i].data.fd == pChild[j].fd){  //遍历所有子进程的fd
                        //判断就绪描述符是哪个子进程的管道对端,说明子进程已完成任务,就将对应子进程标记为非忙碌,并读出管道内容。
                        //printf("%d %d
    ",evs[i].data.fd, pChild[j].fd);
                        read(pChild[j].fd, &ret, 1);//对端写一个字节这边读一个字节标记已完成任务,如果数据不读出,则会一直可读状态
                        //       if(0 == ret2){
                        //           return -1;
                        //       }
                        pChild[j].busy = 0;
                        //printf("ret2 = %d
    ", ret2);
                        printf("%d child is not busy
    ", pChild[j].pid);
                    }
                }
                // printf("%d
    ", count++);
                // sleep(3);
            }
        }
        return 0;
    }
    

    child.c

    #include "function.h"
    //服务端
    int makeChild(Process_Data *pChild, int childNum)
    {
    	int i;
    	pid_t pid;
    	int fds[2];
    	int ret;
    	for(i=0;i < childNum;i++)
    	{
    		//初始化socketpair类型描述符,与pipe不同,每一端既可读又可写
    		ret = socketpair(AF_LOCAL, SOCK_STREAM, 0, fds);
    		ERROR_CHECK(ret, -1, "socketpair");
    		pid = fork();
    		if(0 == pid)  //子进程
    		{
    			close(fds[1]); 
    			ret = childHandle(fds[0]);
                if(-1 == ret){
                    return -1;
                }
    		}
    		close(fds[0]); //父进程
    		pChild[i].pid = pid;
    		pChild[i].fd = fds[1];
    		pChild[i].busy = 0;
    	}
    	return 0;
    }
    
    int childHandle(int fd)
    {
        int ret;
    	int newFd;
        int exitFlag;
    	while(1){
            //开5个子进程,newFd为10,因内核控制信息,父子进程共享同一块文件描述符
    		recvFd(fd, &newFd, &exitFlag);//接收到任务
    #ifdef DEBUG
            printf("newFd = %d
    ", newFd);
    #endif
            if(exitFlag){
                ret = tranFile(newFd);//给客户端发送文件
                printf("I get task %d
    ", newFd);
                if(-1 == ret){
                    printf("tranFile not finish!
    ");
                    close(newFd);
                    continue;
                }
            	//newFd的值为10,socketFd为3,有五个子进程管道,为4-8,epfd也占1
            	printf("finish send file
    ");
                close(newFd);
            }
            else{
                exit(0);//不能用break
            }
     //       if(0 == newFd){
     //           printf("conncect failed!
    ");
     //           return -1;
     //       }
    		write(fd, &newFd, 1); //通知父进程非忙碌,写一个字节即可
    	}
    }
    

    tran_file.c

    #include "function.h"
    
    int tranFile(int newFd){
        Train_t train;
        int ret;
        //发送文件名
        train.dataLen = strlen(FILENAME);
        strcpy(train.buf, FILENAME);
        int fd = open(FILENAME, O_RDONLY);
        ret = send(newFd, &train, 4 + train.dataLen, 0);
        ERROR_CHECK(ret, -1, "send");
        //发文件大小
        struct stat buf;
        fstat(fd, &buf);
        train.dataLen = sizeof(buf.st_size);
        memcpy(train.buf, &buf.st_size, train.dataLen);
        ret = send(newFd, &train, 4 + train.dataLen, 0);
        ERROR_CHECK(ret, -1, "send");
        //发文件内容 
        while((train.dataLen = read(fd, train.buf, sizeof(train.buf)))){
            ret = send(newFd, &train, 4 + train.dataLen, 0);
            ERROR_CHECK(ret, -1, "send");
        }
        //发送结束标志
        ret = send(newFd, &train, 4, 0);
        ERROR_CHECK(ret, -1, "send");
        return 0;
    }
    

    tcpInit.c

    #include "function.h"
    
    int tcpInit(int *pSocketFd, char *ip, char *port){
    	int socketFd;
    	socketFd = socket(AF_INET, SOCK_STREAM, 0);
    	ERROR_CHECK(socketFd, -1, "socket");
    	int ret, reuse = 1;
        ret = setsockopt(socketFd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(int));
        ERROR_CHECK(ret, -1, "setsockopt");
        struct sockaddr_in ser;
    	bzero(&ser, sizeof(ser));
    	ser.sin_family = AF_INET;
    	ser.sin_port = htons(atoi(port));
    	ser.sin_addr.s_addr = inet_addr(ip);//点分十进制转为32位的网络字节序
    	ret = bind(socketFd, (struct sockaddr*)&ser, sizeof(ser));
    	ERROR_CHECK(ret, -1, "bind");
    	listen(socketFd, 10);//缓冲区的大小,一瞬间能够放入的客户端连接信息
    	*pSocketFd = socketFd;
    	return 0;
    }
    

    send_fd.c

    #include "function.h"
    int sendFd(int sfd,int fd, int exitFlag)
    {
    	struct msghdr msg;
    	memset(&msg,0,sizeof(msg));
    	struct iovec iov[2];
    	char buf2[10]="world";
    	iov[0].iov_base= &exitFlag;
    	iov[0].iov_len=5;
    	iov[1].iov_base=buf2;
    	iov[1].iov_len=5;
    	msg.msg_iov=iov;
    	msg.msg_iovlen=2;
    	struct cmsghdr *cmsg;
    	int len=CMSG_LEN(sizeof(int));
    	cmsg=(struct cmsghdr *)calloc(1,len);
    	cmsg->cmsg_len=len;
    	cmsg->cmsg_level=SOL_SOCKET;
    	cmsg->cmsg_type=SCM_RIGHTS;
    	*(int*)CMSG_DATA(cmsg)=fd;
    	msg.msg_control=cmsg;
    	msg.msg_controllen=len;
    	int ret;
    	ret=sendmsg(sfd,&msg,0);
    	ERROR_CHECK(ret,-1,"sendmsg");
    	return 0;
    }
    int recvFd(int sfd,int *fd, int *exitFlag)
    {
    	struct msghdr msg;
    	memset(&msg,0,sizeof(msg));
    	struct iovec iov[2];
    	char buf2[10];
    	iov[0].iov_base=exitFlag;
    	iov[0].iov_len=5;
    	iov[1].iov_base=buf2;
    	iov[1].iov_len=5;
    	msg.msg_iov=iov;
    	msg.msg_iovlen=2;
    	struct cmsghdr *cmsg;
    	int len=CMSG_LEN(sizeof(int));
    	cmsg=(struct cmsghdr *)calloc(1,len);
    	cmsg->cmsg_len=len;
    	cmsg->cmsg_level=SOL_SOCKET;
    	cmsg->cmsg_type=SCM_RIGHTS;
    	msg.msg_control=cmsg;
    	msg.msg_controllen=len;
    	int ret;
    	ret=recvmsg(sfd,&msg,0);
    	ERROR_CHECK(ret,-1,"sendmsg");
    	*fd=*(int*)CMSG_DATA(cmsg);
    	return 0;
    }
    

    客户端

    client.c

    #include "function.h"
    
    int main(int argc,char* argv[])
    {
    	ARGS_CHECK(argc,3);
    	int socketFd;
    	socketFd=socket(AF_INET,SOCK_STREAM,0);
    	ERROR_CHECK(socketFd,-1,"socket");
    	struct sockaddr_in ser;
    	bzero(&ser,sizeof(ser));
    	ser.sin_family=AF_INET;
    	ser.sin_port=htons(atoi(argv[2]));
    	ser.sin_addr.s_addr=inet_addr(argv[1]);//点分十进制转为32位的网络字节序
    	int ret;
    	ret=connect(socketFd,(struct sockaddr*)&ser,sizeof(ser));
    	ERROR_CHECK(ret,-1,"connect");
    	printf("connect success
    ");
    	int dataLen;
    	char buf[1000]={0};
        //接收文件名
    	recvCycle(socketFd,&dataLen,4);
    	recvCycle(socketFd,buf,dataLen);
    	int fd;
    	fd=open(buf,O_CREAT|O_WRONLY,0666);
    	ERROR_CHECK(fd,-1,"open");
        //接文件大小
        off_t fileSize = 0, oldSize = 0, sliceSize;//文件大小off_t 长整型
    	off_t downLoadSize = 0;
        recvCycle(socketFd,&dataLen,4);
    	recvCycle(socketFd,&fileSize,dataLen);
    	//接受文件内容
        sliceSize = fileSize / 10000;
        while(1)
    	{
    		ret = recvCycle(socketFd,&dataLen,4);
            if(-1 == ret){
                printf("
    ");
                printf("server is update!
    ");
                break;
            }
            if(dataLen > 0)
    		{
    			ret = recvCycle(socketFd,buf,dataLen);
                if(-1 == ret){
                    printf("
    ");
                    printf("server is update!
    ");
                    break;
                }
                write(fd, buf, dataLen);
                downLoadSize += dataLen;
                if(downLoadSize - oldSize > sliceSize){
                    printf("
    %5.2f%%", (float)downLoadSize / fileSize * 100);
                    fflush(stdout);
                    oldSize = downLoadSize;
                }
    		}
            else{
                printf("
    100%%         
    ");
    			break;
    		}
    	}
    	close(fd);
    	close(socketFd);
    	return 0;
    }
    
    

    recvCycle.c

    #include "function.h"
    
    int recvCycle(int newFd, void* p, int len){
        int total = 0;
        int ret;
        char *pStart = (char*)p;   
        while(total < len){
            ret = recv(newFd, pStart + total, len - total, 0);
            //printf("recv ret = %d
    ", ret);
            if(0 == ret){
                return -1;
            }
            total = total + ret;
        }
        
        return 0;
    }
    
  • 相关阅读:
    【洛谷P4318】完全平方数
    【洛谷P2257】YY的GCD
    【洛谷P1403】约数研究
    【洛谷P3455】ZAP-Queries
    【CF600E】Lomsat gelral
    【BZOJ3289】Mato的文件管理 莫队+树状数组
    【洛谷P2585】三色二叉树
    【CF242E】Xor Segment
    【洛谷P4144】大河的序列
    hdu 1547(BFS)
  • 原文地址:https://www.cnblogs.com/Mered1th/p/10781243.html
Copyright © 2011-2022 走看看