知识点:进程、线程、协程、IO多路
模块:asyncio
思考:在编写爬虫时,性能的消耗主要在IO请求中,当单进程单线程模式下请求URL时必然会引起等待,从而使得请求整体变慢。
import requests def fetch_async(url): response=requests.get(url) return response url_list=[ 'http://www.baidu.com', 'http://www.cnblogs.com', 'http://www.sina.com', 'http://www.bing.com', 'http://www.qq.com', 'https://www.taobao.com/', ] for url in url_list: res=fetch_async(url) print(res)
import requests from concurrent.futures import ThreadPoolExecutor def fetch_async(url): response=requests.get(url) return response url_list=[ 'http://www.baidu.com', 'http://www.cnblogs.com', 'http://www.sina.com', 'http://www.bing.com', 'http://www.qq.com', 'https://www.taobao.com/', ] pool=ThreadPoolExecutor(5) for url in url_list: res=pool.submit(fetch_async,url) print(res) pool.shutdown(wait=True)
import requests from concurrent.futures import ThreadPoolExecutor def fetch_async(url): response=requests.get(url) return response def callback(future): print(future.result().text) url_list=[ 'http://www.baidu.com', 'http://www.cnblogs.com', 'http://www.sina.com', 'http://www.bing.com', 'http://www.qq.com', 'https://www.taobao.com/', ] pool=ThreadPoolExecutor(5) for url in url_list: res=pool.submit(fetch_async,url) res.add_done_callback(callback) pool.shutdown(wait=True)
from concurrent.futures import ProcessPoolExecutor import myrequests def fetch_async(url): res=requests.get(url) return res url_list=[ 'http://www.baidu.com', 'http://www.cnblogs.com', 'http://www.sina.com', 'http://www.bing.com', 'http://www.qq.com', 'https://www.taobao.com/', ] if __name__ == '__main__': pool=ProcessPoolExecutor(5) for url in url_list: pool.submit(fetch_async,url) pool.shutdown(wait=True)
from concurrent.futures import ProcessPoolExecutor import myrequests def fetch_async(url): res=requests.get(url) return res def callback(future): print(future.result()) url_list=[ 'http://www.baidu.com', 'http://www.cnblogs.com', 'http://www.sina.com', 'http://www.bing.com', 'http://www.qq.com', 'https://www.taobao.com/', ] if __name__ == '__main__': pool=ProcessPoolExecutor(5) for url in url_list: res=pool.submit(fetch_async,url) res=res.add_done_callback(res) pool.shutdown(wait=True)
通过上述代码均可以完成对请求性能的提高,对于多线程和多进行的缺点是在IO阻塞时会造成了线程和进程的浪费,所以异步IO回事首选:
import asyncio @asyncio.coroutine def fun1(): print('before...func1......') yield from asyncio.sleep(5) print('end...func1......') task =[fun1(),fun1()] loop=asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(*task)) loop.close()
import asyncio @asyncio.coroutine def fun(host,url='/'): print(host, url) reader,writer=yield from asyncio.open_connection(host,80) # 创建连接 request_header_content="GET %s HTTP/1.0 Host: %s "%(url,host,) # 请求格式 writer.write(request_header_content) # 发送请求 yield from writer.drain() # 执行发送 text=yield from reader.read() # 获得返回值 print(host,url,text) writer.close() tasks = [ # fun('host','/url') fun('www.cnblogs.com', '/index/'), fun('dig.chouti.com', '/pic/show?nid=4073644713430508&lid=10273091') ] loop=asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(*tasks)) loop.close()
import asyncio import aiohttp @asyncio.coroutine def fun(url): print(url) response=yield from aiohttp.request('GET',url) response.close() tasks = [ # fun('host','/url') fun('http://www.baidu.com'), fun('http://dig.chouti.com') ] if __name__ == '__main__': loop=asyncio.get_event_loop() results=loop.run_until_complete(asyncio.gather(*tasks)) loop.close()
import asyncio import requests @asyncio.coroutine def fun(func, *args): loop = asyncio.get_event_loop() future = loop.run_in_executor(None, func, *args) response = yield from future print(response.url, response.content) tasks = [ fun(requests.get,'http://www.baidu.com'), fun(requests.get,'http://dig.chouti.com') ] loop = asyncio.get_event_loop() results = loop.run_until_complete(asyncio.gather(*tasks)) loop.close()
import grequests request_list = [ grequests.get('http://httpbin.org/delay/1', timeout=0.001), grequests.get('http://fakedomain/'), grequests.get('http://httpbin.org/status/500') ] # ##### 执行并获取响应列表 ##### # response_list = grequests.map(request_list) # print(response_list) # ##### 执行并获取响应列表(处理异常) ##### # def exception_handler(request, exception): # print(request,exception) # print("Request failed") # response_list = grequests.map(request_list, exception_handler=exception_handler) # print(response_list) 5.grequests
import gevent,requests from gevent import monkey def fun(method,url,req_kwargs): print(method, url, req_kwargs) res=requests.request(method=method,url=url,**req_kwargs) print(res.url, res.content) # ##### 发送请求 ##### # gevent.joinall([ # gevent.spawn(fun,method='get',url='https://www.python.org/', req_kwargs={}), # gevent.spawn(fun, method='get', url='https://www.yahoo.com/', req_kwargs={}), # gevent.spawn(fun, method='get', url='https://github.com/', req_kwargs={}), # ]) # ##### 发送请求(协程池控制最大协程数量) ##### from gevent.pool import Pool pool = Pool(4) gevent.joinall([ pool.spawn(fun, method='get', url='https://www.python.org/', req_kwargs={}), pool.spawn(fun, method='get', url='https://www.baidu.com', req_kwargs={}), pool.spawn(fun, method='get', url='www.cnblogs.com', req_kwargs={}), ])
from twisted.internet import defer from twisted.web.client import getPage from twisted.internet import reactor def one_done(arg): print(arg) @defer.inlineCallbacks def task(url): res=getPage(bytes(url,encoding='utf-8')) # 发送Http请求 # 获取页面 res.addCallback(one_done) # 添加一个回掉函数 yield res def all_done(arg): print('done') reactor.stop() url_list=[ 'http://www.cnblogs.com', 'http://www.cnblogs.com', 'http://www.cnblogs.com', 'http://www.cnblogs.com', ] defer_list = [] # [特殊,特殊,特殊(已经向url发送请求)] for url in url_list: v = task(url) # 由于@defer.inlineCallbacks,请求完成马上执下一个请求 没有阻塞 defer_list.append(v) d=defer.DeferredList(defer_list) ''' 内部存在计数器所有完成向下进行''' d.addBoth(all_done) reactor.run() # 死循环
from tornado.httpclient import AsyncHTTPClient from tornado.httpclient import HTTPRequest from tornado import ioloop COUNT = 0 def handle_response(response): global COUNT COUNT -= 1 if response.error: print("Error:", response.error) else: print(response.body) # 方法同twisted # ioloop.IOLoop.current().stop() if COUNT == 0: ioloop.IOLoop.current().stop() def func(): url_list = [ 'http://www.cnblogs.com', 'http://www.baidu.com', ] global COUNT COUNT = len(url_list) for url in url_list: print(url) http_client = AsyncHTTPClient() http_client.fetch(HTTPRequest(url), handle_response) # ioloop.IOLoop.current().add_callback(func) ioloop.IOLoop.current().start() #死循环
以上均是Python内置以及第三方模块提供异步IO请求模块,使用简便大大提高效率,而对于异步IO请求的本质则是【非阻塞Socket】+【IO多路复用】:
import socket import select # sk=socket.socket() # sk.connect(('www.baidu.com',80,)) # sk.send(b'GET / HTTP/1.0 Host:www.baidu.com ') # sk.send(b'POST / HTTP/1.0 Host:www.baidu.com k1=v1&k2=v2') # # data = sk.recv(8096) # IO阻塞 # print(data) # sk.close() # sk = socket.socket() # sk.setblocking(False) # # 1.连接 # try: # sk.connect(('www.baidu.com',80,)) # IO阻塞 # print('连接成功了...') # except BlockingIOError as e: # print(e) # # 2. 连接成功发送消息 # sk.send(b'GET / HTTP/1.0 Host:www.baidu.com ') # # sk.send(b'POST / HTTP/1.0 Host:www.baidu.com k1=v1&k2=v2') # # # 3. 等待着服务端响应 # data = sk.recv(8096) # IO阻塞 # print(data) # # # 关闭连接 # sk.close() class HttpRequest: def __init__(self,sk,host,callback): self.socket=sk self.host=host self.callback=callback def fileno(self): # 必须有一个文件描述符, return self.socket.fileno() class HttpResponse: def __init__(self,recv_data): self.recv_data = recv_data self.header_dict = {} self.body = None self.initialize() def initialize(self): headers, body = self.recv_data.split(b' ', 1) self.body = body header_list = headers.split(b' ') for h in header_list: h_str = str(h, encoding='utf-8') v = h_str.split(':', 1) if len(v) == 2: self.header_dict[v[0]] = v[1] class AsyncRequest: def __init__(self): self.conn = [] self.connection = [] # 用于检测是否已经连接成功 def add_request(self,host,callback): try: sk = socket.socket() sk.setblocking(0) sk.connect((host, 80,)) except BlockingIOError as e: print("报错了亲",e) request=HttpRequest(sk,host,callback) self.conn.append(request) self.connection.append(request) def run(self): while True: rlist,wlist,elist = select.select(self.conn,self.connection,self.conn,0.05) for w in wlist: print(w.host,'连接成功...') # 只要能循环到,表示socket和服务器端已经连接成功 tpl = "GET / HTTP/1.0 Host:%s " %(w.host,) w.socket.send(bytes(tpl,encoding='utf-8')) self.connection.remove(w) for r in rlist: # r,是HttpRequest recv_data = bytes() while True: try: chunck = r.socket.recv(8096) recv_data += chunck except Exception as e: break response = HttpResponse(recv_data) r.callback(response) r.socket.close() self.conn.remove(r) if len(self.conn) == 0: break def f1(response): print('保存到文件',response.header_dict) def f2(response): print('保存到数据库', response.header_dict) url_list = [ {'host':'www.baidu.com','callback': f1}, {'host':'cn.bing.com','callback': f2}, {'host':'www.cnblogs.com','callback': f2}, ] req = AsyncRequest() for item in url_list: req.add_request(item['host'],item['callback']) req.run()