  • Crawler Performance

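    When writing a crawler, most of the running time is spent on I/O requests. In single-process, single-threaded mode, every URL request blocks until its response arrives, so the whole run slows down.

    1. Synchronous execution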

    import requests
    
    def fetch_async(url):
        response = requests.get(url)
        return response
    
    url_list = ['http://www.github.com', 'http://www.bing.com']
    
    for url in url_list:
        fetch_async(url)
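
    The cost of waiting on each request in turn can be seen without any network access. The following is a minimal sketch in which `fake_fetch` and its 0.1 second delay are hypothetical stand-ins for `requests.get`; the total time comes out as roughly the sum of the individual waits.

```python
import time

def fake_fetch(url, delay=0.1):
    """Hypothetical stand-in for requests.get: blocks for `delay` seconds."""
    time.sleep(delay)
    return url

def fetch_all_sequentially(urls):
    """Fetch the URLs one after another and return (results, elapsed seconds)."""
    start = time.perf_counter()
    results = [fake_fetch(url) for url in urls]
    return results, time.perf_counter() - start

if __name__ == '__main__':
    results, elapsed = fetch_all_sequentially(['a', 'b', 'c'])
    print(results, round(elapsed, 2))
```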

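    2. Multithreaded execution

    • The thread pool should not be too large: context switching between threads costs time and lowers overall efficiency.

    • Each thread blocks after sending its request and waits for the response; during that time the thread sits idle.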

    from concurrent.futures import ThreadPoolExecutor
    import requests
    
    def fetch_async(url):
        response = requests.get(url)
        return response
    
    url_list = ['http://www.github.com', 'http://www.bing.com']
    pool = ThreadPoolExecutor(5)
    for url in url_list:
        pool.submit(fetch_async, url)
    pool.shutdown(wait=True)
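
    To collect the responses rather than discard them, the futures returned by `submit` can be kept and read as they finish. A sketch, assuming a hypothetical `fake_fetch` that sleeps instead of calling `requests.get`; because the waits overlap, the elapsed time should be close to one delay rather than the sum of all of them.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def fake_fetch(url, delay=0.2):
    """Hypothetical stand-in for requests.get: blocks for `delay` seconds."""
    time.sleep(delay)
    return 'response from ' + url

def fetch_all_threaded(urls, max_workers=5):
    """Run fake_fetch in a thread pool; return ({url: result}, elapsed seconds)."""
    start = time.perf_counter()
    results = {}
    with ThreadPoolExecutor(max_workers) as pool:
        # Map each future back to the URL it was submitted for
        future_to_url = {pool.submit(fake_fetch, url): url for url in urls}
        for future in as_completed(future_to_url):
            results[future_to_url[future]] = future.result()
    return results, time.perf_counter() - start

if __name__ == '__main__':
    print(fetch_all_threaded(['a', 'b', 'c', 'd']))
```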

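    3. Multithreading with a callback

    • Advantage: the callback runs once the request has returned, which decouples fetching from result handling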
    from concurrent.futures import ThreadPoolExecutor
    import requests
    
    def fetch_async(url):
        response = requests.get(url)
        return response
    
    def callback(future):
        print(future.result())
    
    url_list = ['http://www.github.com', 'http://www.bing.com']
    pool = ThreadPoolExecutor(5)
    for url in url_list:
        v = pool.submit(fetch_async, url)
        v.add_done_callback(callback)
    pool.shutdown(wait=True)
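
    A done-callback also runs when the task raised an exception, and `future.result()` then re-raises that exception inside the callback. Below is a sketch of a more defensive callback; `flaky_fetch`, `run`, and the `outcomes` list are hypothetical names used only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor

outcomes = []  # filled in by the callback

def flaky_fetch(url):
    """Hypothetical fetch that fails for URLs containing 'bad'."""
    if 'bad' in url:
        raise ValueError('cannot fetch ' + url)
    return 'response from ' + url

def callback(future):
    # exception() returns None when the task finished without raising
    error = future.exception()
    if error is not None:
        outcomes.append(('error', str(error)))
    else:
        outcomes.append(('ok', future.result()))

def run(urls):
    """Submit every URL, attach the callback, and wait for the pool to drain."""
    with ThreadPoolExecutor(2) as pool:
        for url in urls:
            pool.submit(flaky_fetch, url).add_done_callback(callback)
    return sorted(outcomes)
```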

    4. Multiprocess execution

    • Each process blocks after sending its request and waits for the response; during that time the process sits idle
    from concurrent.futures import ProcessPoolExecutor
    import requests
    
    def fetch_async(url):
        response = requests.get(url)
        return response
    
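    url_list = ['http://www.github.com', 'http://www.bing.com']
    
    # The __main__ guard keeps child processes started with "spawn" from re-running this block
    if __name__ == '__main__':
        pool = ProcessPoolExecutor(5)
        for url in url_list:
            pool.submit(fetch_async, url)
        pool.shutdown(wait=True)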

    5. Multiprocessing with a callback

    from concurrent.futures import ProcessPoolExecutor
    import requests
    
    def fetch_async(url):
        response = requests.get(url)
        return response
    
    def callback(future):
        print(future.result())
    
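    url_list = ['http://www.github.com', 'http://www.bing.com']
    
    # The __main__ guard keeps child processes started with "spawn" from re-running this block
    if __name__ == '__main__':
        pool = ProcessPoolExecutor(5)
        for url in url_list:
            v = pool.submit(fetch_async, url)
            v.add_done_callback(callback)
        pool.shutdown(wait=True)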

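    Differences between multithreading and multiprocessing:

    • For I/O-bound work, use multiple threads: the threads spend most of their time waiting and hardly use the CPU.

    • For CPU-bound work, use multiple processes, which can run on several CPU cores in parallel.

    • Threads share their process's resources, which saves memory.

    • Processes do not share resources, so they take up more memory.

    • Because of the GIL, only one thread per process can be executing on the CPU at any moment, so multithreading is inefficient for CPU-bound work.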
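
    The CPU-bound case can be checked with a rough experiment. In this sketch `count_down` is a hypothetical CPU-bound task and `timed` a hypothetical helper; on a standard CPython build with the GIL, the threaded run is usually no faster than the sequential one, though exact timings vary by machine.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def count_down(n):
    """Hypothetical CPU-bound task: a pure-Python loop that never waits on I/O."""
    while n > 0:
        n -= 1
    return n

def timed(func):
    """Call func() and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = func()
    return result, time.perf_counter() - start

def run_sequential(jobs):
    return [count_down(n) for n in jobs]

def run_threaded(jobs):
    with ThreadPoolExecutor(len(jobs)) as pool:
        return list(pool.map(count_down, jobs))

if __name__ == '__main__':
    jobs = [2_000_000] * 4
    print('sequential:', timed(lambda: run_sequential(jobs))[1])
    print('threaded:  ', timed(lambda: run_threaded(jobs))[1])
```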

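    All of the code above improves request performance. The drawback of both multithreading and multiprocessing is that the threads or processes are wasted while they block on I/O, so asynchronous I/O is the preferred choice:

    • asyncio provides concurrent I/O in a single thread. Used only on the client side it shows little of its power. Used on the server side, for example in a web server, it can support many concurrent users with a single thread plus coroutines, because handling HTTP connections is I/O work.

    • asyncio implements protocols such as TCP, UDP, and SSL; aiohttp is an HTTP framework built on top of asyncio.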

    6. asyncio (asynchronous I/O), example 1

    • asyncio has no built-in HTTP client; it only provides lower-level transports such as TCP
    import asyncio
    
    # async def/await replaces @asyncio.coroutine/yield from, which was removed in Python 3.11
    async def func1():
        print('before...func1......')
        await asyncio.sleep(5)
        print('end...func1......')
    
    async def main():
        await asyncio.gather(func1(), func1())
    
    asyncio.run(main())
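
    `asyncio.gather` also returns the coroutines' results, in the order the coroutines were passed in. A small sketch, where `work` is a hypothetical coroutine that sleeps to imitate I/O; the two sleeps overlap, so the total time should be close to one delay rather than the sum.

```python
import asyncio
import time

async def work(name, delay):
    """Hypothetical coroutine: waits `delay` seconds without blocking the loop."""
    await asyncio.sleep(delay)
    return name

async def main():
    start = time.perf_counter()
    # Both coroutines run concurrently on one thread
    results = await asyncio.gather(work('first', 0.2), work('second', 0.2))
    return results, time.perf_counter() - start

def run():
    return asyncio.run(main())
```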

    7. asyncio (asynchronous I/O), example 2

    • Wrap the data to send as an HTTP request by hand
    import asyncio
    
    
    async def fetch_async(host, url='/'):
        print(host, url)
        reader, writer = await asyncio.open_connection(host, 80)
    
        # HTTP lines end with CRLF; a blank line terminates the request headers
        request_header_content = "GET %s HTTP/1.0\r\nHost: %s\r\n\r\n" % (url, host,)
        request_header_content = bytes(request_header_content, encoding='utf-8')
    
        writer.write(request_header_content)
        await writer.drain()
        text = await reader.read()
        print(host, url, text)
        writer.close()
    
    async def main():
        return await asyncio.gather(
            fetch_async('www.cnblogs.com', '/wupeiqi/'),
            fetch_async('dig.chouti.com', '/pic/show?nid=4073644713430508&lid=10273091'),
        )
    
    results = asyncio.run(main())
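
    The raw bytes exchanged in this example follow a simple layout: a request line and headers separated by CRLF, then a blank line, then the body. The helpers below are a sketch of that layout only; `build_request` and `split_response` are hypothetical names, and real responses need more careful handling (chunked encoding, repeated headers, and so on).

```python
def build_request(host, path='/', method='GET'):
    """Build a minimal HTTP/1.0 request as bytes."""
    lines = ['%s %s HTTP/1.0' % (method, path), 'Host: %s' % host, '', '']
    return '\r\n'.join(lines).encode('utf-8')

def split_response(raw):
    """Split raw response bytes into (status line, header dict, body)."""
    head, _, body = raw.partition(b'\r\n\r\n')
    status_line, *header_lines = head.decode('iso-8859-1').split('\r\n')
    headers = {}
    for line in header_lines:
        name, _, value = line.partition(':')
        headers[name.strip().lower()] = value.strip()
    return status_line, headers, body
```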

    8. asyncio + aiohttp

    • aiohttp builds the HTTP packets for us
    • The two modules together give asynchronous I/O
    import aiohttp
    import asyncio
    
    async def fetch_async(session, url):
        print(url)
        # Current aiohttp issues requests through a ClientSession
        async with session.get(url) as response:
            # data = await response.read()
            # print(url, data)
            print(url, response)
    
    async def main():
        async with aiohttp.ClientSession() as session:
            return await asyncio.gather(
                fetch_async(session, 'http://www.google.com/'),
                fetch_async(session, 'http://www.chouti.com/'),
            )
    
    results = asyncio.run(main())

    9. asyncio + requests

    • requests builds the HTTP packets for us
    import asyncio
    import requests
    
    async def fetch_async(func, *args):
        loop = asyncio.get_running_loop()
        # requests is blocking, so run it in the loop's default thread pool executor
        future = loop.run_in_executor(None, func, *args)
        response = await future
        print(response.url, response.content)
    
    async def main():
        return await asyncio.gather(
            fetch_async(requests.get, 'http://www.cnblogs.com/wupeiqi/'),
            fetch_async(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091'),
        )
    
    results = asyncio.run(main())
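
    `run_in_executor` helps because each blocking call runs in a worker thread while the event loop stays free. A sketch with a hypothetical `blocking_call` that sleeps in place of `requests.get` and reports which thread it ran on:

```python
import asyncio
import threading
import time

def blocking_call(name, delay=0.2):
    """Hypothetical blocking function standing in for requests.get."""
    time.sleep(delay)
    return name, threading.current_thread().name

async def main():
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    # None selects the loop's default ThreadPoolExecutor
    futures = [loop.run_in_executor(None, blocking_call, n) for n in ('a', 'b', 'c')]
    results = await asyncio.gather(*futures)
    return results, time.perf_counter() - start

def run():
    return asyncio.run(main())
```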

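    10. gevent + requests

    • Python's built-in socket blocks while it waits to receive data after sending. After monkey.patch_all(), the standard sockets are replaced with gevent's cooperative, non-blocking versions.

    • gevent is a third-party library that implements coroutines on top of greenlet. greenlet alone can implement coroutines, but each switch to the next coroutine has to be written by hand, which is tedious.

    • When a greenlet hits an I/O operation, such as a network access, gevent automatically switches to another greenlet and switches back at a suitable moment once the I/O has completed. Because I/O is slow and often leaves a program waiting, this automatic switching keeps some greenlet running instead of waiting on I/O.

    • Why coroutines exist: in a multithreaded application the CPU switches between threads by time slicing, and each switch costs time (state is saved so the thread can resume later). Coroutines use a single thread and let the code decide the order in which blocks of code run within it.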
    from gevent import monkey
    
    # Patch the standard library first so that requests picks up gevent's cooperative sockets
    monkey.patch_all()
    
    import gevent
    import requests
    
    
    def fetch_async(method, url, req_kwargs):
        print(method, url, req_kwargs)
        response = requests.request(method=method, url=url, **req_kwargs)
        print(response.url, response.content)
    
    # ##### Send the requests #####
    gevent.joinall([
        gevent.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
        gevent.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),
        gevent.spawn(fetch_async, method='get', url='https://github.com/', req_kwargs={}),
    ])
    
    # ##### Send the requests (a pool caps the number of concurrent greenlets) #####
    # from gevent.pool import Pool
    # pool = Pool(5)  # at most 5 greenlets at the same time
    # gevent.joinall([
    #     pool.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
    #     pool.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),
    #     pool.spawn(fetch_async, method='get', url='https://www.github.com/', req_kwargs={}),
    # ])
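
    The idea of switching between tasks inside one thread can be sketched with plain generators, without gevent. This is only an illustration of cooperative scheduling, not how gevent or greenlet are implemented; `task`, `run_round_robin`, and `demo` are hypothetical names, and each `yield` marks a point where a real task would be waiting on I/O.

```python
from collections import deque

def task(name, steps, log):
    """Hypothetical task: records each step, then yields control."""
    for step in range(steps):
        log.append('%s step %d' % (name, step))
        yield  # switch point: let another task run

def run_round_robin(tasks):
    """Run generator-based tasks in turn on a single thread until all finish."""
    queue = deque(tasks)
    while queue:
        current = queue.popleft()
        try:
            next(current)          # resume the task until its next yield
            queue.append(current)  # not finished: schedule it again
        except StopIteration:
            pass                   # finished: drop it

def demo():
    log = []
    run_round_robin([task('A', 2, log), task('B', 3, log)])
    return log
```

    Calling `demo()` interleaves the two tasks step by step, which is the ordering a scheduler such as gevent's arranges automatically whenever a greenlet blocks on I/O.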

    11. grequests

    • gevent and requests combined into a single module
    import grequests
    
    
    request_list = [
        grequests.get('http://httpbin.org/delay/1', timeout=0.001),
        grequests.get('http://fakedomain/'),
        grequests.get('http://httpbin.org/status/500')
    ]
    
    
    # ##### Run the requests and get the list of responses #####
    # response_list = grequests.map(request_list)
    # print(response_list)
    
    
    # ##### Run the requests and get the list of responses (with exception handling) #####
    # def exception_handler(request, exception):
    #     print(request, exception)
    #     print("Request failed")
    
    # response_list = grequests.map(request_list, exception_handler=exception_handler)
    # print(response_list)

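    12. Twisted example

    # Note: getPage has been deprecated since Twisted 16.7
    from twisted.web.client import getPage
    from twisted.internet import defer, reactor
    
    def one_done(arg):
        print('finished...')
    
    def all_done(arg):
        # Runs after every request has come back; stops the reactor loop
        reactor.stop()
    
    def callback(contents):
        print(contents)
    
    deferred_list = []  # special objects, each wrapping a request already sent to a URL
    url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
    for url in url_list:
        deferred = getPage(bytes(url, encoding='utf8'))  # send the HTTP request
        deferred.addCallback(callback)  # run the callback when the response arrives
        deferred_list.append(deferred)
    
    dlist = defer.DeferredList(deferred_list)
    dlist.addBoth(all_done)  # fires once every Deferred in the list has fired
    
    # reactor.run() is an endless loop that checks for finished requests: each finished
    # request triggers its callback, and once all have returned, all_done stops the loop
    reactor.run()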

    13、Tornado

    from tornado.httpclient import AsyncHTTPClient
    from tornado.httpclient import HTTPRequest
    from tornado import ioloop
    
    COUNT = 0
    def handle_response(future):
        """
        Handle the result of one fetch. A counter is maintained so that the IO loop
        can be stopped with ioloop.IOLoop.current().stop() after the last response.
        """
        global COUNT
        COUNT -= 1
        try:
            response = future.result()
            print(response.body)
        except Exception as error:
            print("Error:", error)
        if COUNT == 0:
            ioloop.IOLoop.current().stop()
    
    def func():
        url_list = [
            'http://www.baidu.com',
            'http://www.bing.com',
        ]
        global COUNT
        COUNT = len(url_list)
        for url in url_list:
            print(url)
            http_client = AsyncHTTPClient()
            # Tornado 6 removed the callback argument of fetch(); use the returned Future instead
            http_client.fetch(HTTPRequest(url)).add_done_callback(handle_response)
    
    ioloop.IOLoop.current().add_callback(func)
    ioloop.IOLoop.current().start() # Also an endless loop; it needs a custom stop condition, here a simple counter

    14. More Twisted

    from twisted.internet import reactor
    from twisted.web.client import getPage
    import urllib.parse
    
    
    def one_done(arg):
        print(arg)
        reactor.stop()
    
    post_data = urllib.parse.urlencode({'check_data': 'adf'})
    post_data = bytes(post_data, encoding='utf8')
    headers = {b'Content-Type': b'application/x-www-form-urlencoded'}
    response = getPage(bytes('http://dig.chouti.com/login', encoding='utf8'),
                       method=bytes('POST', encoding='utf8'),
                       postdata=post_data,
                       cookies={},
                       headers=headers)
    response.addBoth(one_done)
    
    reactor.run()

    Summary: the options above, in order of preference:

    grequests(gevent+requests) --> Twisted --> Tornado --> asyncio

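    All of the above are asynchronous I/O request modules, either built into Python or provided by third parties; they are easy to use and greatly improve efficiency. At bottom, asynchronous I/O requests come down to [non-blocking sockets] + [I/O multiplexing].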
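
    As a small illustration of I/O multiplexing, the sketch below uses the standard `selectors` module to wait on several sockets at once and handle whichever becomes readable. The `socket.socketpair()` connections and the `wait_for_messages` helper are stand-ins chosen so that the example runs without a network.

```python
import selectors
import socket

def wait_for_messages(count=3):
    """Register `count` non-blocking sockets and read from each one when it is ready."""
    selector = selectors.DefaultSelector()
    senders = []
    for index in range(count):
        receiver, sender = socket.socketpair()
        receiver.setblocking(False)
        # `data` carries a label so we know which socket became ready
        selector.register(receiver, selectors.EVENT_READ, data=index)
        senders.append(sender)

    # Write in reverse order; the selector only reports sockets that have data
    for index in reversed(range(count)):
        senders[index].sendall(b'message %d' % index)

    received = {}
    while len(received) < count:
        for key, _events in selector.select(timeout=1):
            received[key.data] = key.fileobj.recv(1024)
            selector.unregister(key.fileobj)
            key.fileobj.close()

    for sender in senders:
        sender.close()
    selector.close()
    return received
```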

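    15. A custom asynchronous I/O module

    • I/O multiplexing: monitor several socket objects in a while loop and handle whichever one changes state. Many things can be built on this, including an asynchronous I/O module.

    • Asynchronous I/O: when the process reaches an I/O operation (waiting for external data), it does not wait; it comes back to process the data once it has been received. In practice this is a callback.

    • Non-blocking sockets plus I/O multiplexing give pseudo-concurrency.

    • Simple version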
    import select
    import socket
    import time
    
    
    class HttpRequest(object):
        """Wraps the basic data of a request and its response"""
        def __init__(self, sock, host, callback):
            self.sock = sock
            self.callback = callback
            self.host = host
    
        def fileno(self):
            """File descriptor of the request's socket object, used by select"""
            return self.sock.fileno()
    
    class HttpResponse:
        def __init__(self,recv_data):
            self.recv_data = recv_data
            self.header_dict = {}
            self.body = None
            self.initialize()
    
        def initialize(self):
            # Split the response headers from the response body
            headers, body = self.recv_data.split(b'\r\n\r\n', 1)
            self.body = body
            header_list = headers.split(b'\r\n')
            for head in header_list:
                head = str(head,encoding='utf-8')
                v = head.split(':',1)
                if len(v) == 2:
                    self.header_dict[v[0]] = v[1]
                elif len(v) == 1:
                    self.header_dict['method'] = v[0]
    
    
    class AsyncRequest(object):
        def __init__(self):
            self.conn = []  # watched for returned data
            self.connections = []  # watched for a completed connection
    
        def add_request(self, host, callback,):
            """Create a request to be sent"""
            try:
                sk = socket.socket()
                sk.setblocking(False)
                sk.connect((host, 80))
            except BlockingIOError as e:
                pass
            # print('connection request sent to the remote host')
            req = HttpRequest(sk, host, callback)
            self.connections.append(req)
            self.conn.append(req)
    
        def run(self):
            """Event loop: checks whether the request sockets are ready and acts accordingly"""
            while True:
                rlist, wlist, elist = select.select(self.conn, self.connections, self.conn, 0.05)
    
                for w in wlist:
                    # Connected to the remote server; start sending the request data
                    print(w.host,'connected...')
                    data = "GET / HTTP/1.0\r\nHost:%s\r\n\r\n"%(w.host,)
                    w.sock.sendall(bytes(data,encoding='utf-8'))
                    # Request sent; stop watching this object for writability
                    self.connections.remove(w)
    
                for r in rlist:
                    sock = r.sock
                    recv_data = bytes()
                    while True: # The server may return a lot of data, so receive in a loop
                        try:
                            data = sock.recv(8096)
                            if not data: # The server closed the connection
                                break
                            recv_data += data
                        except Exception as e:
                            break
                    # print(recv_data)
                    response = HttpResponse(recv_data)
                    r.callback(r.host,response)
                    sock.close() # Receiving finished; close the connection
                    self.conn.remove(r) # Stop watching this object
    
                # An empty receive list means every response has been received; end the loop
                if len(self.conn) == 0:
                    break
    
    
    if __name__ == '__main__':
        def callback_1(host,response):
            print(host,'save to file',response.header_dict,response.body)
    
        def callback_2(host,response):
            print(host,'save to database',response.header_dict,response.body)
    
        obj = AsyncRequest()
        url_list = [
            {'host': 'www.cnblogs.com','callback': callback_1},
            {'host': 'www.baidu.com','callback': callback_2},
            {'host': 'www.zhihu.com', 'callback': callback_2},
        ]
        for item in url_list:
            obj.add_request(**item)
    
        obj.run()
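
    The non-blocking connect used in `add_request` can be tried against a local listener. In this sketch the listening socket on 127.0.0.1 and the `nonblocking_connect` and `demo` helpers are assumptions made for the demonstration; `connect_ex` is used so that the "in progress" result comes back as an error code instead of a `BlockingIOError`.

```python
import select
import socket

def nonblocking_connect(address, timeout=2.0):
    """Start a non-blocking connect and wait with select until it completes."""
    client = socket.socket()
    client.setblocking(False)
    # Returns at once: 0 if already connected, otherwise an "in progress" errno
    client.connect_ex(address)
    # A socket whose connect has finished is reported as writable
    _, writable, _ = select.select([], [client], [], timeout)
    if not writable:
        client.close()
        raise TimeoutError('connect timed out')
    # SO_ERROR is 0 when the connection succeeded
    error = client.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if error != 0:
        client.close()
        raise OSError(error, 'connect failed')
    return client

def demo():
    server = socket.socket()
    server.bind(('127.0.0.1', 0))  # port 0: let the OS pick a free port
    server.listen(1)
    client = nonblocking_connect(server.getsockname())
    connection, _ = server.accept()
    client.sendall(b'ping')
    data = connection.recv(4)
    for sock in (client, connection, server):
        sock.close()
    return data
```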
    • Enhanced version
    import select
    import socket
    import time
    
    
    class AsyncTimeoutException(TimeoutError):
        """
        Exception raised when a request times out
        """
    
        def __init__(self, msg):
            self.msg = msg
            super(AsyncTimeoutException, self).__init__(msg)
    
    
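    class HttpContext(object):
        """Wraps the basic data of a request and its response"""
    
        def __init__(self, sock, host, port, method, url, data, callback, timeout=5):
            """
            sock: the client socket object of the request
            host: host name to request
            port: port to request
            method: request method
            url: URL to request
            data: data sent in the request body
            callback: callback invoked when the request completes
            timeout: timeout of the request, in seconds
            """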
            self.sock = sock
            self.callback = callback
            self.host = host
            self.port = port
            self.method = method
            self.url = url
            self.data = data
    
            self.timeout = timeout
    
            self.__start_time = time.time()
            self.__buffer = []
    
        def is_timeout(self):
            """Whether the current request has timed out"""
            current_time = time.time()
            return (self.__start_time + self.timeout) < current_time
    
        def fileno(self):
            """File descriptor of the request's socket object, used by select"""
            return self.sock.fileno()
    
        def write(self, data):
            """Append response content to the buffer"""
            self.__buffer.append(data)
    
        def finish(self, exc=None):
            """Called when the response is complete or has failed; runs the request's callback"""
            if not exc:
                response = b''.join(self.__buffer)
                self.callback(self, response, exc)
            else:
                self.callback(self, None, exc)
    
        def send_request_data(self):
            # HTTP lines end with CRLF; a blank line separates the headers from the body
            content = "%s %s HTTP/1.0\r\nHost: %s\r\n\r\n%s" % (
                self.method.upper(), self.url, self.host, self.data,)
    
            return content.encode(encoding='utf8')
    
    
    class AsyncRequest(object):
        def __init__(self):
            self.fds = []
            self.connections = []
    
        def add_request(self, host, port, method, url, data, callback, timeout):
            """Create a request to be sent"""
            client = socket.socket()
            client.setblocking(False)
            try:
                client.connect((host, port))
            except BlockingIOError as e:
                pass
                # print('connection request sent to the remote host')
            req = HttpContext(client, host, port, method, url, data, callback, timeout)
            self.connections.append(req)
            self.fds.append(req)
    
        def check_conn_timeout(self):
            """Check every request for a connection timeout and terminate those that have timed out"""
            timeout_list = []
            for context in self.connections:
                if context.is_timeout():
                    timeout_list.append(context)
            for context in timeout_list:
                context.finish(AsyncTimeoutException('request timed out'))
                self.fds.remove(context)
                self.connections.remove(context)
    
        def running(self):
            """Event loop: checks whether the request sockets are ready and acts accordingly"""
            while True:
                r, w, e = select.select(self.fds, self.connections, self.fds, 0.05)
    
                if not self.fds:
                    return
    
                for context in r:
                    sock = context.sock
                    while True:
                        try:
                            data = sock.recv(8096)
                            if not data:
                                self.fds.remove(context)
                                context.finish()
                                break
                            else:
                                context.write(data)
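                        except BlockingIOError as e:
                            break
                        except TimeoutError as e:
                            self.fds.remove(context)
                            # The context is no longer in connections once its request has been sent
                            if context in self.connections:
                                self.connections.remove(context)
                            context.finish(e)
                            break
    
                for context in w:
                    # Connected to the remote server; start sending the request data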
                    if context in self.fds:
                        data = context.send_request_data()
                        context.sock.sendall(data)
                        self.connections.remove(context)
    
                self.check_conn_timeout()
    
    
    if __name__ == '__main__':
        def callback_func(context, response, ex):
            """
            :param context: HttpContext object wrapping the request information
            :param response: content of the response
            :param ex: the exception object if an error occurred, otherwise None
            :return:
            """
            print(context, response, ex)
    
        obj = AsyncRequest()
        url_list = [
            {'host': 'www.google.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,
             'callback': callback_func},
            {'host': 'www.baidu.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,
             'callback': callback_func},
            {'host': 'www.bing.com', 'port': 80, 'method': 'GET', 'url': '/', 'data': '', 'timeout': 5,
             'callback': callback_func},
        ]
        for item in url_list:
            print(item)
            obj.add_request(**item)
    
        obj.running()
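
    The timeout check in `HttpContext` relies on `time.time()`, which can jump if the system clock is adjusted. Below is a sketch of the same bookkeeping with `time.monotonic()`; the `Deadline` class is a hypothetical helper and not part of the module above.

```python
import time

class Deadline:
    """Hypothetical helper: tracks whether `timeout` seconds have passed."""

    def __init__(self, timeout, clock=time.monotonic):
        self._clock = clock
        self._expires_at = clock() + timeout

    def is_expired(self):
        return self._clock() >= self._expires_at

    def remaining(self):
        """Seconds left before expiry, never negative."""
        return max(0.0, self._expires_at - self._clock())
```

    The value returned by `remaining()` could also serve as the timeout argument of `select.select`, so that the loop wakes up no later than the nearest deadline.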
  • Original article: https://www.cnblogs.com/charliedaifu/p/10573499.html