zoukankan      html  css  js  c++  java
  • 回射服务器-多路复用 select 01 (阻塞)

    1. 经典“入门级”问题:IO 多路复用是什么意思?

    在单个线程通过记录跟踪每一个Sock(I/O流)的状态来同时管理多个I/O流. 发明它的原因,是尽量多的提高服务器的吞吐能力。

    是不是听起来好拗口,看个图就懂了.(其实就是一个时分复用)

    这里写图片描述

    在同一个线程里面, 通过拨开关的方式,来同时传输多个I/O流

    那么,“一个请求到来了,nginx使用epoll接收请求的过程是怎样的”, 其实,ngnix会有很多链接进来, epoll会把他们都监视起来,然后像拨开关一样,谁有数据就拨向谁,然后调用相应的代码处理。

    其中,“复用”指的是复用同一个线程。

    其实就是操作系统为你提供了一个功能,当你的某个socket可读或者可写的时候,它可以给你一个通知,让你去处理读事件或者写事件。

    而这个功能能够通过select/poll/epoll等来使用。这些函数都可以同时监视多个描述符的读写就绪状况,这样,多个描述符的 I/O 操作都能在一个线程内并发交替地顺序完成 。

    2.如何使用 select 以及他的那些参数都分别代表了什么,在这里就不说了 。man 手册里讲的还是比较清楚的。

    3. 直接来看一个用select实现回射服务器的小例子

    origin.h 文件

    #ifndef _ORIGIN_H
    #define _ORIGIN_H
    #include <sys/socket.h>
    #include <errno.h>
    #include <unistd.h>
    #include <sys/select.h>
    #include <strings.h>
    #include <arpa/inet.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <sys/time.h>
    #include <sys/types.h>
    #include <unistd.h>
    #include <stdio.h>
    #include <readline/readline.h>
    #include <readline/history.h>
    #define MAXLINE 1024
    #define SA struct sockaddr
    
    int err_sys(const char *err_string)
    {
    	perror(err_string); // err_string + 出错信息
    	exit(1);
    }
    int Socket(int family, int type, int protocol)
    {
    	int n;
    	if ((n = socket(family, type, protocol)) < 0)
    		err_sys("socket error ");
    	return n;
    }
    int Bind(int fd, const struct sockaddr *sa, socklen_t salen)
    {
    	if (bind(fd, sa, salen) < 0)
    		err_sys("bind error");
    }
    void Listen(int fd, int backlog)
    {
    	char *ptr = nullptr;
    	if ((ptr = getenv("LISTENQ")) != NULL)
    		backlog = atoi(ptr);
    
    	if (listen(fd, backlog) < 0)
    		err_sys("listen error");
    }
    int Select(int nfds, fd_set *readfds, fd_set *writefds,
    		   fd_set *exceptfds, struct timeval *timeout)
    {
    	int n;
    	if ((n = select(nfds, readfds, writefds, exceptfds, timeout)) < 0)
    		err_sys("select error ");
    	return n;
    }
    int Accept(int fd, struct sockaddr *sa, socklen_t *salenptr)
    {
    	int n;
    again:
    	if ((n = accept(fd, sa, salenptr)) < 0)
    	{
    		/*ECONNABORTED:在于当服务和客户进程在完成用于 TCP 连接的“三次握手”后,客户 TCP 却发送了一个 RST (复位)分节,
    		在服务进程看来,就在该连接已由 TCP 排队,等着服务进程调用 accept 的时候 RST 却到达了。
    		POSIX 规定此时的 errno 值必须 ECONNABORTED */
    		/*EPROTO 与 ECONNABORTED 是一样的,看具体机器*/
    		if (errno == EPROTO || errno == ECONNABORTED)
    			goto again;
    		else
    			err_sys("accept error ");
    	}
    	return n;
    }
    ssize_t Read(int fd, void *ptr, size_t nbytes)
    {
    	ssize_t n;
    	if ( (n = read(fd, ptr, nbytes)) == -1)
    		err_sys("read error");
    	return n;
    }
    size_t /* Write "n" bytes to a descriptor. */
    writen(int fd, const void *vptr, size_t n)
    {
    	size_t nleft;
    	ssize_t nwritten;
    	const char *ptr;
    
    	ptr = (const char *)vptr;
    	nleft = n;
    	while (nleft > 0)
    	{
    		if ((nwritten = write(fd, ptr, nleft)) <= 0)
    		{
    			if (nwritten < 0 && errno == EINTR)
    				nwritten = 0; /* and call write() again */
    			else
    				return (-1); /* error */
    		}
    
    		nleft -= nwritten;
    		ptr += nwritten;
    	}
    	return (n);
    }
    void Writen(int fd, void *ptr, size_t nbytes)
    {
    	if (writen(fd, ptr, nbytes) != nbytes)
    		err_sys("writen error");
    }
    void Close(int fd)
    {
    	if (close(fd) == -1)
    		err_sys("close error");
    }
    
    #endif
    

    myhead.h 文件

    #ifndef _MYHEAD_H
    #define _MYHEAD_H
    #define LISTENQ 1024
    #include <sys/time.h>
    #include <sys/types.h>
    #include <unistd.h>
    class mySelect
    {
      public:
    	mySelect(const char *_ip, const int _port) : ip(_ip), port(_port) { run(ip, port); }
    	~mySelect();
    	int run(const char *ip, const int port);
    
      private:
    	const int port;
    	const char *ip;
    	/*(maxfd+1)是select函数的第一个参数的当前值
    	listenfd 是监听套接字
    	maxi 是client 数组当前使用项的最大下标
    	*/
    	int listenfd, maxfd, maxi;
    	int client[FD_SETSIZE]; //保存已连接套接字,刚开始初始化为 -1
    	fd_set rset, allset;
    	int nready;
    };
    #endif
    

    真正实现函数文件

    #include "myhead.h"
    #include "origin.h"
    int mySelect::run(const char *ip, const int port)
    {
    	struct sockaddr_in servaddr;
    
    	listenfd = Socket(AF_INET, SOCK_STREAM, 0);
    
    	bzero(&servaddr, sizeof(servaddr));
    	servaddr.sin_family = AF_INET;
    	inet_pton(AF_INET, ip, &servaddr.sin_addr);
    	servaddr.sin_port = htons(port);
    
    	Bind(listenfd, (SA *)&servaddr, sizeof(servaddr));
    
    	Listen(listenfd, LISTENQ);
    
    	maxfd = listenfd; //0,1,2 被标准I/O占用,所以第一个可用描述符是 3 ,先初始化
    	maxi = -1;
    
    	for (int i = 0; i < FD_SETSIZE; ++i)
    		client[i] = -1;
    	FD_ZERO(&allset);
    	FD_SET(listenfd, &allset); //打开监听套接字对应位
    
    	for (;;)
    	{
    		rset = allset;
    		int nready = Select(maxfd + 1, &rset, NULL, NULL, NULL);
    		if (FD_ISSET(listenfd, &rset))
    		{
    			struct sockaddr_in cliaddr;
    			socklen_t clilen = sizeof(cliaddr);
    			int connfd = Accept(listenfd, (SA *)&cliaddr, &clilen);
    			/*
    		printf("new client: %s, port %d
    ",
    					Inet_ntop(AF_INET, &cliaddr.sin_addr, 4, NULL),
    					ntohs(cliaddr.sin_port));
    		*/
    			int i;
    			for (i = 0; i < FD_SETSIZE; i++)
    			{
    				if (client[i] < 0)
    				{
    					client[i] = connfd;
    					break;
    				}
    			}
    			if (i == FD_SETSIZE)
    				err_sys("too many clients ");
    			FD_SET(connfd, &allset); //加入新的套接字
    			if (connfd > maxfd)
    				maxfd = connfd; //为了select 函数的第一个参数
    			if (i > maxi)
    				maxi = i;
    			if (--nready <= 0)
    				continue;
    		}
    		int sockfd;
    		ssize_t n;
    		char buf[MAXLINE];
    		for (int i = 0; i <= maxi; i++)
    		{
    			if ((sockfd = client[i]) < 0)
    				continue;
    			if (FD_ISSET(sockfd, &rset))
    			{ //看描述符是否在select 返回的描述符集中
    				bzero(&buf, sizeof(buf));
    				if ((n = Read(sockfd, buf, MAXLINE)) == 0)
    				{ //对方关闭连接
    					Close(sockfd);
    					FD_CLR(sockfd, &allset);
    					client[i] = -1;
    				}
    				else
    					Writen(sockfd, buf, n);
    				if (--nready <= 0)
    					break;
    			}
    		}
    	}
    }
    mySelect::~mySelect()
    {
    	close(listenfd);
    }
    

    main 函数

    #include"myhead.h"
    
    int main(void) {
    	mySelect my("127.0.0.1",8081);
    	return 0;
    }
    

    一些前提,不得不提:

    	(maxfd+1)是select函数的第一个参数的当前值
    	listenfd 是监听套接字
    	maxi 是client 数组当前使用项的最大下标
    	client 保存已连接套接字,刚开始全初始化为 -1
    	nready 也就是select 的返回值是已就绪的描述符数目。
    

    其他的东西都是一些辅助,我们直接来分析 run 函数即可

    1. 初始化描述符集合,将 listenfd 所对应的位打开。
    2. select 等待事件发生。或者是新连接,数据,FIN,RST的到达。
    3. 如果监听套接字变为可读,意味这已经建立了一个新的连接,于是调用accept接受连接。client 记录已连接描述符。就绪描述符数目减一,若其值变为0,就可以避免进入下一个 for 循环。让我们可以使用select的返回值来检查未就绪的描述符。
    4. 检查现有连接,进行相应处理即可 。

    测试:

    这里写图片描述

    需要注意的是:

    1. select 的第一个参数是maxfdp1指定待测试的描述符个数,是待测试的最大描述符+1,select 监视{ 0~maxfdp1-1 }范围内的套接字。
    2. 描述符集合:通常是一个整数数组,其中每个整数的每一位对应一个描述符,比如:假设使用32为整数,那么该数组的第一个整数就对应与描述符 0~31,第二个元素就对应与32~63,依次类推。
    3. 严禁混合使用标准I/O函数库stdio(fgets等)和 select 函数!!!

    问题大杂汇:

    1. 为什么服务端的套接字总是非阻塞的?

    2. 那些事件是可写事件?那些事件是可读事件?

    所谓可读事件,具体的说是指以下事件:1 socket内核接收缓冲区中的可用字节数大于或等于其低水位SO_RCVLOWAT;2 socket通信的对方关闭了连接,这个时候在缓冲区里有个文件结束符EOF,此时读操作将返回0;3 监听socket的backlog队列有已经完成三次握手的连接请求,可以调用accept;4 socket上有未处理的错误,此时可以用getsockopt来读取和清除该错误。

    所谓可写事件,则是指:1 socket的内核发送缓冲区的可用字节数大于或等于其低水位SO_SNDLOWAIT;2 socket的写端被关闭,继续写会收到SIGPIPE信号;3 非阻塞模式下,connect返回之后,发起连接成功或失败;4 socket上有未处理的错误,此时可以用getsockopt来读取和清除该错误。

    3. 什么是time_wait 状态?

    4. 在服务端accept之前,连接中止会发生什么?

    5. SIGPIPE信号是什么?

    6.为什么使用复用技术?而不是简单的来一个客户连接就开一个进程/线程去处理?

    7. select /poll /epoll 都有什么优缺点?

  • 相关阅读:
    Sql Server 2016数据库定时备份操作步骤
    .net 生成原图和多张缩略图
    python小知识
    python小知识
    Q pi (lambda)
    GAE&reward shaping
    yield函数
    关于vs code和markdown
    强化学习第七章
    强化学习第六章
  • 原文地址:https://www.cnblogs.com/Tattoo-Welkin/p/9494020.html
Copyright © 2011-2022 走看看