它的功能与linux的epoll,还是select模块,poll等类似;实现高效的I/O multiplexing, 常用于非阻塞的socket的编程中; 简单介绍一下这个模块,更多内容查看 python文档:https://docs.python.org/3/library/selectors.html
1. 模块定义了一个 BaseSelector的抽象基类, 以及它的子类,包括:SelectSelector, PollSelector, EpollSelector, DevpollSelector, KqueueSelector.
另外还有一个DefaultSelector类,它其实是以上其中一个子类的别名而已,它自动选择为当前环境中最有效的Selector,所以平时用 DefaultSelector类就可以了,其它用不着。
2. 模块定义了两个常量,用于描述 event Mask
EVENT_READ : 表示可读的; 它的值其实是1;
EVENT_WRITE: 表示可写的; 它的值其实是2;
3. 模块定义了一个 SelectorKey类, 一般用这个类的实例 来描述一个已经注册的文件对象的状态, 这个类的几个属性常用到:
fileobj: 表示已经注册的文件对象;
fd: 表示文件对象的描述符,是一个整数,它是文件对象的 fileno()方法的返回值;
events: 表示注册一个文件对象时,我们等待的events, 即上面的event Mask, 是可读呢还是可写呢!!
data: 表示注册一个文件对象是邦定的data;
4. 最后说说抽象基类中的方法;
register(fileobj, events, data=None) |
作用:注册一个文件对象。 参数: fileobj——即可以是fd 也可以是一个拥有fileno()方法的对象; events——上面的event Mask 常量; data 返回值: 一个SelectorKey类的实例; |
unregister(fileobj) |
作用: 注销一个已经注册过的文件对象; 返回值:一个SelectorKey类的实例; |
modify(fileobj, events, data=None) |
作用:用于修改一个注册过的文件对象,比如从监听可读变为监听可写;它其实就是register() 后再跟unregister(), 但是使用 modify( ) 更高效; 返回值:一个SelectorKey类的实例; |
select(timeout=None) |
作用: 用于选择满足我们监听的event的文件对象; 返回值: 是一个(key, events)的元组, 其中key是一个SelectorKey类的实例, 而events 就是 event Mask(EVENT_READ或EVENT_WRITE,或者二者的组合) |
close() |
作用:关闭 selector。 最后一定要记得调用它, 要确保所有的资源被释放; |
get_key(fileobj) |
作用: 返回注册文件对象的 key; 返回值 :一个SelectorKey类的实例; |
服务端
#!/usr/bin/env python # -*- coding: utf-8 -*- from socket import * import selectors sel=selectors.DefaultSelector() def accept(server_fileobj,mask): coon,addr = server_fileobj.accept() print(coon,addr,mask) sel.register(coon,selectors.EVENT_READ,read) def read(conn,mask): try: data = conn.recv(1024) if not data: print('closing',conn) sel.unregister(conn) conn.close() return conn.send(b'hello') except Exception: print('closing', conn) sel.unregister(conn) conn.close() server_fileobj = socket(AF_INET,SOCK_STREAM) server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server_fileobj.bind(('127.0.0.1',8088)) server_fileobj.listen(5) server_fileobj.setblocking(False)#设置socket的接口为非阻塞 #相当于往select的读列表里append了一个文件句柄server_fileobj,并且绑定了一个回调函数accept sel.register(server_fileobj,selectors.EVENT_READ,accept) while True: #检测所有的fileobj,是否有完成wait data的 events = sel.select() # SelectorKey = namedtuple('SelectorKey', ['fileobj', 'fd', 'events', 'data']) for sel_obj,mask in events: callable = sel_obj.data callable(sel_obj.fileobj,mask)
客户端
#!/usr/bin/env python # -*- coding: utf-8 -*- from socket import * c=socket(AF_INET,SOCK_STREAM) c.connect(('127.0.0.1',8088)) while True: msg=input('>>: ') if not msg:continue c.send(msg.encode('utf-8')) data=c.recv(1024) print(data.decode('utf-8'))
模拟请求
1 #1. epoll并不代表一定比select好 2 # 在并发高的情况下,连接活跃度不是很高, epoll比select 3 # 并发性不高,同时连接很活跃, select比epoll好 4 5 #通过非阻塞io实现http请求 6 # select + 回调 + 事件循环 7 # 并发性高 8 # 使用单线程 9 10 import socket 11 from urllib.parse import urlparse 12 from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE 13 14 15 selector = DefaultSelector() 16 #使用select完成http请求 17 urls = [] 18 stop = False 19 20 21 class Fetcher: 22 def connected(self, key): 23 selector.unregister(key.fd) 24 self.client.send("GET {} HTTP/1.1 Host:{} Connection:close ".format(self.path, self.host).encode("utf8")) 25 selector.register(self.client.fileno(), EVENT_READ, self.readable) 26 27 def readable(self, key): 28 d = self.client.recv(1024) 29 if d: 30 self.data += d 31 else: 32 selector.unregister(key.fd) 33 data = self.data.decode("utf8") 34 html_data = data.split(" ")[1] 35 print(html_data) 36 self.client.close() 37 urls.remove(self.spider_url) 38 if not urls: 39 global stop 40 stop = True 41 42 def get_url(self, url): 43 self.spider_url = url 44 url = urlparse(url) 45 self.host = url.netloc 46 self.path = url.path 47 self.data = b"" 48 if self.path == "": 49 self.path = "/" 50 51 # 建立socket连接 52 self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 53 self.client.setblocking(False) 54 55 try: 56 self.client.connect((self.host, 80)) # 阻塞不会消耗cpu 57 except BlockingIOError as e: 58 pass 59 60 #注册 61 selector.register(self.client.fileno(), EVENT_WRITE, self.connected) 62 63 64 #心脏, 心脏在不停的跳动,跳动就会知道调用什么代码,执行什么方法 65 #不停的向操作系统询问哪些socket已经准备好了,然后执行回调方法 66 def loop(): 67 #事件循环,不停的请求socket的状态并调用对应的回调函数 68 #1. select本身是不支持register模式 69 #2. socket状态变化以后的回调是由程序员完成的 70 while not stop: 71 ready = selector.select() 72 for key, mask in ready: 73 call_back = key.data 74 call_back(key) 75 #回调+事件循环+select(pollepoll) 76 77 if __name__ == "__main__": 78 fetcher = Fetcher() 79 import time 80 start_time = time.time() 81 for url in range(20): 82 url = "http://shop.projectsedu.com/goods/{}/".format(url) 83 urls.append(url) 84 fetcher = Fetcher() 85 fetcher.get_url(url) 86 loop() 87 print(time.time()-start_time)