selcet(等待I/O完成)的介绍:
select同时监控多个socket,select()的机制提供了fd_set的数据结构,实际是long类型的数组,优点是跨平台性,select的缺点在于单个进程能够监视的文件描述符的数量存在最大限制,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的的开销也线性增长。同时,由于网络响应时间的延迟使得大量的tcp链接处于非常活跃状态,但调用select()会对所有的socket进行一次线性扫描,所以这也浪费了一定的开销。
poll
poll与select在本质上没有区别,但是poll没有最大文件描述符数量的限制。
select和poll同样存在的一个缺点就是:包含大量文件描述符的数组被整体复制于用户态和内核的内存地址之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符的数量增加啊而线性增加。
select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行I/O操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以他们一般不会丢失就绪的消息,这种方式叫做水平触发。
epoll:linux2.6最好的多路I/O就绪通知方法。
epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发)
epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获取就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样更彻底省掉了这些文件描述符在系统调用时复制的开销。
另一个本质的改进在于epoll采用基于事件的就绪通知方式,在select/poll中进程只有在调用一定的方法后内核才对所有的监视文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某一个文件描述符就绪时,内存会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。
I/O多路复用是在单线程模式下实现多线程的效果,实现一个多I/O并发的效果。
import socket SOCKET_FAMILY = socket.AF_INET SOCKET_TYPE = socket.SOCK_STREAM sockServer = socket.socket() sockServer.bind(('0.0.0.0', 8888)) sockServer.listen(5) while True: cliobj, addr = sockServer.accept() while True: recvdata = cliobj.recv(1024) if recvdata: print(recvdata.decode()) else: cliobj.close() break
客户端:
import socket socCli = socket.socket() socCli.connect(('127.0.0.1', 8888)) while True: data = input("input str:") socCli.send(data.encode())
改进上面额的demo
elect 详细解释,用线程的IO多路复用实现一个读写分离的、支持多客户端的连接请求 """ import socket import queue from select import select SERVER_IP = ('127.0.0.1', 9999) # 保存客户端发送过来的消息,将消息放入队列中 message_queue = {} input_list = [] output_list = [] if __name__ == "__main__": server = socket.socket() server.bind(SERVER_IP) server.listen(10) # 设置为非阻塞 server.setblocking(False) # 初始化将服务端加入监听列表 input_list.append(server) while True: # 开始 select 监听,对input_list中的服务端server进行监听 stdinput, stdoutput, stderr = select(input_list, output_list, input_list) # 循环判断是否有客户端连接进来,当有客户端连接进来时select将触发 for obj in stdinput: # 判断当前触发的是不是服务端对象, 当触发的对象是服务端对象时,说明有新客户端连接进来了 if obj == server: # 接收客户端的连接, 获取客户端对象和客户端地址信息 conn, addr = server.accept() print("Client {0} connected! ".format(addr)) # 将客户端对象也加入到监听的列表中, 当客户端发送消息时 select 将触发 input_list.append(conn) # 为连接的客户端单独创建一个消息队列,用来保存客户端发送的消息 message_queue[conn] = queue.Queue() else: # 由于客户端连接进来时服务端接收客户端连接请求,将客户端加入到了监听列表中(input_list),客户端发送消息将触发 # 所以判断是否是客户端对象触发 try: recv_data = obj.recv(1024) # 客户端未断开 if recv_data: print("received {0} from client {1}".format(recv_data.decode(), addr)) # 将收到的消息放入到各客户端的消息队列中 message_queue[obj].put(recv_data) # 将回复操作放到output列表中,让select监听 if obj not in output_list: output_list.append(obj) except ConnectionResetError: # 客户端断开连接了,将客户端的监听从input列表中移除 input_list.remove(obj) # 移除客户端对象的消息队列 del message_queue[obj] print(" [input] Client {0} disconnected".format(addr)) # 如果现在没有客户端请求,也没有客户端发送消息时,开始对发送消息列表进行处理,是否需要发送消息 for sendobj in output_list: try: # 如果消息队列中有消息,从消息队列中获取要发送的消息 if not message_queue[sendobj].empty(): # 从该客户端对象的消息队列中获取要发送的消息 send_data = message_queue[sendobj].get() sendobj.sendall(send_data) else: # 将监听移除等待下一次客户端发送消息 output_list.remove(sendobj) except ConnectionResetError: # 客户端连接断开了 del message_queue[sendobj] output_list.remove(sendobj) print(" [output] Client {0} disconnected".format(addr))
epoll实现实例:
#!/usr/bin/env python import select import socket response = b'' serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serversocket.bind(('0.0.0.0', 8080)) serversocket.listen(1) # 因为socket默认是阻塞的,所以需要使用非阻塞(异步)模式。 serversocket.setblocking(0) # 创建一个epoll对象 epoll = select.epoll() # 在服务端socket上面注册对读event的关注。一个读event随时会触发服务端socket去接收一个socket连接 epoll.register(serversocket.fileno(), select.EPOLLIN) try: # 字典connections映射文件描述符(整数)到其相应的网络连接对象 connections = {} requests = {} responses = {} while True: # 查询epoll对象,看是否有任何关注的event被触发。参数“1”表示,我们会等待1秒来看是否有event发生。 # 如果有任何我们感兴趣的event发生在这次查询之前,这个查询就会带着这些event的列表立即返回 events = epoll.poll(1) # event作为一个序列(fileno,event code)的元组返回。fileno是文件描述符的代名词,始终是一个整数。 for fileno, event in events: # 如果是服务端产生event,表示有一个新的连接进来 if fileno == serversocket.fileno(): connection, address = serversocket.accept() print('client connected:', address) # 设置新的socket为非阻塞模式 connection.setblocking(0) # 为新的socket注册对读(EPOLLIN)event的关注 epoll.register(connection.fileno(), select.EPOLLIN) connections[connection.fileno()] = connection # 初始化接收的数据 requests[connection.fileno()] = b'' # 如果发生一个读event,就读取从客户端发送过来的新数据 elif event & select.EPOLLIN: print("------recvdata---------") # 接收客户端发送过来的数据 requests[fileno] += connections[fileno].recv(1024) # 如果客户端退出,关闭客户端连接,取消所有的读和写监听 if not requests[fileno]: connections[fileno].close() # 删除connections字典中的监听对象 del connections[fileno] # 删除接收数据字典对应的句柄对象 del requests[connections[fileno]] print(connections, requests) epoll.modify(fileno, 0) else: # 一旦完成请求已收到,就注销对读event的关注,注册对写(EPOLLOUT)event的关注。写event发生的时候,会回复数据给客户端 epoll.modify(fileno, select.EPOLLOUT) # 打印完整的请求,证明虽然与客户端的通信是交错进行的,但数据可以作为一个整体来组装和处理 print('-' * 40 + ' ' + requests[fileno].decode()) # 如果一个写event在一个客户端socket上面发生,它会接受新的数据以便发送到客户端 elif event & select.EPOLLOUT: print("-------send data---------") # 每次发送一部分响应数据,直到完整的响应数据都已经发送给操作系统等待传输给客户端 byteswritten = connections[fileno].send(requests[fileno]) requests[fileno] = requests[fileno][byteswritten:] if len(requests[fileno]) == 0: # 一旦完整的响应数据发送完成,就不再关注写event epoll.modify(fileno, select.EPOLLIN) # HUP(挂起)event表明客户端socket已经断开(即关闭),所以服务端也需要关闭。 # 没有必要注册对HUP event的关注。在socket上面,它们总是会被epoll对象注册 elif event & select.EPOLLHUP: print("end hup------") # 注销对此socket连接的关注 epoll.unregister(fileno) # 关闭socket连接 connections[fileno].close() del connections[fileno] finally: # 打开的socket连接不需要关闭,因为Python会在程序结束的时候关闭。这里显式关闭是一个好的代码习惯 epoll.unregister(serversocket.fileno()) epoll.close() serversocket.close()