CPython 解释器本身就不是线程安全的,因此有全局解释器锁(GIL),一次只允许使用一个线程执行 Python 字节码。因此,一个 Python 进程通常不能同时使用多个 CPU 核心。然而,标准库中所有执行阻塞型 I/O 操作的函数,在等待操作系统返回结果时都会释放GIL。这意味着在 Python 语言这个层次上可以使用多线程,而 I/O 密集型 Python 程序能从中受益:一个 Python 线程等待网络响应时,阻塞型 I/O 函数会释放 GIL,再运行一个线程。asyncio这个包使用事件循环驱动的协程实现并发。 asyncio 大量使用 yield from 表达式,因此与Python 旧版不兼容。
asyncio 包使用的“协程”是较严格的定义。适合asyncio API 的协程在定义体中必须使用 yield from,而不能使用 yield。此外,适合 asyncio 的协程要由调用方驱动,并由调用方通过 yield from 调用;
先看2个例子:
import threading import asyncio @asyncio.coroutine def hello(): print('Start Hello', threading.currentThread()) yield from asyncio.sleep(5) print('End Hello', threading.currentThread()) @asyncio.coroutine def world(): print('Start World', threading.currentThread()) yield from asyncio.sleep(3) print('End World', threading.currentThread()) # 获取EventLoop: loop = asyncio.get_event_loop() tasks = [hello(), world()] # 执行coroutine loop.run_until_complete(asyncio.wait(tasks)) loop.close()
@asyncio.coroutine把生成器函数标记为协程类型。
asyncio.sleep(3) 创建一个3秒后完成的协程。
loop.run_until_complete(future),运行直到future完成;如果参数是 coroutine object,则需要使用 ensure_future()函数包装。
loop.close() 关闭事件循环
import asyncio @asyncio.coroutine def worker(text): """ 协程运行的函数 :param text::return: """ i = 0 while True: print(text, i) try: yield from asyncio.sleep(.1) except asyncio.CancelledError: break i += 1 @asyncio.coroutine def client(text, io_used): work_fu = asyncio.ensure_future(worker(text)) # 假装等待I/O一段时间 yield from asyncio.sleep(io_used) # 结束运行协程 work_fu.cancel() return 'done' loop = asyncio.get_event_loop() tasks = [client('xiaozhe', 3), client('zzz', 5)] result = loop.run_until_complete(asyncio.wait(tasks)) loop.close() print('Answer:', result)
asyncio.ensure_future(coro_or_future, *, loop=None):计划安排一个 coroutine object的执行,返回一个 asyncio.Task object。
worker_fu.cancel(): 取消一个协程的执行,抛出CancelledError异常。
asyncio.wait():协程的参数是一个由期物或协程构成的可迭代对象; wait 会分别把各个协程包装进一个 Task 对象。
async和await是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换。
1. 把@asyncio.coroutine替换为async
2. 把yield from替换为await
@asyncio.coroutine def hello(): print("Hello world!") r = yield from asyncio.sleep(1) print("Hello again!")
等价于
async def hello(): print("Hello world!") r = await asyncio.sleep(1) print("Hello again!")
asyncio可以实现单线程并发IO操作。如果仅用在客户端,发挥的威力不大。如果把asyncio用在服务器端,例如Web服务器,由于HTTP连接就是IO操作,因此可以用单线程+coroutine实现多用户的高并发支持。
asyncio实现了TCP、UDP、SSL等协议,aiohttp则是基于asyncio实现的HTTP框架
客户端:
import aiohttp import asyncio import async_timeout async def fetch(session, url): async with async_timeout.timeout(10): async with session.get(url) as response: return await response.text() async def main(): async with aiohttp.ClientSession() as session: html = await fetch(session, 'http://python.org') print(html) loop = asyncio.get_event_loop() loop.run_until_complete(main())
服务端:
from aiohttp import web async def handle(request): name = request.match_info.get('name', "Anonymous") text = "Hello, " + name return web.Response(text=text) app = web.Application() app.router.add_get('/', handle) app.router.add_get('/{name}', handle) web.run_app(app)
运行结果:
爬取当当畅销书的图书信息的代码如下:
'''异步方式爬取当当畅销书的图书信息''' import os import time import aiohttp import asyncio import pandas as pd from bs4 import BeautifulSoup # table表格用于储存书本信息 table = [] # 获取网页(文本信息) async def fetch(session, url): async with session.get(url) as response: return await response.text(encoding='gb18030') # 解析网页 async def parser(html): # 利用BeautifulSoup将获取到的文本解析成HTML soup = BeautifulSoup(html, 'lxml') # 获取网页中的畅销书信息 book_list = soup.find('ul', class_='bang_list clearfix bang_list_mode')('li') for book in book_list: info = book.find_all('div') # 获取每本畅销书的排名,名称,评论数,作者,出版社 rank = info[0].text[0:-1] name = info[2].text comments = info[3].text.split('条')[0] author = info[4].text date_and_publisher= info[5].text.split() publisher = date_and_publisher[1] if len(date_and_publisher)>=2 else '' # 将每本畅销书的上述信息加入到table中 table.append([rank, name, comments, author, publisher]) # 处理网页 async def download(url): async with aiohttp.ClientSession() as session: html = await fetch(session, url) await parser(html) # 全部网页 urls = ['http://bang.dangdang.com/books/bestsellers/01.00.00.00.00.00-recent7-0-0-1-%d'%i for i in range(1,26)] # 统计该爬虫的消耗时间 print('#' * 50) t1 = time.time() # 开始时间 # 利用asyncio模块进行异步IO处理 loop = asyncio.get_event_loop() tasks = [asyncio.ensure_future(download(url)) for url in urls] tasks = asyncio.gather(*tasks) loop.run_until_complete(tasks) # 将table转化为pandas中的DataFrame并保存为CSV格式的文件 df = pd.DataFrame(table, columns=['rank', 'name', 'comments', 'author', 'publisher']) df.to_csv('dangdang.csv', index=False) t2 = time.time() # 结束时间 print('使用aiohttp,总共耗时:%s' % (t2 - t1)) print('#' * 50)