Understanding Python Asynchronous Programming in Depth (Part 1)

    The code in this article is compiled from the article "Understanding Python Asynchronous Programming in Depth (Part 1)".

    Reference: A Web Crawler With asyncio Coroutines

    1. Synchronous blocking approach

    import socket
    import time
    
    def blocking_way():
        sock = socket.socket()
        # blocking: waits until the TCP handshake completes
        sock.connect(('example.com', 80))
        request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
        sock.sendall(request.encode('ascii'))
        response = b''
        chunk = sock.recv(4096)
        while chunk:
            response += chunk
            # blocking: waits until more data arrives or the server closes the connection
            chunk = sock.recv(4096)
        sock.close()
        return response
    
    def sync_way():
        res = []
        # ten requests, strictly one after another
        for i in range(10):
            res.append(blocking_way())
        return len(res)
    
    def main():
        start = time.time()
        print(sync_way())
        print(time.time() - start)
    
    
    if __name__ == '__main__':
        main()
    
    
    # 5.15s
    

    2. Synchronous multithreaded approach

    import socket
    import time
    from concurrent import futures
    
    def blocking_way():
        sock = socket.socket()
        # blocking: waits until the TCP handshake completes
        sock.connect(('example.com', 80))
        request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
        sock.sendall(request.encode('ascii'))
        response = b''
        chunk = sock.recv(4096)
        while chunk:
            response += chunk
            # blocking: waits until more data arrives or the server closes the connection
            chunk = sock.recv(4096)
        sock.close()
        return response
    
    def thread_way():
        workers = 10
        # each of the 10 requests runs in its own worker thread
        with futures.ThreadPoolExecutor(workers) as executor:
            futs = {executor.submit(blocking_way) for i in range(10)}
        return len([fut.result() for fut in futs])
    
    def main():
        start = time.time()
        print(thread_way())
        print(time.time() - start)
    
    if __name__ == '__main__':
        main()
    
    # 0.52s
    

      

    Tip

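    Because of the GIL, multithreading in Python cannot take advantage of multiple CPU cores:
    within one Python process, only one thread can execute Python bytecode at any given moment.
    
    So why does the result still match expectations, with the run time cut to about one tenth?
    
    Because when a thread makes a blocking system call, such as sock.connect() or sock.recv(), it releases the GIL
    and gives other threads a chance to run. Within a single thread, however, the blocking call still blocks.
    
    In Python, time.sleep is blocking, and it is well known that it must be used with care;
    in multithreaded code, however, time.sleep does not block the other threads.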
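    A minimal sketch of the point above, using time.sleep as a stand-in for a blocking system call: five threads each sleep 0.2 s, and because a sleeping thread releases the GIL, the total wall time stays close to 0.2 s rather than 1 s. The function names sleepy and run_threads and the timings are illustrative choices, not from the original article.

```python
import time
from concurrent import futures

def sleepy(i):
    # time.sleep releases the GIL while waiting, so the other threads keep running
    time.sleep(0.2)
    return i

def run_threads(n=5):
    start = time.time()
    with futures.ThreadPoolExecutor(n) as executor:
        # map() preserves input order in its results
        results = list(executor.map(sleepy, range(n)))
    return results, time.time() - start

if __name__ == '__main__':
    results, elapsed = run_threads()
    print(results, round(elapsed, 1))
```

    Run sequentially, the same five calls would take about 1 s.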
    

      

    3. Non-blocking + callbacks (i.e., asynchronous non-blocking) approach

    Event loop + callbacks: asynchronous programming within a single thread

    Event monitoring

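    The OS wraps changes in I/O state as events, such as readable and writable events,
    and provides a dedicated system module through which applications can receive event notifications: select.
    The application registers file descriptors with select, together with the events it is interested in.
    When the state of a file descriptor changes, select reports it, and the application's event loop then calls the callback it registered for that descriptor.
    
    
    Because select's algorithm is relatively inefficient, it was later improved into poll;
    later still, the BSD kernel introduced kqueue and the Linux kernel introduced epoll. All four mechanisms serve the same purpose and expose nearly identical APIs to programmers;
    the difference is that kqueue and epoll are more efficient when handling a large number of file descriptors.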

    The selectors module

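    The selectors module in the Python standard library wraps the underlying select/poll/epoll/kqueue.
    The DefaultSelector class automatically picks the most efficient mechanism available on the current OS;
    on Linux 2.5.44 and later, that is epoll.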
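    The register/select/dispatch cycle described above can be shown without any network access. This sketch assumes socket.socketpair() to get two connected local sockets, and the function names demo and on_readable are illustrative. Note that select() only reports which file objects are ready; it is the loop code that looks up the callback stored in key.data and calls it.

```python
import selectors
import socket

def demo():
    sel = selectors.DefaultSelector()
    reader, writer = socket.socketpair()
    reader.setblocking(False)
    received = []

    def on_readable(sock):
        # called by our loop once select() reports the socket as readable
        received.append(sock.recv(1024))

    # register the file object together with its callback (stored as key.data)
    sel.register(reader, selectors.EVENT_READ, on_readable)
    writer.send(b'hello')

    # select() returns (key, mask) pairs; dispatching to the callback is up to us
    for key, mask in sel.select(timeout=1):
        key.data(key.fileobj)

    sel.unregister(reader)
    sel.close()
    reader.close()
    writer.close()
    # e.g. 'EpollSelector' on Linux, 'KqueueSelector' on macOS/BSD
    return type(sel).__name__, received

if __name__ == '__main__':
    print(demo())
```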
    

      

    #!/usr/bin/python3.5
    # encoding: utf-8
    
    import socket
    from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
    
    selector = DefaultSelector()
    stopped = False
    urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
    
    class Crawler:
        def __init__(self, url):
            self.url = url
            self.sock = None
            self.response = b''
    
        def fetch(self):
            self.sock = socket.socket()
            self.sock.setblocking(False)
            try:
                self.sock.connect(('example.com', 80))
            except BlockingIOError:
                pass
            selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)
    
        def connected(self, key, mask):
            selector.unregister(key.fd)
            get = 'GET {0} HTTP/1.0\r\nHost: example.com\r\n\r\n'.format(self.url)
            self.sock.send(get.encode('ascii'))
            selector.register(key.fd, EVENT_READ, self.read_response)
    
        def read_response(self, key, mask):
            global stopped
            # if the response is larger than 4 KB, the next loop iteration keeps reading
            chunk = self.sock.recv(4096)
            if chunk:
                self.response += chunk
            else:
                selector.unregister(key.fd)
                urls_todo.remove(self.url)
                if not urls_todo:
                    stopped = True
    
    # Event loop
    def loop():
        while not stopped:
            # Block until an event occurs
            events = selector.select()
            for event_key, event_mask in events:
                callback = event_key.data
                callback(event_key, event_mask)
    
    if __name__ == '__main__':
        import time
        start = time.time()
        for url in urls_todo:
            crawler = Crawler(url)
            crawler.fetch()
        loop()
        print(time.time() - start)
    
    # 0.53s

    Drawbacks of too many layers of callbacks:

        - Shared state is hard to manage

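    In the callback version, each Crawler instance has to keep its own sock object on self.
    
    Without an OOP style, the shared state would have to be passed along, relay-style, to every callback.
    
    Which state must be shared between multiple asynchronous calls has to be thought through and carefully designed in advance.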

        - Error handling is hard

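    A series of callbacks forms one complete call chain.
    What happens if one link in the chain raises an exception?
    The whole chain breaks, and the state being passed along is lost; this is known as stack ripping.
    
    So, to prevent stack ripping, errors have to be returned as data instead of being raised as exceptions,
    and every callback must check the return value of the previous call so that errors are not silently swallowed.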
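    A minimal sketch of the "return errors as data" convention, with no real I/O: each step receives (error, value) from the previous one and must check error first, because an exception raised inside a loop-invoked callback would not propagate back to whoever started the chain. Here the steps call each other directly for simplicity; in a real event loop each step would be invoked by the loop. The names step_parse, step_double, and run_chain are hypothetical.

```python
def step_parse(error, value, next_step):
    if error is not None:          # pass an earlier error along as data
        return next_step(error, None)
    try:
        number = int(value)
    except ValueError as exc:      # turn the exception into data instead of raising
        return next_step(exc, None)
    return next_step(None, number)

def step_double(error, value, next_step):
    if error is not None:
        return next_step(error, None)
    return next_step(None, value * 2)

def run_chain(raw):
    outcome = {}

    def done(error, value):
        # final link: record whatever arrived, error or result
        outcome['error'], outcome['value'] = error, value

    # parse -> double -> done, each link handing (error, value) to the next
    step_parse(None, raw, lambda e, v: step_double(e, v, done))
    return outcome

if __name__ == '__main__':
    print(run_chain('21'))
    print(run_chain('abc'))
```

    Every step repeats the same "if error is not None" check, which is exactly the boilerplate the generator-based approach in the next section removes.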

    4. Python's road to better asynchronous I/O

    #!/usr/bin/python3.5
    # encoding: utf-8
    
    import socket
    from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
    
    selector = DefaultSelector()
    stopped = False
    urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
    
    
    class Future:
        def __init__(self):
            self.result = None
            self._callbacks = []
    
        def add_done_callback(self, fn):
            self._callbacks.append(fn)
    
        def set_result(self, result):
            self.result = result
            for fn in self._callbacks:
                fn(self)
    
    class Crawler:
        def __init__(self, url):
            self.url = url
            self.response = b''
    
        def fetch(self):
            sock = socket.socket()
            sock.setblocking(False)
            try:
                sock.connect(('example.com', 80))
            except BlockingIOError:
                pass
            f = Future()
    
            def on_connected():
                f.set_result(None)
    
            selector.register(sock.fileno(), EVENT_WRITE, on_connected)
            yield f
            selector.unregister(sock.fileno())
            get = 'GET {0} HTTP/1.0\r\nHost: example.com\r\n\r\n'.format(self.url)
            sock.send(get.encode('ascii'))
    
            global stopped
            while True:
                f = Future()
    
                def on_readable():
                    f.set_result(sock.recv(4096))
    
                selector.register(sock.fileno(), EVENT_READ, on_readable)
                chunk = yield f
                selector.unregister(sock.fileno())
                if chunk:
                    self.response += chunk
                else:
                    urls_todo.remove(self.url)
                    if not urls_todo:
                        stopped = True
                    break
    
    class Task:
        def __init__(self, coro):
            self.coro = coro
            f = Future()
            f.set_result(None)
            self.step(f)
    
        def step(self, future):
            try:
                # send() resumes the coroutine (fetch) and runs it until the next yield;
                # next_future is the object that yield hands back
                next_future = self.coro.send(future.result)
            except StopIteration:
                return
            next_future.add_done_callback(self.step)
    
    # Event loop
    def loop():
        while not stopped:
            # Block until an event occurs
            events = selector.select()
            for event_key, event_mask in events:
                callback = event_key.data
                callback()
    
    if __name__ == '__main__':
        import time
        start = time.time()
        for url in urls_todo:
            crawler = Crawler(url)
            Task(crawler.fetch())
        loop()
        print(time.time() - start)
    
    # 0.53s
    

    Below, I made a few changes on top of the version above:

    #!/usr/bin/python3
    # encoding: utf-8
    
    import socket
    from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
    
    selector = DefaultSelector()
    stopped = False
    urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
    
    
    class Future:
        def __init__(self):
            self.result = None
            self._callback = None   # the original version stored callbacks in a list
    
        def add_done_callback(self, fn):
            self._callback = fn
    
        def set_result(self, result):
            self.result = result
            # a single callback is enough: the only one ever registered is the owning Task.step()
            if self._callback:
                self._callback(self)
    
    class Crawler:
        def __init__(self, url):
            self.url = url
            self.response = b''
    
        def fetch(self):
            sock = socket.socket()
            sock.setblocking(False)
            try:
                sock.connect(('example.com', 80))
            except BlockingIOError:
                pass
            f = Future()
    
            def on_connected():
                f.set_result(None)
    
            selector.register(sock.fileno(), EVENT_WRITE, on_connected)
            yield f
            selector.unregister(sock.fileno())
            get = 'GET {0} HTTP/1.0\r\nHost: example.com\r\n\r\n'.format(self.url)
            sock.send(get.encode('ascii'))
    
            global stopped
            while True:
                f = Future()
    
                def on_readable():
                    f.set_result(sock.recv(4096))
    
                selector.register(sock.fileno(), EVENT_READ, on_readable)
                chunk = yield f
                selector.unregister(sock.fileno())
                if chunk:
                    self.response += chunk
                else:
                    urls_todo.remove(self.url)
                    if not urls_todo:
                        stopped = True
                    break
    
    class Task:
        def __init__(self, coro):
            self.coro = coro
            f = Future()
            f.set_result(None)
            self.step(f)
    
        def step(self, future):
            try:
                # send() resumes the coroutine (fetch) and runs it until the next yield;
                # next_future is the object that yield hands back
                next_future = self.coro.send(future.result)
            except StopIteration:
                return
            next_future.add_done_callback(self.step)
            print(next_future._callback)  # debug output: the registered callback (a bound Task.step)
    
    # Event loop
    def loop():
        while not stopped:
            # Block until an event occurs
            events = selector.select()
            for event_key, event_mask in events:
                callback = event_key.data
                callback()
    
    if __name__ == '__main__':
        import time
        start = time.time()
        c_list = []
        for url in urls_todo:
            crawler = Crawler(url)
            Task(crawler.fetch())
            c_list.append(crawler)
    
        loop()
        # added: print the crawled content
        for crawler in c_list:
            print(crawler.response)
        print(time.time() - start)
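    To see how Future, the generator, and Task.step hand control back and forth, here is a self-contained sketch with the selector removed: the "event loop" is just code calling set_result() by hand. Future follows the first version above (callback list); Task gains a done flag for inspection, and two_steps, pending, and log are hypothetical names added for illustration.

```python
class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)


class Task:
    def __init__(self, coro):
        self.coro = coro
        self.done = False
        f = Future()
        f.set_result(None)
        self.step(f)   # prime the generator: the first send() must be None

    def step(self, future):
        try:
            # resume the generator with the result of the future it was waiting on
            next_future = self.coro.send(future.result)
        except StopIteration:
            self.done = True
            return
        # resume again once the newly yielded future completes
        next_future.add_done_callback(self.step)


pending = []   # futures the hypothetical coroutine is waiting on
log = []       # values the coroutine received

def two_steps():
    f1 = Future()
    pending.append(f1)
    a = yield f1              # suspended here until f1.set_result() runs
    log.append(a)
    f2 = Future()
    pending.append(f2)
    b = yield f2
    log.append(b)

if __name__ == '__main__':
    task = Task(two_steps())
    pending[0].set_result('first')    # plays the role of an I/O callback
    pending[1].set_result('second')
    print(log, task.done)
```

    In the crawler, the calls to set_result() come from on_connected and on_readable, which the event loop invokes when the selector reports the socket ready.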
    

      

    5. Improving generator coroutines with yield from

    yield can be applied directly to any ordinary Python object, but yield from cannot: its operand must be iterable,

    so Future needs one more change: making it an iterable object is enough.
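    A tiny check of why __iter__ matters: giving Future an __iter__ that yields itself once and then returns self.result lets a coroutine write x = yield from f. The driver below replaces the Task/selector machinery with direct send() calls; this Future is a trimmed copy without callbacks, and caller is a hypothetical name.

```python
class Future:
    def __init__(self):
        self.result = None

    def __iter__(self):
        yield self            # hand the future itself up to whoever drives the coroutine
        return self.result    # becomes the value of the "yield from f" expression


def caller(f):
    value = yield from f      # would raise TypeError if Future were not iterable
    return value * 10

if __name__ == '__main__':
    f = Future()
    gen = caller(f)
    assert gen.send(None) is f    # the innermost yield surfaces the Future
    f.result = 4                  # in the real code, set_result() does this
    try:
        gen.send(None)
    except StopIteration as stop:
        print(stop.value)         # 40
```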

    #!/usr/bin/python3.5
    # -*- coding:utf-8 -*-
    
    import socket
    from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
    
    selector = DefaultSelector()
    stopped = False
    urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
    
    
    def connect(sock, address):
        f = Future()
        sock.setblocking(False)
        try:
            sock.connect(address)
        except BlockingIOError:
            pass
    
        def on_connected():
            f.set_result(None)
    
        selector.register(sock.fileno(), EVENT_WRITE, on_connected)
        yield from f
        selector.unregister(sock.fileno())
    
    def read(sock):
        f = Future()
    
        def on_readable():
            f.set_result(sock.recv(4096))
    
        selector.register(sock.fileno(), EVENT_READ, on_readable)
        chunk = yield from f
        selector.unregister(sock.fileno())
        return chunk
    
    def read_all(sock):
        response = []
        chunk = yield from read(sock)
        while chunk:
            response.append(chunk)
            chunk = yield from read(sock)
        return b''.join(response)
    
    class Future:
        def __init__(self):
            self.result = None
            self._callbacks = []
    
        def add_done_callback(self, fn):
            self._callbacks.append(fn)
    
        def set_result(self, result):
            self.result = result
            for fn in self._callbacks:
                fn(self)
    
        def __iter__(self):
            yield self
            return self.result
    
    class Crawler:
        def __init__(self, url):
            self.url = url
            self.response = b''
    
        def fetch(self):
            global stopped
            sock = socket.socket()
            yield from connect(sock, ('example.com', 80))
            get = 'GET {0} HTTP/1.0\r\nHost: example.com\r\n\r\n'.format(self.url)
            sock.send(get.encode('ascii'))
            self.response = yield from read_all(sock)
            urls_todo.remove(self.url)
            if not urls_todo:
                stopped = True
    
    class Task:
        def __init__(self, coro):
            self.coro = coro
            f = Future()
            f.set_result(None)
            self.step(f)
    
        def step(self, future):
            try:
                # send() resumes the coroutine (fetch) and runs it until the next yield;
                # next_future is the object that yield hands back
                next_future = self.coro.send(future.result)
            except StopIteration:
                return
            next_future.add_done_callback(self.step)
    
    # Event loop
    def loop():
        while not stopped:
            # Block until an event occurs
            events = selector.select()
            for event_key, event_mask in events:
                callback = event_key.data
                callback()
    
    if __name__ == '__main__':
        import time
        start = time.time()
        for url in urls_todo:
            crawler = Crawler(url)
            Task(crawler.fetch())
        loop()
        print(time.time() - start)
    
    # 0.53s
    

      

    6. A first look at asyncio and native coroutines

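    asyncio is an asynchronous I/O framework introduced provisionally in Python 3.4 (PEP 3156). It provides the infrastructure for writing single-threaded concurrent code that performs asynchronous I/O with coroutines.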

    Its core components are the event loop, coroutines, tasks, and futures, plus a number of extension and helper modules.

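    When asyncio was introduced, it also provided the @asyncio.coroutine decorator for marking functions that use yield from as coroutines, although using it was not mandatory. (The decorator was deprecated in Python 3.8 and removed in Python 3.11.)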

    Python 3.5 added the async/await syntax (PEP 492), giving coroutines explicit, first-class support; these are called native coroutines.

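    Coroutines written in the async/await style and in the yield from style are built on a shared underlying implementation and are compatible with each other.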

    In Python 3.6, asyncio lost its provisional status and became a full, stable member of the standard library.
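    A small sketch of the compatibility claim that runs on current Python. Since @asyncio.coroutine no longer exists in 3.11+, it uses types.coroutine, the standard-library decorator PEP 492 provides for generator-based coroutines, to mark a yield from style function; a native async def coroutine then awaits it. The function names are illustrative, and the 0.1 s sleeps stand in for I/O.

```python
import asyncio
import time
import types

@types.coroutine
def legacy_double(x):
    # generator-based coroutine: "yield from" delegates to a native coroutine
    yield from asyncio.sleep(0.1)
    return x * 2

async def native_inc(x):
    # a native coroutine can await the generator-based one directly
    doubled = await legacy_double(x)
    return doubled + 1

async def main():
    # the three coroutines wait concurrently, so this takes about 0.1 s, not 0.3 s
    return await asyncio.gather(native_inc(1), native_inc(2), native_inc(3))

if __name__ == '__main__':
    start = time.time()
    print(asyncio.run(main()), round(time.time() - start, 1))
```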

    #!/usr/bin/python3
    # -*- coding:utf-8 -*-
    
    import asyncio
    import time
    
    import aiohttp
    
    host = 'http://example.com'
    urls_todo = {'/', '/1', '/2', '/3', '/4', '/5', '/6', '/7', '/8', '/9'}
    
    async def fetch(session, url):
        async with session.get(url) as response:
            return await response.read()
    
    async def main():
        # one ClientSession shared by all requests; created inside a coroutine,
        # it picks up the running loop, so no explicit loop= argument is needed
        async with aiohttp.ClientSession() as session:
            tasks = [fetch(session, host + url) for url in urls_todo]
            return await asyncio.gather(*tasks)
    
    
    if __name__ == '__main__':
        start = time.time()
        # asyncio.run() (Python 3.7+) creates the event loop, runs main(), and closes the loop
        asyncio.run(main())
        print(time.time() - start)
    
    # 0.54s
    

    Demo added on 2019-06-26

    import time
    import requests
    
    urls = [
        'http://httpbin.org/get',
        'http://httpbin.org/ip',
        'http://httpbin.org/json',
        'http://httpbin.org/uuid',
        'http://httpbin.org/user-agent',
        'http://httpbin.org/headers',
        'http://httpbin.org/response-headers',
    ]
    
    def get_result(url):
        d = requests.get(url)
        dd = d.json()
        return dd
    
    start = time.time()
    
    # the requests are sent strictly one after another
    results = []
    for url in urls:
        d = get_result(url)
        results.append(d)
    
    print('RUN : {}'.format(time.time()-start))
    print(results)

    Elapsed: RUN : 6.703306198120117

    import time
    import asyncio
    import requests
    
    urls = [
        'http://httpbin.org/get',
        'http://httpbin.org/ip',
        'http://httpbin.org/json',
        'http://httpbin.org/uuid',
        'http://httpbin.org/user-agent',
        'http://httpbin.org/headers',
        'http://httpbin.org/response-headers',
    ]
    
    def myfunc(url):
        d = requests.get(url)
        dd = d.json()
        return dd
    
    # The original used @asyncio.coroutine with "data = yield from future";
    # that decorator was removed in Python 3.11, so async/await is used instead.
    async def fetch_async(func, url):
        loop = asyncio.get_running_loop()
        # run the blocking func(url) in the loop's default thread pool
        future = loop.run_in_executor(None, func, url)
        data = await future
        return data
    
    async def main():
        tasks = [fetch_async(myfunc, url) for url in urls]
        return await asyncio.gather(*tasks)
    
    start = time.time()
    # asyncio.run() creates the event loop, runs main(), and closes the loop
    results = asyncio.run(main())
    
    print('RUN : {}'.format(time.time()-start))
    print(results)

    Elapsed: RUN : 1.0276665687561035

    Additional notes

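    The first parameter of run_in_executor(executor, func, *args) is an executor, i.e. a concurrent.futures executor such as ThreadPoolExecutor (a thread pool object).
    If None is passed, the loop's default thread pool is used. In Python 3.5 to 3.7 its size defaults to (os.cpu_count() or 1) * 5, so a 4-core CPU gets a pool of 20 threads to run the func passed as the second argument; since Python 3.8 the default is min(32, os.cpu_count() + 4).
    So run_in_executor actually runs func in separate worker threads, and the event loop waits for and coordinates their results.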
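    A minimal sketch of passing an explicit executor instead of None. Here blocking_io is a hypothetical stand-in for a blocking call such as requests.get, and max_workers=4 is an arbitrary choice; with four workers, the four 0.2 s calls overlap.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_io(n):
    # hypothetical blocking call; time.sleep releases the GIL like real I/O does
    time.sleep(0.2)
    return n * n

async def main():
    loop = asyncio.get_running_loop()
    # an explicit pool instead of the loop's default executor (executor=None)
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [loop.run_in_executor(pool, blocking_io, n) for n in range(4)]
        return await asyncio.gather(*futures)

if __name__ == '__main__':
    start = time.time()
    print(asyncio.run(main()), round(time.time() - start, 1))
```

    With max_workers=1 the same calls would run one at a time and take about 0.8 s, which is a quick way to confirm that the concurrency comes from the thread pool rather than from the event loop itself.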

      

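    Author: Standby, a lifelong lover of great mountains and rivers, grasslands and deserts, and girls
    Source: http://www.cnblogs.com/standby/

    The copyright of this article belongs to the author and cnblogs (博客园). Reposting is welcome, but unless the author agrees otherwise, this statement must be retained and a link to the original article must be given in a prominent position on the page; otherwise the author reserves the right to pursue legal liability.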

    Original article: https://www.cnblogs.com/standby/p/7783415.html