zoukankan      html  css  js  c++  java
  • 爬虫之高性能相关

      问题:我给你10个url,请帮我把这10个url对应的网页下载下来。

    传统方式

    # Traditional serial approach: fetch each URL one after another, so the
    # total time is roughly the sum of all individual request latencies.
    import requests
    import time
    
    target_urls = ['https://github.com/'] * 10
    started_at = time.time()
    for target in target_urls:
        resp = requests.get(target)
        # print(resp)
    
    elapsed = time.time() - started_at
    print(elapsed)
    # 12.493084907531738

    一、多进程和多线程实现并发

    # Fan the downloads out to a pool of worker processes; each worker
    # fetches one URL and the completion callback reports the finished URL.
    import time
    from concurrent.futures import ProcessPoolExecutor
    
    import requests
    
    start = time.time()
    
    
    def task(url):
        """Download *url* and return the Response object."""
        return requests.get(url)
    
    
    def done(future, *args, **kwargs):
        """Completion callback: print the URL of the finished request."""
        print(future.result().url)
    
    
    if __name__ == '__main__':
        targets = ['https://www.douban.com/' for _ in range(100)]
        with ProcessPoolExecutor(max_workers=10) as pool:
            for target in targets:
                pool.submit(task, target).add_done_callback(done)
        print(time.time() - start)
        # 11.862671136856079
    

      花费时间 11.862671136856079秒

    # ######### Variant 2: thread pool #########
    # Threads fit here because the work is network-I/O bound and the GIL is
    # released while each thread blocks on the socket.
    # Fixes vs. original: `time` and `os` were used but never imported, and a
    # stray module-level ThreadPoolExecutor() was created, never used and
    # never shut down (it was shadowed by the `with` pool below).
    import os
    import time
    from concurrent.futures import ThreadPoolExecutor
    
    import requests
    
    
    def task(url):
        """Download *url* and return the Response object."""
        response = requests.get(url)
        return response
    
    
    def done(future, *args, **kwargs):
        """Completion callback: print the URL of the finished request."""
        response = future.result()
        print(response.url)
    
    
    if __name__ == '__main__':
        start = time.time()
        with ThreadPoolExecutor(max_workers=os.cpu_count()) as pool:
            url_list = ['https://www.douban.com/' for _ in range(100)]
            for url in url_list:
                v = pool.submit(task, url)
                v.add_done_callback(done)
        print(time.time() - start)
        # 8.904985904693604
    

      花费时间 8.904985904693604秒

      由于GIL限制,建议:IO密集的任务,用ThreadPoolExecutor;CPU密集任务,用ProcessPoolExecutor。

    二、基于事件循环的异步IO

     1. asyncio + aiohttp

    import time
    import asyncio
    
    import aiohttp
    
    
    async def start_request(session, sem, url):
        """Fetch *url* through *session*, gated by the shared semaphore.
    
        The semaphore must be shared by every request: the original created a
        fresh Semaphore inside each call, which limited nothing. The original
        also used `async_timeout.timeout(60)` without ever importing
        async_timeout; the timeout now lives on the ClientSession instead.
        """
        async with sem:
            print(f'make request to {url}')
            async with session.get(url) as response:
                if response.status == 200:
                    print(response.status)
    
    
    async def run(urls):
        conn = aiohttp.TCPConnector(ssl=False,
                                    limit=60,  # 连接池在windows下不能太大, <500
                                    use_dns_cache=True)
        sem = asyncio.Semaphore(10)  # at most 10 requests in flight
        timeout = aiohttp.ClientTimeout(total=60)  # 60s budget per request
        async with aiohttp.ClientSession(connector=conn, timeout=timeout) as session:
            # urls may be a generator; materialize it, because we iterate it
            # twice (gather + the error-report loop). The original exhausted
            # the generator in gather, so failures were never reported.
            urls = list(urls)
            datas = await asyncio.gather(*[start_request(session, sem, url) for url in urls],
                                         return_exceptions=True)
            for ind, url in enumerate(urls):
                if isinstance(datas[ind], Exception):
                    print(f"{ind}, {url}: 下载失败 请重新下载:")
    
    
    if __name__ == '__main__':
        start = time.time()
        urls = (('http://www.baidu.com/') for _ in range(100))
        # asyncio.run replaces the deprecated get_event_loop/run_until_complete
        # pair and the deprecated loop= keyword arguments.
        asyncio.run(run(urls))
        print(time.time() - start)
        # 1.4860568046569824
    

     2. Twisted 

    import time
    
    from twisted.internet import defer
    from twisted.internet import reactor
    from twisted.web.client import getPage
    
    start = time.time()
    
    
    def one_done(content, arg):
        """Per-page callback: *content* is the raw body, *arg* the URL."""
        response = content.decode('utf-8')
        # print(response)
        print(arg)
    
    
    def all_done(arg):
        """Fires once every Deferred in the DeferredList has resolved."""
        reactor.stop()
        print(time.time() - start)
    
    
    @defer.inlineCallbacks
    def task(url):
        """Issue one HTTP request and yield its Deferred."""
        page_deferred = getPage(bytes(url, encoding='utf8'))  # send the HTTP request
        page_deferred.addCallback(one_done, url)
        yield page_deferred
    
    
    url_list = ('http://www.cnblogs.com' for _ in range(100))
    
    pending = []  # one Deferred per in-flight request
    for url in url_list:
        pending.append(task(url))
    
    defer.DeferredList(pending).addBoth(all_done)
    
    reactor.run()  # blocks until all_done calls reactor.stop()
    # 5.039534091949463
    

     3. tornado

    import time
    
    from tornado.httpclient import AsyncHTTPClient
    from tornado.httpclient import HTTPRequest
    from tornado import ioloop
    
    COUNT = 0  # number of requests still pending
    start = time.time()
    
    
    def handle_response(response):
        """Per-response callback; stops the IOLoop after the last response."""
        global COUNT
        COUNT -= 1
        if response.error:
            print("Error:", response.error)
        else:
            # print(response.body)
            print(response.request)
            # same idea as the twisted version
            # ioloop.IOLoop.current().stop()
        if COUNT == 0:
            ioloop.IOLoop.current().stop()
    
    
    def func():
        """Kick off all fetches; COUNT tracks how many are still in flight."""
        global COUNT
        pending_urls = ['http://www.baidu.com' for _ in range(100)]
        COUNT = len(pending_urls)
        for target in pending_urls:
            print(target)
            # AsyncHTTPClient() is a per-IOLoop singleton, so constructing it
            # here yields the same client every iteration.
            AsyncHTTPClient().fetch(HTTPRequest(target), handle_response)
    
    
    if __name__ == '__main__':
        ioloop.IOLoop.current().add_callback(func)
        ioloop.IOLoop.current().start()  # runs until stop() is called
        print(time.time() - start)
        # 3.0621743202209473

    以上均是Python内置以及第三方模块提供异步IO请求模块,使用简便大大提高效率,而对于异步IO请求的本质则是【非阻塞Socket】+【IO多路复用】

    """
    ########http请求本质,IO阻塞########
    sk = socket.socket()
    #1.连接
    sk.connect(('www.baidu.com',80,)) #阻塞
    print('连接成功了')
    #2.连接成功后发送消息
    sk.send(b"GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n")
    
    #3.等待服务端响应
    data = sk.recv(8096)#阻塞
    print(data) #
    
    区分响应头和响应体
    
    #关闭连接
    sk.close()
    """
    """
    ########http请求本质,IO非阻塞########
    sk = socket.socket()
    sk.setblocking(False)
    #1.连接
    try:
        sk.connect(('www.baidu.com',80,)) #非阻塞,但会报错
        print('连接成功了')
    except BlockingIOError as e:
        print(e)
    
    #2.连接成功后发送消息
    sk.send(b"GET / HTTP/1.0\r\nHost: baidu.com\r\n\r\n")
    
    #3.等待服务端响应
    data = sk.recv(8096)#阻塞
    print(data) #
    
    区分响应头和响应体
    
    #关闭连接
    sk.close()
    """
    异步非阻塞请求的本质

    自定义异步非阻塞IO

    class HttpRequest:
        """Pairs a non-blocking client socket with its host and callback."""
    
        def __init__(self, sk, host, callback):
            self.socket = sk          # the (non-blocking) client socket
            self.host = host          # hostname the socket is connecting to
            self.callback = callback  # invoked with the parsed response when done
    
        def fileno(self):
            """Expose the socket's fd so select() can watch this object directly."""
            return self.socket.fileno()
    
    
    class HttpResponse:
        """Parse raw HTTP response bytes into a header dict and a body.
    
        Fix: the CRLF byte literals had been mangled into literal newlines
        (a syntax error); they are restored as ``\r\n`` escapes here.
        """
    
        def __init__(self, recv_data):
            self.recv_data = recv_data  # raw bytes as received from the socket
            self.header_dict = {}       # header name -> value (value keeps its leading space)
            self.body = None            # response body bytes, set by initialize()
    
            self.initialize()
    
        def initialize(self):
            """Split headers from body at the blank line and index the headers."""
            # Headers and body are separated by an empty line (CRLF CRLF).
            headers, body = self.recv_data.split(b'\r\n\r\n', 1)
            self.body = body
            header_list = headers.split(b'\r\n')
            for h in header_list:
                h_str = str(h, encoding='utf-8')
                v = h_str.split(':', 1)
                # the status line ("HTTP/1.0 200 OK") has no colon and is skipped
                if len(v) == 2:
                    self.header_dict[v[0]] = v[1]
    
    
    class AsyncRequest:
        """Minimal async HTTP client built on non-blocking sockets + select().
    
        Fixes vs. original: the request-template string had its CRLF escapes
        mangled into literal newlines; ``sk`` could be unbound if socket()
        itself raised inside the try; the recv loop would spin forever once
        the peer closed the connection (recv returns b'' without raising);
        and the over-broad ``except Exception`` is narrowed to OSError.
        """
    
        def __init__(self):
            self.conn = []        # requests still waiting for a response
            self.connection = []  # requests whose connect() has not completed yet
    
        def add_request(self, host, callback):
            """Start a non-blocking connection to *host* on port 80."""
            sk = socket.socket()
            sk.setblocking(0)
            try:
                sk.connect((host, 80))
            except BlockingIOError:
                # Expected for a non-blocking connect: completion is detected
                # later via select()'s writable list.
                pass
            request = HttpRequest(sk, host, callback)
            self.conn.append(request)
            self.connection.append(request)
    
        def run(self):
            """Event loop: send requests as sockets connect, read replies as they arrive."""
            while True:
                rlist, wlist, elist = select.select(self.conn, self.connection, self.conn, 0.05)
                for w in wlist:
                    print(w.host, '连接成功...')
                    # Writable means the non-blocking connect() has completed.
                    tpl = "GET / HTTP/1.0\r\nHost:%s\r\n\r\n" % (w.host,)
                    w.socket.send(bytes(tpl, encoding='utf-8'))
                    self.connection.remove(w)
                for r in rlist:
                    # r is an HttpRequest
                    recv_data = bytes()
                    while True:
                        try:
                            chunk = r.socket.recv(8096)
                        except OSError:
                            # BlockingIOError: no more data available right now;
                            # other OSErrors: give up with what we have (as before).
                            break
                        if not chunk:
                            # peer closed the connection; without this check the
                            # loop would spin forever on empty reads
                            break
                        recv_data += chunk
                    response = HttpResponse(recv_data)
                    r.callback(response)
                    r.socket.close()
                    self.conn.remove(r)
                if len(self.conn) == 0:
                    break
    
    
    def f1(response):
        """Callback: pretend to persist the response headers to a file."""
        print('保存到文件', response.header_dict)
    
    
    def f2(response):
        """Callback: pretend to persist the response headers to a database."""
        print('保存到数据库', response.header_dict)
    
    
    # Each entry maps a host to the callback that will consume its response.
    url_list = [
        {'host': 'www.youku.com', 'callback': f1},
        {'host': 'v.qq.com', 'callback': f2},
        {'host': 'www.cnblogs.com', 'callback': f2},
    ]
    
    
    if __name__ == '__main__':
        requester = AsyncRequest()
        for entry in url_list:
            requester.add_request(entry['host'], entry['callback'])
        requester.run()
    自定义异步非阻塞IO



  • 相关阅读:
    hdu2063:过山车
    牛客网:ph试纸
    牛客网:石子阵列
    最短路
    POJ1067:取石子游戏
    实现DataGridView实时更新数据
    SendMessage API
    使用Intel IPT技术保护您的帐号安全
    它是对 ACME(automated certificate management environment) 协议的实现,只要实现了 ACME 协议的客户端都可以跟它交互。
    time out 超时
  • 原文地址:https://www.cnblogs.com/zhangyafei/p/10244633.html
Copyright © 2011-2022 走看看