zoukankan      html  css  js  c++  java
  • Python中syncio和aiohttp

    CPython 解释器本身就不是线程安全的,因此有全局解释器锁(GIL),一次只允许使用一个线程执行 Python 字节码。因此,一个 Python 进程通常不能同时使用多个 CPU 核心。然而,标准库中所有执行阻塞型 I/O 操作的函数,在等待操作系统返回结果时都会释放GIL。这意味着在 Python 语言这个层次上可以使用多线程,而 I/O 密集型 Python 程序能从中受益:一个 Python 线程等待网络响应时,阻塞型 I/O 函数会释放 GIL,再运行一个线程。asyncio这个包使用事件循环驱动的协程实现并发。 asyncio 大量使用 yield from 表达式,因此与Python 旧版不兼容。
    asyncio 包使用的“协程”是较严格的定义。适合asyncio API 的协程在定义体中必须使用 yield from,而不能使用 yield。此外,适合 asyncio 的协程要由调用方驱动,并由调用方通过 yield from 调用;

    先看2个例子:

    import threading
    import asyncio
     
    @asyncio.coroutine
    def hello():
        print('Start Hello', threading.currentThread())
        yield from asyncio.sleep(5)
        print('End Hello', threading.currentThread())
     
    @asyncio.coroutine
    def world():
        print('Start World', threading.currentThread())
        yield from asyncio.sleep(3)
        print('End World', threading.currentThread())
     
    # 获取EventLoop:
    loop = asyncio.get_event_loop()
    tasks = [hello(), world()]
    # 执行coroutine
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

    @asyncio.coroutine把生成器函数标记为协程类型。
    asyncio.sleep(3) 创建一个3秒后完成的协程。
    loop.run_until_complete(future),运行直到future完成;如果参数是 coroutine object,则需要使用 ensure_future()函数包装。
    loop.close() 关闭事件循环

    import asyncio
     
    @asyncio.coroutine
    def worker(text):
        """ 协程运行的函数 :param text::return: """
        i = 0
        while True:
            print(text, i)
            try:
                yield from asyncio.sleep(.1)
            except asyncio.CancelledError:
                break
            i += 1
     
    @asyncio.coroutine
    def client(text, io_used):
        work_fu = asyncio.ensure_future(worker(text))
        # 假装等待I/O一段时间
        yield from asyncio.sleep(io_used)
        # 结束运行协程
        work_fu.cancel()
        return 'done'
     
    loop = asyncio.get_event_loop()
    tasks = [client('xiaozhe', 3), client('zzz', 5)]
    result = loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    print('Answer:', result)

    asyncio.ensure_future(coro_or_future, *, loop=None):计划安排一个 coroutine object的执行,返回一个 asyncio.Task object。
    worker_fu.cancel(): 取消一个协程的执行,抛出CancelledError异常。
    asyncio.wait():协程的参数是一个由期物或协程构成的可迭代对象; wait 会分别把各个协程包装进一个 Task 对象。

    async和await是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换。
    1. 把@asyncio.coroutine替换为async
    2. 把yield from替换为await

    @asyncio.coroutine
    def hello():
      print("Hello world!")
      r = yield from asyncio.sleep(1)
      print("Hello again!")

    等价于

    async def hello():
      print("Hello world!")
      r = await asyncio.sleep(1)
      print("Hello again!")  

    asyncio可以实现单线程并发IO操作。如果仅用在客户端,发挥的威力不大。如果把asyncio用在服务器端,例如Web服务器,由于HTTP连接就是IO操作,因此可以用单线程+coroutine实现多用户的高并发支持。

    asyncio实现了TCP、UDP、SSL等协议,aiohttp则是基于asyncio实现的HTTP框架

    客户端:

    import aiohttp
    import asyncio
    import async_timeout
     
     
    async def fetch(session, url):
        async with async_timeout.timeout(10):
            async with session.get(url) as response:
                return await response.text()
     
     
    async def main():
        async with aiohttp.ClientSession() as session:
            html = await fetch(session, 'http://python.org')
            print(html)
     
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    服务端:

    from aiohttp import web
     
    async def handle(request):
        name = request.match_info.get('name', "Anonymous")
        text = "Hello, " + name
        return web.Response(text=text)
     
    app = web.Application()
    app.router.add_get('/', handle)
    app.router.add_get('/{name}', handle)
     
    web.run_app(app)

    运行结果:

     

    爬取当当畅销书的图书信息的代码如下:

    '''异步方式爬取当当畅销书的图书信息'''
    import  os
    import time
    import aiohttp
    import asyncio
    import pandas as pd
    from bs4 import BeautifulSoup
    
    # table表格用于储存书本信息
    table = []
    
    
    # 获取网页(文本信息)
    async def fetch(session, url):
        async with session.get(url) as response:
            return await response.text(encoding='gb18030')
    
    
    # 解析网页
    async def parser(html):
        # 利用BeautifulSoup将获取到的文本解析成HTML
        soup = BeautifulSoup(html, 'lxml')
        # 获取网页中的畅销书信息
        book_list = soup.find('ul', class_='bang_list clearfix bang_list_mode')('li')
        for book in book_list:
            info = book.find_all('div')
            # 获取每本畅销书的排名,名称,评论数,作者,出版社
            rank = info[0].text[0:-1]
            name = info[2].text
            comments = info[3].text.split('')[0]
            author = info[4].text
            date_and_publisher= info[5].text.split()
            publisher = date_and_publisher[1] if len(date_and_publisher)>=2 else ''
            # 将每本畅销书的上述信息加入到table中
            table.append([rank, name, comments, author, publisher])
    
    
    # 处理网页
    async def download(url):
        async with aiohttp.ClientSession() as session:
            html = await fetch(session, url)
            await parser(html)
    
    # 全部网页
    urls = ['http://bang.dangdang.com/books/bestsellers/01.00.00.00.00.00-recent7-0-0-1-%d'%i for i in range(1,26)]
    # 统计该爬虫的消耗时间
    print('#' * 50)
    t1 = time.time() # 开始时间
    
    # 利用asyncio模块进行异步IO处理
    loop = asyncio.get_event_loop()
    tasks = [asyncio.ensure_future(download(url)) for url in urls]
    tasks = asyncio.gather(*tasks)
    loop.run_until_complete(tasks)
    
    # 将table转化为pandas中的DataFrame并保存为CSV格式的文件
    df = pd.DataFrame(table, columns=['rank', 'name', 'comments', 'author', 'publisher'])
    df.to_csv('dangdang.csv', index=False)
    
    t2 = time.time() # 结束时间
    print('使用aiohttp,总共耗时:%s' % (t2 - t1))
    print('#' * 50)
  • 相关阅读:
    jekyll简单使用
    三、ansible简要使用
    四、ansible主机组定义
    项目中远程连接404 NOT FOUND问题的原因以及解决办法(这里只涉及我遇到的问题)
    AS3中的位操作
    AS3中is和as操作符的区别
    static 函数和普通函数的区别
    [译] SystemTap
    2017-09-17 python 学习笔记
    xargs 命令使用小记
  • 原文地址:https://www.cnblogs.com/majiang/p/9877504.html
Copyright © 2011-2022 走看看