zoukankan      html  css  js  c++  java
  • 爬虫必备—性能相关(异步非阻塞)

    在编写爬虫时,性能的消耗主要在IO请求中,当单进程单线程模式下请求URL时必然会引起等待,从而使得请求整体变慢。

    1. 同步执行

     1 import requests
     2 
     3 def fetch_async(url):
     4     response = requests.get(url)
     5     return response
     6 
     7 
     8 url_list = ['http://www.github.com', 'http://www.bing.com']
     9 
    10 for url in url_list:
    11     fetch_async(url)

    2. 多线程执行(多个线程并发执行,时间长短取决于最长的URL请求)

     1 from concurrent.futures import ThreadPoolExecutor
     2 import requests
     3 
     4 
     5 def fetch_async(url):
     6     response = requests.get(url)
     7     return response
     8 
     9 
    10 url_list = ['http://www.github.com', 'http://www.bing.com']
    11 pool = ThreadPoolExecutor(5)
    12 for url in url_list:
    13     pool.submit(fetch_async, url)
    14 pool.shutdown(wait=True)

    3. 多进程执行(在CPU核心数足够的情况下,多个进程并行执行,时间长短取决于最长的URL请求;但对于IO密集型任务,进程的创建与切换开销更大,通常并不会快于多线程)

     1 from concurrent.futures import ProcessPoolExecutor
     2 import requests
     3 
     4 def fetch_async(url):
     5     response = requests.get(url)
     6     return response
     7 
     8 
     9 url_list = ['http://www.github.com', 'http://www.bing.com']
    10 pool = ProcessPoolExecutor(5)
    11 for url in url_list:
    12     pool.submit(fetch_async, url)
    13 pool.shutdown(wait=True)

    4. 多线程+回调函数(实现了异步非阻塞,在IO等待的情况下可以做其它事情)

     1 from concurrent.futures import ThreadPoolExecutor
     2 import requests
     3 
     4 def fetch_async(url):
     5     response = requests.get(url)
     6     return response
     7 
     8 
     9 def callback(future):
    10     print(future.result())
    11 
    12 
    13 url_list = ['http://www.github.com', 'http://www.bing.com']
    14 pool = ThreadPoolExecutor(5)
    15 for url in url_list:
    16     v = pool.submit(fetch_async, url)
    17     v.add_done_callback(callback)
    18 pool.shutdown(wait=True)

    5. 多进程+回调函数(实现了异步非阻塞,在IO等待的情况下可以做其它事情)

     1 from concurrent.futures import ProcessPoolExecutor
     2 import requests
     3 
     4 
     5 def fetch_async(url):
     6     response = requests.get(url)
     7     return response
     8 
     9 
    10 def callback(future):
    11     print(future.result())
    12 
    13 
    14 url_list = ['http://www.github.com', 'http://www.bing.com']
    15 pool = ProcessPoolExecutor(5)
    16 for url in url_list:
    17     v = pool.submit(fetch_async, url)
    18     v.add_done_callback(callback)
    19 pool.shutdown(wait=True)

    通过上述代码均可以完成对请求性能的提高,但多线程和多进程的缺点是在IO阻塞时会造成线程和进程的浪费,所以异步IO会是首选:

    1. asyncio示例一

    import asyncio
    
    
    @asyncio.coroutine
    def func1():
        print('before...func1......')
        yield from asyncio.sleep(5)
        print('end...func1......')
    
    
    tasks = [func1(), func1()]
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(*tasks))
    loop.close()

    2. asyncio示例二

     1 import asyncio
     2 
     3 
     4 @asyncio.coroutine
     5 def fetch_async(host, url='/'):
     6     print(host, url)
     7     reader, writer = yield from asyncio.open_connection(host, 80)
     8 
     9     request_header_content = """GET %s HTTP/1.0
    Host: %s
    
    """ % (url, host,)
    10     request_header_content = bytes(request_header_content, encoding='utf-8')
    11 
    12     writer.write(request_header_content)
    13     yield from writer.drain()
    14     text = yield from reader.read()
    15     print(host, url, text)
    16     writer.close()
    17 
    18 tasks = [
    19     fetch_async('www.cnblogs.com', '/wupeiqi/'),
    20     fetch_async('dig.chouti.com', '/pic/show?nid=4073644713430508&lid=10273091')
    21 ]
    22 
    23 loop = asyncio.get_event_loop()
    24 results = loop.run_until_complete(asyncio.gather(*tasks))
    25 loop.close()

    3. asyncio + aiohttp

     1 import aiohttp
     2 import asyncio
     3 
     4 
     5 @asyncio.coroutine
     6 def fetch_async(url):
     7     print(url)
     8     response = yield from aiohttp.request('GET', url)
     9     # data = yield from response.read()
    10     # print(url, data)
    11     print(url, response)
    12     response.close()
    13 
    14 
    15 tasks = [fetch_async('http://www.google.com/'), fetch_async('http://www.chouti.com/')]
    16 
    17 event_loop = asyncio.get_event_loop()
    18 results = event_loop.run_until_complete(asyncio.gather(*tasks))
    19 event_loop.close()

    4. asyncio + requests

     1 import asyncio
     2 import requests
     3 
     4 
     5 @asyncio.coroutine
     6 def fetch_async(func, *args):
     7     loop = asyncio.get_event_loop()
     8     future = loop.run_in_executor(None, func, *args)
     9     response = yield from future
    10     print(response.url, response.content)
    11 
    12 
    13 tasks = [
    14     fetch_async(requests.get, 'http://www.cnblogs.com/wupeiqi/'),
    15     fetch_async(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091')
    16 ]
    17 
    18 loop = asyncio.get_event_loop()
    19 results = loop.run_until_complete(asyncio.gather(*tasks))
    20 loop.close()

    5. gevent + requests

     1 import gevent
     2 
     3 import requests
     4 from gevent import monkey
     5 
     6 monkey.patch_all()
     7 
     8 
     9 def fetch_async(method, url, req_kwargs):
    10     print(method, url, req_kwargs)
    11     response = requests.request(method=method, url=url, **req_kwargs)
    12     print(response.url, response.content)
    13 
    14 # ##### 发送请求 #####
    15 gevent.joinall([
    16     gevent.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
    17     gevent.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),
    18     gevent.spawn(fetch_async, method='get', url='https://github.com/', req_kwargs={}),
    19 ])
    20 
    21 # ##### 发送请求(协程池控制最大协程数量) #####
    22 # from gevent.pool import Pool
    23 # pool = Pool(None)
    24 # gevent.joinall([
    25 #     pool.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
    26 #     pool.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),
    27 #     pool.spawn(fetch_async, method='get', url='https://www.github.com/', req_kwargs={}),
    28 # ])

    6. grequests(封装的 gevent + requests)

     1 import grequests
     2 
     3 
     4 request_list = [
     5     grequests.get('http://httpbin.org/delay/1', timeout=0.001),
     6     grequests.get('http://fakedomain/'),
     7     grequests.get('http://httpbin.org/status/500')
     8 ]
     9 
    10 
    11 # ##### 执行并获取响应列表 #####
    12 # response_list = grequests.map(request_list)
    13 # print(response_list)
    14 
    15 
    16 # ##### 执行并获取响应列表(处理异常) #####
    17 # def exception_handler(request, exception):
    18 #     print(request, exception)
    19 #     print("Request failed")
    20 
    21 # response_list = grequests.map(request_list, exception_handler=exception_handler)
    22 # print(response_list)

    7. Twisted示例

     1 from twisted.web.client import getPage, defer
     2 from twisted.internet import reactor
     3 
     4 
     5 def all_done(arg):
     6     reactor.stop()
     7 
     8 
     9 def callback(contents):
    10     print(contents)
    11 
    12 
    13 deferred_list = []
    14 
    15 url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
    16 for url in url_list:
    17     deferred = getPage(bytes(url, encoding='utf8'))
    18     deferred.addCallback(callback)
    19     deferred_list.append(deferred)
    20 
    21 dlist = defer.DeferredList(deferred_list)
    22 dlist.addBoth(all_done)
    23 
    24 reactor.run()

    8. Tornado

     1 from tornado.httpclient import AsyncHTTPClient
     2 from tornado.httpclient import HTTPRequest
     3 from tornado import ioloop
     4 
     5 
     6 def handle_response(response):
     7     """
     8     处理返回值内容(需要维护计数器,来停止IO循环),调用 ioloop.IOLoop.current().stop()
     9     :param response: 
    10     :return: 
    11     """
    12     if response.error:
    13         print("Error:", response.error)
    14     else:
    15         print(response.body)
    16 
    17 
    18 def func():
    19     url_list = [
    20         'http://www.baidu.com',
    21         'http://www.bing.com',
    22     ]
    23     for url in url_list:
    24         print(url)
    25         http_client = AsyncHTTPClient()
    26         http_client.fetch(HTTPRequest(url), handle_response)
    27 
    28 
    29 ioloop.IOLoop.current().add_callback(func)
    30 ioloop.IOLoop.current().start()

    9. Twisted更多

     1 from twisted.internet import reactor
     2 from twisted.web.client import getPage
     3 import urllib.parse
     4 
     5 
     6 def one_done(arg):
     7     print(arg)
     8     reactor.stop()
     9 
    10 post_data = urllib.parse.urlencode({'check_data': 'adf'})
    11 post_data = bytes(post_data, encoding='utf8')
    12 headers = {b'Content-Type': b'application/x-www-form-urlencoded'}
    13 response = getPage(bytes('http://dig.chouti.com/login', encoding='utf8'),
    14                    method=bytes('POST', encoding='utf8'),
    15                    postdata=post_data,
    16                    cookies={},
    17                    headers=headers)
    18 response.addBoth(one_done)
    19 
    20 reactor.run()

    以上均是Python内置以及第三方模块提供的异步IO请求模块,使用简便且可大大提高效率,而异步IO请求的本质则是【非阻塞Socket】+【IO多路复用】:

      1 import select
      2 import socket
      3 import time
      4 
      5 
      6 class AsyncTimeoutException(TimeoutError):
      7     """
      8     请求超时异常类
      9     """
     10 
     11     def __init__(self, msg):
     12         self.msg = msg
     13         super(AsyncTimeoutException, self).__init__(msg)
     14 
     15 
     16 class HttpContext(object):
     17     """封装请求和相应的基本数据"""
     18 
     19     def __init__(self, sock, host, port, method, url, data, callback, timeout=5):
     20         """
     21         sock: 请求的客户端socket对象
     22         host: 请求的主机名
     23         port: 请求的端口
     25         method: 请求方式
     26         url: 请求的URL
     27         data: 请求时请求体中的数据
     28         callback: 请求完成后的回调函数
     29         timeout: 请求的超时时间
     30         """
     31         self.sock = sock
     32         self.callback = callback
     33         self.host = host
     34         self.port = port
     35         self.method = method
     36         self.url = url
     37         self.data = data
     38 
     39         self.timeout = timeout
     40 
     41         self.__start_time = time.time()
     42         self.__buffer = []
     43 
     44     def is_timeout(self):
     45         """当前请求是否已经超时"""
     46         current_time = time.time()
     47         if (self.__start_time + self.timeout) < current_time:
     48             return True
     49 
     50     def fileno(self):
     51         """请求sockect对象的文件描述符,用于select监听"""
     52         return self.sock.fileno()
     53 
     54     def write(self, data):
     55         """在buffer中写入响应内容"""
     56         self.__buffer.append(data)
     57 
     58     def finish(self, exc=None):
     59         """在buffer中写入响应内容完成,执行请求的回调函数"""
     60         if not exc:
     61             response = b''.join(self.__buffer)
     62             self.callback(self, response, exc)
     63         else:
     64             self.callback(self, None, exc)
     65 
     66     def send_request_data(self):
     67         content = """%s %s HTTP/1.0
    Host: %s
    
    %s""" % (
     68             self.method.upper(), self.url, self.host, self.data,)
     69 
     70         return content.encode(encoding='utf8')
     71 
     72 
     73 class AsyncRequest(object):
     74     def __init__(self):
     75         self.fds = []
     76         self.connections = []
     77 
     78     def add_request(self, host, port, method, url, data, callback, timeout):
     79         """创建一个要发送的请求"""
     80         client = socket.socket()
     81         client.setblocking(False)
     82         try:
     83             client.connect((host, port))
     84         except BlockingIOError as e:
     85             pass
     86             # print('已经向远程发送连接的请求')
     87         req = HttpContext(client, host, port, method, url, data, callback, timeout)
     88         self.connections.append(req)
     89         self.fds.append(req)
     90 
     91     def check_conn_timeout(self):
     92         """检查所有的请求,是否有已经连接超时,如果有则终止"""
     93         timeout_list = []
     94         for context in self.connections:
     95             if context.is_timeout():
     96                 timeout_list.append(context)
     97         for context in timeout_list:
     98             context.finish(AsyncTimeoutException('请求超时'))
     99             self.fds.remove(context)
    100             self.connections.remove(context)
    101 
    102     def running(self):
    103         """事件循环,用于检测请求的socket是否已经就绪,从而执行相关操作"""
    104         while True:
    105             r, w, e = select.select(self.fds, self.connections, self.fds, 0.05)
    106 
    107             if not self.fds:
    108                 return
    109 
    110             for context in r:
    111                 sock = context.sock
    112                 while True:
    113                     try:
    114                         data = sock.recv(8096)
    115                         if not data:
    116                             self.fds.remove(context)
    117                             context.finish()
    118                             break
    119                         else:
    120                             context.write(data)
    121                     except BlockingIOError as e:
    122                         break
    123                     except TimeoutError as e:
    124                         self.fds.remove(context)
    125                         self.connections.remove(context)
    126                         context.finish(e)
    127                         break
    128 
    129             for context in w:
    130                 # 已经连接成功远程服务器,开始向远程发送请求数据
    131                 if context in self.fds:
    132                     data = context.send_request_data()
    133                     context.sock.sendall(data)
    134                     self.connections.remove(context)
    135 
    136             self.check_conn_timeout()
    137 
    138 
    139 if __name__ == '__main__':
    140     def callback_func(context, response, ex):
    141         """
    142         :param context: HttpContext对象,内部封装了请求相关信息
    143         :param response: 请求响应内容
    144         :param ex: 是否出现异常(如果有异常则值为异常对象;否则值为None)
    145         :return:
    146         """
    147         print(context, response, ex)
    148 
    149     obj = AsyncRequest()
    150     url_list = [
    151         {'host': 'www.google.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,
    152          'callback': callback_func},
    153         {'host': 'www.baidu.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,
    154          'callback': callback_func},
    155         {'host': 'www.bing.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,
    156          'callback': callback_func},
    157     ]
    158     for item in url_list:
    159         print(item)
    160         obj.add_request(**item)
    161 
    162     obj.running()
    史上最牛逼的异步IO模块

    本文转载自: 银角大王

    http://www.cnblogs.com/wupeiqi/articles/6229292.html

  • 相关阅读:
    读取目录中文件名的方法 C++
    std::string::compare() in C++
    std::string::erase in C++ 之 erase()方法截取string的前几位、后几位
    Source Insight 破解
    bib+windows+word=bibtex4word插件使用方法
    7K7K小游戏
    Linux+Gurobi-python调用
    使用位运算替代模运算,为了高效做模除最好选择mod除(2^n)
    windows+VS2019+gurobi解决方案配置
    The plagiarism
  • 原文地址:https://www.cnblogs.com/OldJack/p/7465140.html
Copyright © 2011-2022 走看看