#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
# One selector shared by the whole script; each registration carries the
# zero-argument callback to run when its socket becomes ready.
selector = DefaultSelector()
# Flipped to True by the crawler that empties ``urls_todo``; ends the loop.
stopped = False
# Paths that still have to be fetched: '/' plus '/1' .. '/9'.
urls_todo = {'/'} | {'/{0}'.format(number) for number in range(1, 10)}
class Future:
    """Placeholder for a value that an I/O callback will supply later.

    Interested parties register callbacks; ``set_result`` stores the value
    and then calls every registered callback with this future.
    """

    def __init__(self):
        self._callbacks = []
        self.result = None

    def add_done_callback(self, fn):
        """Queue ``fn`` so that ``set_result`` calls it as ``fn(future)``."""
        self._callbacks.append(fn)

    def set_result(self, result):
        """Store ``result`` and notify the callbacks in registration order."""
        self.result = result
        for notify in self._callbacks:
            notify(self)
class Crawler:
    """Download one path from a fixed host over a non-blocking socket.

    ``fetch`` is a generator-based coroutine: it yields ``Future`` objects
    and expects its driver to send each future's result back in once the
    selector callback has resolved it.
    """

    # Host and port every crawler talks to.
    host = 'example.com'
    port = 80

    def __init__(self, url):
        """Remember the path to request and start with an empty response.

        Args:
            url: Path placed in the request line, e.g. ``'/3'``.
        """
        self.url = url
        self.response = b''

    def _request_bytes(self):
        """Return the raw HTTP/1.0 GET request for ``self.url``."""
        # Each header line ends with CRLF and an empty line closes the header
        # block; without these terminators the request is malformed.
        request = 'GET {0} HTTP/1.0\r\nHost: {1}\r\n\r\n'.format(
            self.url, self.host)
        return request.encode('ascii')

    def fetch(self):
        """Connect, send the request and accumulate the reply.

        Yields:
            Future: one future for the connect, then one per read. The value
            sent back in for a read future is the chunk returned by
            ``sock.recv``; an empty chunk means the peer closed the
            connection.

        Side effects:
            Appends every chunk to ``self.response``. On end of stream it
            removes ``self.url`` from ``urls_todo`` and sets the global
            ``stopped`` flag when no paths remain. The socket is always
            closed when the generator finishes or is closed.
        """
        global stopped
        sock = socket.socket()
        sock.setblocking(False)
        try:
            try:
                sock.connect((self.host, self.port))
            except BlockingIOError:
                # Expected for a non-blocking connect that is still in
                # progress; completion is signalled by writability below.
                pass

            f = Future()

            def on_connected():
                # Resolving the future runs the driver's callback, which
                # resumes this generator right after ``yield f``.
                f.set_result(None)

            selector.register(sock.fileno(), EVENT_WRITE, on_connected)
            yield f  # suspend until the socket is writable
            # Drop the write registration before registering for reads.
            selector.unregister(sock.fileno())

            sock.send(self._request_bytes())

            while True:
                f = Future()

                def on_readable():
                    f.set_result(sock.recv(4096))

                selector.register(sock.fileno(), EVENT_READ, on_readable)
                # The driver sends the future's result back in, so ``chunk``
                # is whatever ``sock.recv(4096)`` returned.
                chunk = yield f
                selector.unregister(sock.fileno())
                if not chunk:
                    break
                self.response += chunk

            urls_todo.remove(self.url)
            if not urls_todo:
                stopped = True
        finally:
            # Release the descriptor on every exit path.
            sock.close()
class Task:
    """Drive a generator-based coroutine that yields futures.

    Each yielded future gets ``step`` registered as its callback, so the
    coroutine is resumed with the future's result once it is resolved.
    """

    def __init__(self, core):
        """Store the coroutine and run it up to its first yielded future."""
        self.core = core
        # Prime the coroutine by stepping it with an already-resolved future
        # whose result is None.
        primer = Future()
        primer.set_result(None)
        self.step(primer)

    def step(self, future):
        """Resume the coroutine with ``future.result`` and wait on the next.

        Returns None; does nothing further once the coroutine has finished.
        """
        try:
            waiting_on = self.core.send(future.result)
        except StopIteration:
            pass
        else:
            waiting_on.add_done_callback(self.step)
def loop():
    """Wait for I/O events and run their callbacks until ``stopped`` is set."""
    while not stopped:
        ready = selector.select()
        print(ready)
        for key, _mask in ready:
            # The registration data is the zero-argument callback.
            key.data()
if __name__ == '__main__':
    import time

    start = time.time()
    # Creating a Task runs each crawler up to its first yielded future; the
    # remaining work is driven by the event loop.
    for url in urls_todo:
        crawler = Crawler(url)
        Task(crawler.fetch())
    loop()
    # Elapsed wall-clock seconds for the whole crawl.
    print(time.time() - start)
