zoukankan      html  css  js  c++  java
  • python中同步、多线程、异步IO、多线程对IO密集型的影响

     1、常见并发类型

    I/ O密集型:

    蓝色框表示程序执行工作的时间,红色框表示等待I/O操作完成的时间。此图没有按比例显示,因为internet上的请求可能比CPU指令要多花费几个数量级的时间,所以你的程序可能会花费大部分时间进行等待。

     CPU密集型:

    IO密集型程序将时间花在cpu计算上。

    常见并发类型以及区别:

     2、同步版本

     我们将使用requests访问100个网页,使用同步的方式,requests的请求是同步的,所以代码就很好写了。

    同步的版本代码逻辑简单,编写也会很相对容易。

    import requests
    import time
    def download_site(url,session):
        with session.get(url) as response:
            print(len(response.content))
    
    def download_all_site(sites):
        with requests.Session() as session:
            for url in sites:
                download_site(url,session)
    
    if __name__ =="__main__":
        sites = ["https://www.baidu.com","https://www.jython.org"] * 50
        start_time = time.time()
        download_all_site(sites)
        end_time = time.time()
        print("执行时间:%s" % (end_time - start_time) + "")
    #download_site()只从一个URL下载内容并打印其大小
    #需要知道的是我们这里没有使用requests.get(),而使用了session.get(),我们使用requests.Session()创建了一个Session对象,每次请求使用了session.get(url,因为可以让requests运用一些神奇的网络小技巧,从而真正使程序加速。
    #执行时间:33.91123294830322秒

     3、多线程

     ThreadPoolExecutor,: ThreadPoolExecutor =Thread+Pool+ Executor。

    你已经了解了Thread部分。那只是我们之前提到的一个思路。Pool部分是开始变得有趣的地方。这个对象将创建一个线程池,其中的每个线程都可以并发运行。最后,Executor是控制线程池中的每个线程如何以及何时运行的部分。它将在线程池中执行请求。

    对我们很有帮助的是,标准库将ThreadPoolExecutor实现为一个上下文管理器,因此你可以使用with语法来管理Threads池的创建和释放。

    一旦有了ThreadPoolExecutor,你就可以使用它方便的.map()方法。此方法在列表中的每个站点上运行传入函数。最重要的是,它使用自己管理的线程池自动并发地运行它们。

    来自其他语言,甚至Python 2的人可能想知道,在处理threading时,管理你习惯的细节的常用对象和函数在哪里,比如Thread.start()、Thread.join()和Queue。

    这些都还在那里,你可以使用它们来实现对线程运行方式的精细控制。但是,从Python 3.2开始,标准库添加了一个更高级别的抽象,称为Executor,如果你不需要精细控制,它可以为你管理许多细节。

    本例中另一个有趣的更改是,每个线程都需要创建自己的request . Session()对象。当你查看requests的文档时,不一定就能很容易地看出,但在阅读这个问题(https://github.com/requests/requests/issues/2766  )时,你会清晰地发现每个线程都需要一个单独的Session。

    这是threading中有趣且困难的问题之一。因为操作系统可以控制任务何时中断,何时启动另一个任务,所以线程之间共享的任何数据都需要被保护起来,或者说是线程安全的。不幸的是,requests . Session()不是线程安全的。

    根据数据是什么以及如何你使用它们,有几种策略可以使数据访问变成线程安全的。其中之一是使用线程安全的数据结构,比如来自 Python的queue模块的Queue。

    这些对象使用低级基本数据类型,比如threading.Lock,以确保只有一个线程可以同时访问代码块或内存块。你可以通过ThreadPoolExecutor对象间接地使用此策略。

    import requests
    import concurrent.futures
    import threading
    import time
    
    #创建线程池
    thread_local= threading.local()
    
    def get_session():
        if not getattr(thread_local,"session",None):
            thread_local.session = requests.Session()
        return thread_local.session
    
    def download_site(url):
        session = get_session()
        with session.get(url) as response:
            print(len(response.content))
    
    def download_all_site(sites):
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as exector:
            exector.map(download_site,sites)
    
    if __name__ =="__main__":
        sites = ["https://www.baidu.com","https://www.jython.org"] * 50
        start_time = time.time()
        download_all_site(sites)
        end_time = time.time()
        print("执行时间:%s" % (end_time - start_time) + "")
    #执行时间:6.152076244354248秒

    这里要使用的另一种策略是线程本地存储。Threading.local()会创建一个对象,它看起来像一个全局对象但又是特定于每个线程的。在我们的示例中,这是通过threadLocal和get_session()完成的:

    ThreadLocal是threading模块中专门用来解决这个问题的。它看起来有点奇怪,但是你只想创建其中一个对象,而不是为每个线程创建一个对象。对象本身将负责从不同的线程到不同的数据的分开访问。

    当get_session()被调用时,它所查找的session是特定于它所运行的线程的。因此,每个线程都将在第一次调用get_session()时创建一个单个的会话,然后在整个生命周期中对每个后续调用使用该会话。

    最后,简要介绍一下选择线程的数量。你可以看到示例代码使用了5个线程。随意改变这个数字,看看总时间是如何变化的。你可能认为每次下载只有一个线程是最快的,但至少在我的系统上不是这样。我在5到10个线程之间找到了最快的结果。如果超过这个值,那么创建和销毁线程的额外开销就会抵消程序节省的时间。

    这里比较困难的答案是,从一个任务到另一个任务的正确线程数不是一个常量。需要进行一些实验来得到。

     注意:request . Session()不是线程安全的。这意味着,如果多个线程使用同一个Session,那么在某些地方可能会发生上面描述的交互类型问题。

    多线程代码的执行时序表:

     

     4、异步IO

    asyncio的一般概念是一个单个的Python对象,称为事件循环,它控制每个任务如何以及何时运行。事件循环会关注每个任务并知道它处于什么状态。在实际中,任务可以处于许多状态,但现在我们假设一个简化的只有两种状态的事件循环。

    就绪状态将表明一个任务有工作要做,并且已经准备好运行,而等待状态意味着该任务正在等待一些外部工作完成,例如网络操作。

    我们简化的事件循环维护两个任务列表,每一个对应这些状态。它会选择一个就绪的任务,然后重新启动它。该任务处于完全控制之中,直到它配合地将控制权交还给事件循环为止。

    当正在运行的任务将控制权交还给事件循环时,事件循环将该任务放入就绪或等待列表中,然后遍历等待列表中的每个任务,以查看I/O操作完成后某个任务是否已经就绪。时间循环知道就绪列表中的任务仍然是就绪的,因为它知道它们还没有运行。

    一旦所有的任务都重新排序到正确的列表中,事件循环将选择下一个要运行的任务,然后重复这个过程。我们简化的事件循环会选择等待时间最长的任务并运行该任务。此过程会一直重复,直到事件循环结束。

    asyncio的一个重要之处在于,如果没有刻意去释放控制权,任务是永远不会放弃控制权的。它们在操作过程中从不会被打断。这使得我们在asyncio中比在threading中能更容易地共享资源。你不必担心代码是否是线程安全的。

    import time
    import asyncio
    from aiohttp import ClientSession
    async def download_site(session,url):
        global i
        try:
            async with session.get(url) as response:
                i=i+1
                print(i)
                return await response.read()
        except Exception as e:
             pass
    async def download_all_site(sites):
        async with ClientSession() as session:
            tasks = []
            for url in sites:
                task = asyncio.create_task(download_site(session,url))
                tasks.append(task)
            result = await asyncio.gather(*tasks) #等待一组协程运行结束并接收结果
            print(result)
    
    
    if __name__ =="__main__":
        i=0
        sites = ["http://www.360kuai.com/","https://www.jython.org"] * 50
        start_time = time.time()
        asyncio.run(download_all_site(sites))
        end_time = time.time()
        print("执行时间:%s" % (end_time - start_time) + "")

    #执行时间:5.29184889793396秒

    异步IO的执行时序表:

    asyncio版本的问题

    此时asyncio有两个问题。你需要特殊的异步版本的库来充分利用asycio。如果你只是使用requests下载站点,那么速度会慢得多,因为requests的设计目的不是通知事件循环它被阻塞了。随着时间的推移,这个问题变得微不足道,因为越来越多的库包含了asyncio。

    另一个更微妙的问题是,如果其中一个任务不合作,那么协作多任务处理的所有优势都将不存在。代码中的一个小错误可能会导致任务运行超时并长时间占用处理器,使需要运行的其他任务无法运行。如果一个任务没有将控制权交还给事件循环,则事件循环无法中断它。

    考虑到这一点,我们来开始讨论一种完全不同的并发性——multiprocessing。

  • 相关阅读:
    通过注册表读取设置字体
    StretchBlt
    PatBlt
    如何用MaskBlt实现两个位图的合并,从而实现背景透明
    输出旋转字体
    用字体开透明窟窿
    输出空心字体
    光滑字体
    画贝塞尔曲线
    一些点运算函数
  • 原文地址:https://www.cnblogs.com/-wenli/p/11200258.html
Copyright © 2011-2022 走看看