zoukankan      html  css  js  c++  java
  • 自定义异步IO模块

    高性能爬虫

    假设有3个url需要发请求。

    串行

    import requests
    
    urls_list = [
        'https://www.cnblogs.com/longyunfeigu/p/9491496.html',
        'https://dig.chouti.com/',
        'https://dig.chouti.com/r/pic/hot/1'
    ]
    
    for url in urls_list:
        requests.get(url)
    

    串行肯定是最慢的,怎么改进?第一反应:开多个线程。好吧,lowB 第一反应都是这个方法

    多线程

    import time
    from concurrent.futures import ThreadPoolExecutor
    import requests
    
    urls_list = [
        'https://www.cnblogs.com/longyunfeigu/p/9491496.html',
        'https://dig.chouti.com/',
        'https://dig.chouti.com/r/pic/hot/1'
    ]
    
    start_time = time.time()
    pool = ThreadPoolExecutor(3)
    def task(url):
        requests.get(url)
        print('ending')
    
    for i in urls_list:
        pool.submit(task, i)
    pool.shutdown()
    end_time = time.time()
    print(end_time - start_time)
    

    开多个线程虽然可以实现并发,但是开线程总归是要耗费资源的,那能不能利用一个线程帮忙提高效率呢,这就需要异步非阻塞登场了

    异步非阻塞

    什么是异步非阻塞?异步非阻塞就是 异步+非阻塞。异步就是回调,非阻塞指的是单个任务不等待。
    在网络IO中,有两个阶段会出现"浪费时间"的情况,一个阶段是connect的时候,请求发出去等待信息回来通知这个客户端socket就绪可以发http报文信息。
    另一个阶段是recv接受数据的时候(发送数据不需要等待,直接发就行)需要等待,因为服务端需通过网络把数据发送过来。非阻塞就是不再等待,connect原先需要等待是吧,ok现在我不等了,
    recv需要等待是吧,ok我也不等了。不等报错咋办?报错就报错呗,大不了异常捕获就行。如果是非阻塞socket,那么3个url的请求在connect的时候都发出去了,注意,即使报错,请求也是如弦上之箭射出去了。
    那么等connect成功后就应该发送数据了呀,这时候就体现出回调了,成功就回来调用一段代码。所有的抽象的话语都可以结合大致的代码来理解。以后如果遇到一些抽象话语不好理解,那么就应该用代码去理解这个抽象话语。
    有了异步非阻塞的概念是,那么我们就可以利用一个线程把所有的连接都发送出去,等连接都发送出去而且任意一个connect的信息都没返回的时候,线程就只好等待了,
    等有connect的信息返回表示这个客户端socket准备就绪可以发http报文了,这时候再去执行发送数据的代码。这个过程说起来简单,但有一个问题还不清晰:程序怎么知道哪个socket是就绪的?这个大致有2种解决方式:要么是通知(叫醒服务,这个更偏向底层,我们写的应用程序代码可以利用底层已经实现好的技术);要么是用while死循环不断去检测 。python中有一些模块已经帮我们实现了异步非阻塞的功能。

    twisted

    from twisted.web.client import getPage, defer
    from twisted.internet import reactor
    
    
    def all_done(arg):
        reactor.stop()
    
    
    def callback(contents):
        print(contents)
    
    
    deferred_list = []
    
    url_list = ['http://www.bing.com', 'http://www.baidu.com', ]
    for url in url_list:
        deferred = getPage(bytes(url, encoding='utf8'))
        # 次数使用回调函数,猜测用到了异步功能
        deferred.addCallback(callback)
        deferred_list.append(deferred)
    
    dlist = defer.DeferredList(deferred_list)
    dlist.addBoth(all_done)
    # 死循环,不断地处理deferred_list里的socket对象,按理说处理完成就应该自动停止了,但可能是这个框架架构设计问题,不能做到自动停止,需要手动停
    reactor.run()
    

    gevent

    gevent 是基于greenlet做的,greenlet实现了协程。协程和线程不一样,协程不是真实存在的,是程序员伪造出来的具有类似于线程的切换效果。单纯的协程不能完成提高效率的方式,如greenlet需要手动切换,所以这出现了gevent。
    gevent 可以在遇到IO阻塞的时候自动切换协程的运行,这样就可以提升效率。而协程切来切去的功能正好可以利用过来实现异步非阻塞

    import gevent
    from gevent import monkey
    
    monkey.patch_all()
    import requests
    
    def fetch_async(method, url, req_kwargs):
        print(method, url, req_kwargs)
        response = requests.request(method=method, url=url, **req_kwargs)
        print(response.url)
    
    # ##### 发送请求 #####
    gevent.joinall([
        gevent.spawn(fetch_async, method='get', url='https://www.cnblogs.com/longyunfeigu/', req_kwargs={}),
        gevent.spawn(fetch_async, method='get', url='https://www.baidu.com/', req_kwargs={}),
        gevent.spawn(fetch_async, method='get', url='https://github.com/', req_kwargs={}),
    ])
    

    requests模块发请求,内部也是用socket,先去connect,然后发送请求,最后收到响应

    asyncio

    asyncio默认只支持发tcp层的报文,想要发http请求,就需要自己封装http报文

    import asyncio
    
    @asyncio.coroutine
    def fetch_async(host, url='/'):
        print(host, url)
        reader, writer = yield from asyncio.open_connection(host, 80)
    
        request_header_content = """GET %s HTTP/1.0
    Host: %s
    
    """ % (url, host,)
        request_header_content = bytes(request_header_content, encoding='utf-8')
    
        writer.write(request_header_content)
        yield from writer.drain()
        text = yield from reader.read()
        print(host, url, text)
        writer.close()
    
    tasks = [
        fetch_async('www.cnblogs.com', '/longyunfeigu/'),
        fetch_async('baidu.com', '/')
    ]
    
    loop = asyncio.get_event_loop()
    results = loop.run_until_complete(asyncio.gather(*tasks))
    loop.close()
    

    自定义IO模块

    运用知识:非阻塞socket + IO多路复用

    from socket import *
    import select
    import time
    
    class HttpContext(object):
        def __init__(self, sock, url_dict):
            import time
            self._start_time = time.time()
            self.sock = sock
            self.port = ''
            self.host = ''
            self.method = ''
            self.data = ''
            self.path = ''
            self.content = b''
            self.text = ''
            self.timeout = 0
            self.callback = None
            self.initial(url_dict)
    
        def initial(self, url_dict):
            self.port = url_dict.get('port')
            self.host = url_dict.get('host')
            self.method = url_dict.get('method')
            self.data = url_dict.get('data')
            self.path = url_dict.get('path')
            self.timeout = url_dict.get('timeout', 5)
            self.callback = url_dict.get('callback')
    
        def connect(self):
            self.sock.connect((self.host, self.port))
    
        def fileno(self):
            return self.sock.fileno()
    
        def send_get(self):
            content = """%s %s HTTP/1.0
    Host: %s
    
    """%(
                self.method.upper(), self.path, self.host)
            self.sock.sendall(bytes(content, encoding='utf8'))
    
        def sendall(self):
            if self.method.upper() == 'GET':
                self.send_get()
            elif self.method.upper() == 'POST':
                self.send_post()
            else:
                pass
    
        def send_post(self):
            content = """%s %s HTTP/1.0
    Host: %s
    
    %s"""%(
                self.method.upper(), self.path, self.host, self.data)
            self.sock.sendall(bytes(content, encoding='utf8'))
    
        def recv(self):
            import time
            while 1:
                data = self.sock.recv(8096)
                if not data:
                    break
                self.content += data
                self.text += str(data, encoding='utf8')
                time.sleep(0.1)
            self.finish()
    
        def finish(self, msg=''):
            if msg:
                self.text = msg
                self.content = bytes(msg, encoding='utf8')
            if self.callback:
                self.callback(self.text)
    
    
    class AsynchRequest(object):
        def __init__(self):
            self.conn_socket_list = []
            self.recv_socket_list = []
    
        def add_request(self, **url_dict):
            # 立即发起connect连接
            soc = socket(AF_INET,SOCK_STREAM)
            soc.setblocking(0)
            ctx = HttpContext(soc, url_dict)
            self.conn_socket_list.append(ctx)
            self.recv_socket_list.append(ctx)
            try:
                ctx.connect()
            except BlockingIOError as e:
                pass
                # print('request is sended')
    
        def check_timeout(self):
            # 检验是否超时
            ctime = time.time()
            for ctx in self.recv_socket_list:
                if ctx._start_time + ctx.timeout <= ctime:
                    self.recv_socket_list.remove(ctx)
                    self.conn_socket_list.remove(ctx)
                    ctx.finish('connect超时')
    
        def run(self):
            while 1:
                #  r_list 代表socket对象是否有数据可以读, w_list代表socket是否可以写,也就是是否可以发送数据,写服务端程序一般只需要用到 r_list
                # IO多路复用监听的对象不一定是socket对象,只要对象有fileno方法都能监听,内部也是拿对象的fileno的返回值来监听
                r_list, w_list, e_list = select.select(self.conn_socket_list, self.recv_socket_list, [], 0.05)
                for w in w_list:
                    w.sendall()
                    self.recv_socket_list.remove(w)
                for r in r_list:
                    r.recv()
                    self.conn_socket_list.remove(r)
                if not self.conn_socket_list:
                    break
                self.check_timeout()
    
    def callback(response):
        print(response)
    
    url_list = [
        {'host': 'www.baidu.com','port': 80, 'path':'/', 'name':'baidu', 'method':'GET', 'callback': callback},
        {'host': 'cn.bing.com','port': 80,'path':'/', 'name':'chouti', 'method':'GET'},
    ]
    if __name__ == '__main__':    
        obj = AsynchRequest()
        for i in url_list:
            obj.add_request(**i)
        obj.run()
    

    这里的自定义IO模块是站在客户端的角度来定义的,借助底层的IO多路复用来帮我们检测socket是否已经准备好(不用我们手动写死循环去检测socket对象是否已经准备好)

  • 相关阅读:
    JS和C#对Json的操作
    JS图形化插件利器组件系列 —— Gojs组件
    Android APK反编译 apktool使用教程
    UML系列图
    多线程学习 ---- 系列教程
    大型网站架构之系列
    经典算法题锦集
    基本算法系列15天速成
    居转户--相关信息
    使用C#创建Windows服务
  • 原文地址:https://www.cnblogs.com/longyunfeigu/p/9493551.html
Copyright © 2011-2022 走看看