zoukankan      html  css  js  c++  java
  • 高性能爬虫相关(IO多路复用,异步非阻塞)

    高性能爬虫相关(IO多路复用,异步非阻塞)

     

    说到提高性能,我们可以想到的是利用多进程、多线程以及单线程实现并发,由于爬虫爬取信息时IO操作较多,所以利用单线程实现并发是较好的选择

    爬虫本质上还是建立socket连接,通过http请求获取数据

    复制代码
    本质:
        sk = socket()
        # 阻塞
        sk.connect(('www.cnblogs.com',80))
        
        sk.sendall(b"GET /wupeiqi http1.1
    .....
    
    ")
        sk.sendall(b"POST /wupeiqi http1.1
    .....
    
    user=alex&pwd=123")
        
        # 阻塞
        data = sk.recv(8096)
        
        sk.close()
    复制代码

    IO多路复用和异步非阻塞

    IO多路复用:监听多个socket是否发生变化,可以监听到socket是否已经建立连接,是否接收到数据

    复制代码
    import select 
    
    while True:
        # 让select模块帮助我们去检测sk1/sk2两个socket对象是否已经发生“变化”
        # r=[]
        #       如果r中有值
        #        r=[sk1,]    表示:sk1这个socket已经获取到响应的内容
        #        r=[sk1,sk2] 表示:sk1,sk2两个socket已经获取到响应的内容
        # w=[],如果w中有值
        #        w=[sk1,],   表示:sk1这个socket已经连接成功;
        #        w=[sk1,sk2],表示:sk1/sk2两个socket已经连接成功;
        
        r,w,e = select.select([sk1,sk2],[sk1,sk2],[],0.5)
        
        for client in w:
            content = "GET /wupeiqi HTTP/1.1
    User-Agent: Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) 
            client.sendall(content.encode('utf-8'))
        
        for cli in r:
            response = cli.recv(8096)
            print(response)
            cli.close()
    复制代码

    异步非阻塞

    复制代码
    - 非阻塞
        - 不等待(报错,捕捉异常),建立socket连接以及等待接收数据时不阻塞
        - 代码:
            sk = socket.socket()
            sk.setblocking(False)
    - 异步:
        - 回调,当达到某个指定的状态之后,自动调用特定函数。
    复制代码

    如何自定义异步非阻塞模块?
    基于socket设置setblocking和IO多路复用来实现。
    爬虫发送Http请求本质创建socket对象;
    IO多路复用"循环"监听socket是否发生变化,一旦发生变化, 我们可以自定义操作(触发某个函数的执行)

    自定义异步非阻塞模块

    复制代码
    import socket
    import select
    
    
    class Request(object):
        def __init__(self, sk, callback):
            self.sk = sk
            self.callback = callback
    
        def fileno(self):
            return self.sk.fileno()
    
    
    class AsyncHttp(object):
        def __init__(self):
            self.fds = []
            self.conn = []
    
        def add(self, url, callback):
            sk = socket.socket()
            sk.setblocking(False)
            try:
                sk.connect((url, 80))
            except BlockingIOError as e:
                pass
            req = Request(sk, callback)
            self.fds.append(req)
            self.conn.append(req)
    
        def run(self):
            """
            监听socket是否发生变化
            :return:
            """
            while True:
                """
                fds=[req(sk,callback),req,req]
                conn=[req,req,req]
                """
                r, w, e = select.select(self.fds, self.conn, [], 0.05)  # sk.fileno() = req.fileno()
    
                # w=已经连接成功的socket列表 w=[sk1,sk2]
                for req in w:
                    req.sk.sendall(
                        b'GET /wupeiqi HTTP/1.1
    User-Agent: Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36
    
    ')
                    # 已经连接成功的socket,无需再继续监听
                    self.conn.remove(req)
    
                # r=服务端给用户返回数据了 r=[sk1,]
                for req in r:
                    data = req.sk.recv(8096)
                    req.callback(data)
    
                    req.sk.close()  # 断开连接:短连接、无状态
                    self.fds.remove(req)  # 不再监听
    
                if not self.fds:
                    break
    
    
    ah = AsyncHttp()
    
    
    def callback1(data):
        print(11111, data)
    
    
    def callback2(data):
        print(22222, data)
    
    
    def callback3(data):
        print(333333, data)
    
    
    ah.add('www.cnblogs.com', callback1)  # sk1
    ah.add('www.baidu.com', callback2)  # sk2
    ah.add('www.luffycity.com', callback3)  # sk3
    
    ah.run()
    复制代码

    本质:socket+IO多路复用

    IO多路复用的作用

    select,内部循环检测socket是否发生变化;1024个socket
    poll,内部循环检测socket是否发生变化;
    epoll,回调的方式

    协程

    什么是协程

    - 是“微线程”,不存在;是由程序员人为创造出来并控制程序:先执行某段代码、再跳到某处执行某段代码。

    - 如果遇到非IO请求来回切换:性能更低。

    - 如果遇到IO(耗时)请求来回切换:性能高、实现并发(本质上利用IO等待的过程,再去干一些其他的事)

     示例

    复制代码
    情况一:
        import asyncio
        import requests
    
    
        @asyncio.coroutine
        def fetch_async(func, *args):
            loop = asyncio.get_event_loop()
            future = loop.run_in_executor(None, func, *args)
            response = yield from future
            print(response.url, len(response.content))
    
    
        tasks = [
            fetch_async(requests.get, 'http://www.cnblogs.com/wupeiqi/'),
            fetch_async(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091')
        ]
    
        loop = asyncio.get_event_loop()
        results = loop.run_until_complete(asyncio.gather(*tasks))
        loop.close()
    情况二:
        import gevent
        from gevent import monkey
        monkey.patch_all()
        import requests
    
    
        def fetch_async(method, url, req_kwargs):
            print(method, url, req_kwargs)
            response = requests.request(method=method, url=url, **req_kwargs)
            print(response.url, len(response.content))
    
        # ##### 发送请求 #####
        gevent.joinall([
            gevent.spawn(fetch_async, method='get', url='https://www.cnblogs.com/', req_kwargs={}),
            gevent.spawn(fetch_async, method='get', url='https://www.baidu.com/', req_kwargs={}),
            gevent.spawn(fetch_async, method='get', url='https://www.sogo.com/', req_kwargs={}),
        ])
        # ##### 发送请求(协程池控制最大协程数量) #####
        # from gevent.pool import Pool
        # pool = Pool(None)
        # gevent.joinall([
        #     pool.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
        #     pool.spawn(fetch_async, method='get', url='https://www.yahoo.com/', req_kwargs={}),
        #     pool.spawn(fetch_async, method='get', url='https://www.github.com/', req_kwargs={}),
        # ])
                    
    情况三:
        from twisted.web.client import getPage, defer
        from twisted.internet import reactor
    
    
        def all_done(arg):
            reactor.stop()
    
    
        def callback(contents):
            print(contents)
    
    
        d_list = []
    
        url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
        for url in url_list:
            d = getPage(bytes(url, encoding='utf8'))
            d.addCallback(callback)
    
            d_list.append(d)
    
        # 用于检查是否页面已经全部下载完成,如果已下载完成那么,就停止循环。
        dlist = defer.DeferredList(d_list)
        dlist.addBoth(all_done) #
    
        reactor.run()
    复制代码
  • 相关阅读:
    CentOS 6.4 利用 Awstats 7.2 分析 Nginx 日志
    CentOS 6.4 x64 postfix + dovecot + 虚拟用户认证
    配置日志logwarch 每天发送到邮箱
    CentOSx64 安装 Gearmand 和 Gearman php扩展
    Redis之基本介绍
    Synchronized锁的基本介绍
    Lock的基本介绍
    java之threadlocal的使用
    Java之事务的基本应用
    linux之cp和scp的使用
  • 原文地址:https://www.cnblogs.com/xyhh/p/10860559.html
Copyright © 2011-2022 走看看