1、常见并发类型
I/ O密集型:
蓝色框表示程序执行工作的时间,红色框表示等待I/O操作完成的时间。此图没有按比例显示,因为internet上的请求可能比CPU指令要多花费几个数量级的时间,所以你的程序可能会花费大部分时间进行等待。
CPU密集型:
IO密集型程序将时间花在cpu计算上。
常见并发类型以及区别:
2、同步版本
我们将使用requests访问100个网页,使用同步的方式,requests的请求是同步的,所以代码就很好写了。
同步的版本代码逻辑简单,编写也会很相对容易。
import requests import time def download_site(url,session): with session.get(url) as response: print(len(response.content)) def download_all_site(sites): with requests.Session() as session: for url in sites: download_site(url,session) if __name__ =="__main__": sites = ["https://www.baidu.com","https://www.jython.org"] * 50 start_time = time.time() download_all_site(sites) end_time = time.time() print("执行时间:%s" % (end_time - start_time) + "秒") #download_site()只从一个URL下载内容并打印其大小 #需要知道的是我们这里没有使用requests.get(),而使用了session.get(),我们使用requests.Session()创建了一个Session对象,每次请求使用了session.get(url,因为可以让requests运用一些神奇的网络小技巧,从而真正使程序加速。 #执行时间:33.91123294830322秒
3、多线程
ThreadPoolExecutor,: ThreadPoolExecutor =Thread+Pool+ Executor。
你已经了解了Thread部分。那只是我们之前提到的一个思路。Pool部分是开始变得有趣的地方。这个对象将创建一个线程池,其中的每个线程都可以并发运行。最后,Executor是控制线程池中的每个线程如何以及何时运行的部分。它将在线程池中执行请求。
对我们很有帮助的是,标准库将ThreadPoolExecutor实现为一个上下文管理器,因此你可以使用with语法来管理Threads池的创建和释放。
一旦有了ThreadPoolExecutor,你就可以使用它方便的.map()方法。此方法在列表中的每个站点上运行传入函数。最重要的是,它使用自己管理的线程池自动并发地运行它们。
来自其他语言,甚至Python 2的人可能想知道,在处理threading时,管理你习惯的细节的常用对象和函数在哪里,比如Thread.start()、Thread.join()和Queue。
这些都还在那里,你可以使用它们来实现对线程运行方式的精细控制。但是,从Python 3.2开始,标准库添加了一个更高级别的抽象,称为Executor,如果你不需要精细控制,它可以为你管理许多细节。
本例中另一个有趣的更改是,每个线程都需要创建自己的request . Session()对象。当你查看requests的文档时,不一定就能很容易地看出,但在阅读这个问题(https://github.com/requests/requests/issues/2766 )时,你会清晰地发现每个线程都需要一个单独的Session。
这是threading中有趣且困难的问题之一。因为操作系统可以控制任务何时中断,何时启动另一个任务,所以线程之间共享的任何数据都需要被保护起来,或者说是线程安全的。不幸的是,requests . Session()不是线程安全的。
根据数据是什么以及如何你使用它们,有几种策略可以使数据访问变成线程安全的。其中之一是使用线程安全的数据结构,比如来自 Python的queue模块的Queue。
这些对象使用低级基本数据类型,比如threading.Lock,以确保只有一个线程可以同时访问代码块或内存块。你可以通过ThreadPoolExecutor对象间接地使用此策略。
import requests import concurrent.futures import threading import time #创建线程池 thread_local= threading.local() def get_session(): if not getattr(thread_local,"session",None): thread_local.session = requests.Session() return thread_local.session def download_site(url): session = get_session() with session.get(url) as response: print(len(response.content)) def download_all_site(sites): with concurrent.futures.ThreadPoolExecutor(max_workers=5) as exector: exector.map(download_site,sites) if __name__ =="__main__": sites = ["https://www.baidu.com","https://www.jython.org"] * 50 start_time = time.time() download_all_site(sites) end_time = time.time() print("执行时间:%s" % (end_time - start_time) + "秒") #执行时间:6.152076244354248秒
这里要使用的另一种策略是线程本地存储。Threading.local()会创建一个对象,它看起来像一个全局对象但又是特定于每个线程的。在我们的示例中,这是通过threadLocal和get_session()完成的:
ThreadLocal是threading模块中专门用来解决这个问题的。它看起来有点奇怪,但是你只想创建其中一个对象,而不是为每个线程创建一个对象。对象本身将负责从不同的线程到不同的数据的分开访问。
当get_session()被调用时,它所查找的session是特定于它所运行的线程的。因此,每个线程都将在第一次调用get_session()时创建一个单个的会话,然后在整个生命周期中对每个后续调用使用该会话。
最后,简要介绍一下选择线程的数量。你可以看到示例代码使用了5个线程。随意改变这个数字,看看总时间是如何变化的。你可能认为每次下载只有一个线程是最快的,但至少在我的系统上不是这样。我在5到10个线程之间找到了最快的结果。如果超过这个值,那么创建和销毁线程的额外开销就会抵消程序节省的时间。
这里比较困难的答案是,从一个任务到另一个任务的正确线程数不是一个常量。需要进行一些实验来得到。
注意:request . Session()不是线程安全的。这意味着,如果多个线程使用同一个Session,那么在某些地方可能会发生上面描述的交互类型问题。
多线程代码的执行时序表:
4、异步IO
asyncio的一般概念是一个单个的Python对象,称为事件循环,它控制每个任务如何以及何时运行。事件循环会关注每个任务并知道它处于什么状态。在实际中,任务可以处于许多状态,但现在我们假设一个简化的只有两种状态的事件循环。
就绪状态将表明一个任务有工作要做,并且已经准备好运行,而等待状态意味着该任务正在等待一些外部工作完成,例如网络操作。
我们简化的事件循环维护两个任务列表,每一个对应这些状态。它会选择一个就绪的任务,然后重新启动它。该任务处于完全控制之中,直到它配合地将控制权交还给事件循环为止。
当正在运行的任务将控制权交还给事件循环时,事件循环将该任务放入就绪或等待列表中,然后遍历等待列表中的每个任务,以查看I/O操作完成后某个任务是否已经就绪。时间循环知道就绪列表中的任务仍然是就绪的,因为它知道它们还没有运行。
一旦所有的任务都重新排序到正确的列表中,事件循环将选择下一个要运行的任务,然后重复这个过程。我们简化的事件循环会选择等待时间最长的任务并运行该任务。此过程会一直重复,直到事件循环结束。
asyncio的一个重要之处在于,如果没有刻意去释放控制权,任务是永远不会放弃控制权的。它们在操作过程中从不会被打断。这使得我们在asyncio中比在threading中能更容易地共享资源。你不必担心代码是否是线程安全的。
import time import asyncio from aiohttp import ClientSession async def download_site(session,url): global i try: async with session.get(url) as response: i=i+1 print(i) return await response.read() except Exception as e: pass async def download_all_site(sites): async with ClientSession() as session: tasks = [] for url in sites: task = asyncio.create_task(download_site(session,url)) tasks.append(task) result = await asyncio.gather(*tasks) #等待一组协程运行结束并接收结果 print(result) if __name__ =="__main__": i=0 sites = ["http://www.360kuai.com/","https://www.jython.org"] * 50 start_time = time.time() asyncio.run(download_all_site(sites)) end_time = time.time() print("执行时间:%s" % (end_time - start_time) + "秒")
#执行时间:5.29184889793396秒
异步IO的执行时序表:
asyncio版本的问题
此时asyncio有两个问题。你需要特殊的异步版本的库来充分利用asycio。如果你只是使用requests下载站点,那么速度会慢得多,因为requests的设计目的不是通知事件循环它被阻塞了。随着时间的推移,这个问题变得微不足道,因为越来越多的库包含了asyncio。
另一个更微妙的问题是,如果其中一个任务不合作,那么协作多任务处理的所有优势都将不存在。代码中的一个小错误可能会导致任务运行超时并长时间占用处理器,使需要运行的其他任务无法运行。如果一个任务没有将控制权交还给事件循环,则事件循环无法中断它。
考虑到这一点,我们来开始讨论一种完全不同的并发性——multiprocessing。