# 2. Refactored version using `yield from`:
# !/usr/bin/env python
# -*- coding:utf-8 -*-
import socket
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
# Shared selector; the data attached to each registration is the callback
# to invoke when that socket is ready.
selector = DefaultSelector()
# Becomes True once the last pending path has been fetched.
stopped = False
# Pending paths: the root plus '/1' through '/9'.
urls_todo = set(['/'] + ['/%d' % index for index in range(1, 10)])
class Future:
    """A result that will be filled in later by an I/O callback.

    Besides the callback interface, a future is iterable so that a coroutine
    can write ``value = yield from future``.
    """

    def __init__(self):
        self._callbacks = []
        self.result = None

    def add_done_callback(self, fn):
        """Register ``fn``; ``set_result`` will call it as ``fn(future)``."""
        self._callbacks.append(fn)

    def set_result(self, result):
        """Record ``result`` and call the callbacks in registration order."""
        self.result = result
        for listener in self._callbacks:
            listener(self)

    def __iter__(self):
        """Yield this future once, then return its result to ``yield from``."""
        yield self
        return self.result
def connect(sock, address):
    """Coroutine: start a non-blocking connect and suspend until writable.

    Args:
        sock: Socket to connect; it is switched to non-blocking mode.
        address: Address passed to ``sock.connect``, e.g. ``(host, port)``.

    Yields:
        Future: resolved with None once the selector reports the socket as
        writable.
    """
    f = Future()
    sock.setblocking(False)
    try:
        # Use the caller's address; it was previously ignored in favour of a
        # hard-coded ('example.com', 80).
        sock.connect(address)
    except BlockingIOError:
        # Expected for a non-blocking connect that is still in progress.
        pass

    def on_connected():
        f.set_result(None)

    selector.register(sock.fileno(), EVENT_WRITE, on_connected)
    yield from f
    # Drop the write registration so the socket can be registered for reads.
    selector.unregister(sock.fileno())
def read(sock):
    """Coroutine: wait until ``sock`` is readable and return one chunk.

    Returns whatever ``sock.recv(4096)`` produced when the selector reported
    the socket as readable.
    """
    pending = Future()
    selector.register(
        sock.fileno(),
        EVENT_READ,
        lambda: pending.set_result(sock.recv(4096)),
    )
    data = yield from pending
    selector.unregister(sock.fileno())
    return data
def read_all(sock):
    """Coroutine: read from ``sock`` until an empty chunk, return all bytes."""
    pieces = []
    while True:
        piece = yield from read(sock)
        if not piece:
            # An empty chunk ends the stream.
            break
        pieces.append(piece)
    return b''.join(pieces)
class Crawler:
    """Download one path from a fixed host using the coroutine helpers.

    ``fetch`` delegates to ``connect`` and ``read_all`` with ``yield from``
    and must be driven by something that resolves the yielded futures.
    """

    # Host and port every crawler talks to.
    host = 'example.com'
    port = 80

    def __init__(self, url):
        """Remember the path to request and start with an empty response.

        Args:
            url: Path placed in the request line, e.g. ``'/3'``.
        """
        self.url = url
        self.response = b''

    def _request_bytes(self):
        """Return the raw HTTP/1.0 GET request for ``self.url``."""
        # Each header line ends with CRLF and an empty line closes the header
        # block; without these terminators the request is malformed.
        request = 'GET {0} HTTP/1.0\r\nHost: {1}\r\n\r\n'.format(
            self.url, self.host)
        return request.encode('ascii')

    def fetch(self):
        """Connect, send the request and store the complete reply.

        Side effects:
            Sets ``self.response`` to the bytes returned by ``read_all``,
            removes ``self.url`` from ``urls_todo`` and sets the global
            ``stopped`` flag when no paths remain. The socket is closed on
            every exit path.
        """
        global stopped
        sock = socket.socket()
        try:
            yield from connect(sock, (self.host, self.port))
            sock.send(self._request_bytes())
            self.response = yield from read_all(sock)
        finally:
            # Release the descriptor whether the fetch succeeded or not.
            sock.close()
        urls_todo.remove(self.url)
        if not urls_todo:
            stopped = True
class Task:
    """Run a future-yielding coroutine to completion, one step per event."""

    def __init__(self, core):
        """Keep the coroutine and advance it to its first yielded future."""
        self.core = core
        # Start the coroutine by stepping with a future already resolved to
        # None.
        kickoff = Future()
        kickoff.set_result(None)
        self.step(kickoff)

    def step(self, future):
        """Send ``future.result`` into the coroutine and chain the next step.

        When the coroutine finishes (StopIteration) nothing more is
        scheduled.
        """
        try:
            following = self.core.send(future.result)
        except StopIteration:
            following = None
        if following is not None:
            following.add_done_callback(self.step)
def loop():
    """Dispatch selector events to their callbacks until ``stopped`` is set."""
    while not stopped:
        fired = selector.select()
        print(fired)
        for selector_key, _event_mask in fired:
            # Each registration stored its callback as the key's data.
            handler = selector_key.data
            handler()
if __name__ == '__main__':
    import time

    start = time.time()
    # Each Task advances its crawler to the first yielded future; the event
    # loop then drives all of them.
    for url in urls_todo:
        crawler = Crawler(url)
        Task(crawler.fetch())
    loop()
    # Elapsed wall-clock seconds for the whole crawl.
    print(time.time() - start)