zoukankan      html  css  js  c++  java
  • python线程、进程、协程详解

    python线程、进程、协程

    python的GIL

    GIL简介

    python的GIL的全程是global interpreterer lock(全局解释器锁)

    在cpython中,python的一个线程对应c语言的一个线程,早期一些历史原因,GIL使得在一个进程中的一个时间点上只有一个线程在执行python编译的字节码。这就意味着一个线程中无法让多个线程映射到多个cpu上,不能在一个线程内实现并行。

    GIL的释放

    示例:

    import threading
    
    a = 0
    
    
    def add():
        global a
        for i in range(1000000):
            a += 1
    
    
    def sub():
        global a
        for i in range(1000000):
            a -= 1
    
    
    t1 = threading.Thread(target=add)
    t2 = threading.Thread(target=sub)
    
    t1.start()
    t2.start()
    
    t1.join()
    t2.join()
    
    print(a)
    

    上面这段代码开启两个线程,一个对a进行加1的操作,执行1000000次,另一个对a进行减1的操作,执行1000000次,然后等两个线程执行完毕,打印a的值。

    按照正常思维,打印的结果应该是0,但我们每次运行发现,结果都是不一样的,有些远远大于0,有些远远小于0。

    上面说到,python因为GIL的存在,在一个进程中的某一时刻只有一个线程在运行python编译的字节码。比如拿上面的例子来说,可能t1线程计算到a=99,已经确定了下次计算的结果为100,但还未来的及给a赋值为100的时候,python切换到了t2线程。假设t2线程执行了10次,计算了a的结果为89,同上,已经确定了下次计算的值为90,但还未来的及给a赋值为90的之后,线程切换到了t1,继续上次线程未完成的操作,将a赋值给了100。这样就造成了t2线程执行的那10次的for循环"失效"了。如此往复,只要for循环的次数足够大,"失效"的次数就会多起来,最后得到的值也就变差很大了。

    从上面的例子可以看出,python的GIL会在适当的时候切换线程,它并不会等到一个线程完全执行完毕才会释放。实际上GIL会根据线程执行的任务的字节码的长度和时间片来随机释放,或者GIL在遇到IO阻塞的时候也会释放。

    多线程

    操作系统能够调度的最小单元是线程,无论是什么并发模型,底层"干活"的一定是线程

    如果没有共享资源,不同的线程之间是不会相互影响的,线程必须依赖进程存在,在进程结束时,子线程全部退出。一个进程中一定有一个主线程。

    线程实例

    import time
    
    
    def test():
        print(666)
        time.sleep(3)
        print(777)
    
    
    if __name__ == '__main__':
        import threading
    
        t1 = threading.Thread(target=test)
        t1.start()
    
        print("end")
    

    运行上面的代码:

    666
    end
    777
    

    上面的例子中主线程中开启了子线程,子线程执行test函数,最后主线程在即将退出的时候打印出"end"。从最后的结果可以看出,当主线程结束的时候,子线程并不会受到影响,在3秒后打印出"777",子线程结束完毕之后,整个程序结束。

    守护线程

    有时候我们希望当主线程结束的时候,子线程全部都要kill掉,这时候就要使用Thread提供的一个方法setDaemon 。

    import time
    
    
    def test():
        print(666)
        time.sleep(3)
        print(777)
    
    
    if __name__ == '__main__':
        import threading
    
        t1 = threading.Thread(target=test)
        t1.setDaemon(True)
        t1.start()
    
        print("end")
    

    还是上面的例子,只是在t1线程开始之前调用它的一个setDaemon方法。这时候当前的t1线程就被成为守护线程。当主线程结束时,t1也随之结束。

    运行结果:

    666
    end
    

    阻塞等待

    既然有上面的守护线程,有时候就想让所有的子线程全部执行完毕,再去执行主线程中操作

    import time
    
    
    def test():
        print(666)
        time.sleep(3)
        print(777)
    
    
    if __name__ == '__main__':
        import threading
    
        t1 = threading.Thread(target=test)
        t1.start()
        t1.join()
        print("end")
    

    在子线程开始后,执行当前线程实例的join方法,主线程就会等待子线程的执行完毕

    执行结果:

    666
    777
    end
    

    线程通信

    线程间通信有多种方式,共享内存、网络、文件、数据库...

    这里简单介绍下共享内存

    共享内存就是多个线程来对一块内存就行操作,但是这样会有线程安全的隐患(参考上面GIL随机释放的例子),要想线程安全,就得对共享的内存进行加锁。并不是很推荐使用锁的方式,在共享内存中,锁的释放和开启都是有程序来主动控制,把锁的逻辑和业务的逻辑放在一起会造成可读性降低。

    使用队列来通信

    import threading
    
    
    def pop(quene):
        while True:
            data = quene.get()
            print(data)
    
    
    def insert(quene):
        for i in range(20):
            quene.put(i)
    
    
    if __name__ == '__main__':
        from queue import Queue
    
        quene = Queue(maxsize=20)
    
        t1 = threading.Thread(target=insert, args=(quene,))
        t2 = threading.Thread(target=pop, args=(quene,))
        t1.start()
        t2.start()
    

    python为我们提供了一个Quene的类, Quene实例化的时候会接受一个maxsize的参数来确定内部的队列的最大程度。

    Quene的实例是一个引用类型,put方法将一个对象传入的队列中,get方法从队列中取出一个对象。同时,put和get都是阻塞的,当队列中没有数据的时候,get阻塞,当队列满的时候,put阻塞。

    Quene是线程安全的,它的内部使用了deque, deque是一个线程安全的双向队列。

    使用Quene能够在上层减少对锁的操作,简化代码,提升代码的可读性。

    Quene中还提供了很多功能,如join:等待完成,task_down:任务完成,put_nowait:非阻塞插入队列,get_nowait:非阻塞从队列中获取....

    线程同步

    线程在原则上是不相互影响的,但在最开始的GIL释放的例子中,由于两个线程共享了全局的变量,GIL随机切换造成了线程在执行字节码的时候并不是理想的状态。

    这时候就需要两个线程间的某些代码片段同步执行,这时候就需要线程同步。

    import threading
    
    a = 0
    
    lock = threading.Lock()
    
    
    def add():
        global a
        global lock
        for i in range(1000000):
            lock.acquire()
            a += 1
            lock.release()
    
    
    def sub():
        global a
        global lock
        for i in range(1000000):
            lock.acquire()
            a -= 1
            lock.release()
    
    
    t1 = threading.Thread(target=add)
    t2 = threading.Thread(target=sub)
    
    t1.start()
    t2.start()
    
    t1.join()
    t2.join()
    
    print(a)
    

    这是最开始的那段代码,上面我们分析到造成最终结果不为0的主要原因是在两个线程最后给a赋值的时候GIL切换了线程。

    threading库提供了锁的机制,这里我们将锁加在给a赋值的地方,这样即使当GIL切换了线程,当需要给a赋值的时候必须要拿到锁,拿到锁的线程的锁的资源没有被释放,别的线程就无法拿到锁,代码就无法继续向下执行。这样就可以在线程中实现局部同步的效果。

    运行代码,最终的结果为0

    Note

    锁的使用会影响当前程序的性能

    锁运用的不好会造成死锁,导致程序无法正常运行

    可重用锁

    threading提供了可重用锁,它可以在一个线程中当锁的资源没释放的情况下,多次去获取锁并不会阻塞,但获取锁的次数必须和释放锁的次数一致。

    import threading
    
    a = 0
    
    lock = threading.RLock()
    
    
    def add():
        global a
        global lock
        for i in range(1000000):
            lock.acquire()
    
            # 使用Lock这里会阻塞,Rlock并不会
            lock.acquire()
            lock.release()
            a += 1
            lock.release()
    
    
    def sub():
        global a
        global lock
        for i in range(1000000):
            lock.acquire()
            a -= 1
            lock.release()
    
    
    t1 = threading.Thread(target=add)
    t2 = threading.Thread(target=sub)
    
    t1.start()
    t2.start()
    
    t1.join()
    t2.join()
    
    print(a)
    

    Condition

    import threading
    
    lock = threading.Lock()
    
    
    def func1():
        global lock
    
        lock.acquire()
        print(1)
        lock.release()
    
        lock.acquire()
        print(3)
        lock.release()
    
    
    def func2():
        global lock
    
        lock.acquire()
        print(2)
        lock.release()
    
        lock.acquire()
        print(4)
        lock.release()
    
    
    t1 = threading.Thread(target=func1)
    t2 = threading.Thread(target=func2)
    
    t1.start()
    t2.start()
    

    上面的这段代码我们希望打印的顺序为1234,但运行的结果不为我们期望的那样。这时候使用Lock来实现线程同步就不可以了

    import threading
    
    lock = threading.Condition()
    
    
    def func1():
        global lock
    
        lock.acquire()
    
        print(1)
        lock.notify()
    
        lock.wait()
        print(3)
    
        lock.notify()
    
        lock.release()
    
    
    def func2():
        global lock
    
        lock.acquire()
        lock.wait()
        print(2)
        lock.notify()
    
        lock.wait()
        print(4)
        lock.release()
    
    
    t1 = threading.Thread(target=func1)
    t2 = threading.Thread(target=func2)
    
    t1.start()
    t2.start()
    

    将上面的Lock对象改为Condition。

    Condition实例使用之前的结束的时候要分别调用acquire和release

    Condition实例的wait方法会等待Condition实例的notify方法的调用,notify方法会通知wait不再阻塞,上面的例子反复调用wait和notify来达到线程同步的效果。

    但是代码一运行,程序只打印出1后就阻塞住了。

    Condition运行的顺序

    上面的代码中,t1线程线运行,t2后运行,在t1开始时,运行notify,这时候t2的wait还没执行到,但t1已经运行了notify,t2在阻塞的时候没有notify来通知它,所以它会一致阻塞。

    所以,在使用Condition时,应该让阻塞的线程先运行。

    控制线程的运行量

    一次开启运行多个线程,但线程量太大对操作系统是一种负担,有时候我们只希望在开始的线程中运行固定数量的线程来减少cpu的压力。这时候就可以用到Semaphore

    import threading
    import time
    
    lock = threading.Semaphore(4)
    
    
    def func1(l):
        time.sleep(3)
        print("success")
        l.release()
    
    
    for i in range(20):
        lock.acquire()
        t = threading.Thread(target=func1, args=(lock,))
        t.start()
    

    Semaphore对象接受一个最大线程数的参数,当当前进程内的活跃线程达到最大线程数时,剩下的线程就会被sleep,当活跃线程结束时,需要调用release来通知Semaphore的实例内部的计数器减1。

    运行这段代码:每个3秒就会在屏幕上打印4个success,一个会打印5次。

    线程池

    为什么需要线程池

    线程频繁的创建和销毁会消耗cpu的资源,为了让线程充分的利用,可以常见线程池来进行任务调度。

    线程池可以控制并发的数量,当线程池满时,剩下的任务就得等待线程池中有空闲的线程才可以进行调度,这样减少操作系统的压力。

    简单的的实例

    from concurrent.futures import ThreadPoolExecutor
    import time
    
    executor = ThreadPoolExecutor(max_workers=2)
    
    
    def test():
        time.sleep(3)
        print("test done")
        return "test"
    
    
    def done_callback(response):
        print(response)
    
    
    task = executor.submit(test)
    
    task.add_done_callback(done_callback)
    
    print(task.done())  # False
    print(task.running())  # True
    print(task.result())  # test
    print(task.done())  # True
    print(task.cancel())  # False
    

    首先实例化一个线程池对象,线程池对象可以传入一个池的容量,如果不传,那么会默认为当前机器的cpu的数量。

    通过线程池对象的submit方法可以将任务传入线程池,如果线程池没满,那么会去执行这个任务,如果池满了就会去等待线程池空闲。

    submit方法是非阻塞的,它会返回一个Future的对象,可以通过Future对象的一些方法来获取当前任务的一些属性和状态。

    done: 非阻塞方法, 判断当前Future对象关联的任务是否已经完成

    running: 非阻塞方法,判断当前的Future对象关联的任务是否正在运行。

    cancel: 非阻塞方法,取消当前Future对象关联的任务。如果当前Future对象关联的任务是在“排队”,那么cancel返回True,并取消任务。如果当前Future对象关联的任务正在执行,那么将无法取消,返回False。

    result:阻塞方法,该方法会等待Future对象关联的任务执行结束,并返回任务的执行结果。

    add_done_callback:为当前的Future对象的处理结果增加回调函数。

    获取完成的任务

    上面介绍可以通过线程池的submit方法返回的Future对象来获取任务的返回结果,假设任务数目过多,我们一个个来获取,显然显得不方便。

    from concurrent.futures import ThreadPoolExecutor, as_completed
    import time
    
    executor = ThreadPoolExecutor(max_workers=2)
    
    
    def test(s_time: int):
        time.sleep(s_time)
        return f"test sleep {s_time}s"
    
    
    tasks = (executor.submit(test, i) for i in range(4))
    
    for future in as_completed(tasks):
        data = future.result()
        print(data)
    

    同as_completed方法来获取线程池中的任务的返回结果。

    这种方法谁先完成就先返回谁

    from concurrent.futures import ThreadPoolExecutor
    import time
    
    executor = ThreadPoolExecutor(max_workers=2)
    
    
    def test(s_time: int):
        time.sleep(s_time)
        return f"test sleep {s_time}s"
    
    
    for data in executor.map(test, range(4)):
        print(data)
    

    通过executor.map来获取返回结果

    这种方式会按照传入线程池的顺序来返回。

    主线程阻塞

    from concurrent.futures import ThreadPoolExecutor, as_completed, wait, ALL_COMPLETED
    import time
    
    executor = ThreadPoolExecutor(max_workers=2)
    
    
    def test(s_time: int):
        time.sleep(s_time)
        return f"test sleep {s_time}s"
    
    
    tasks = [executor.submit(test, i) for i in range(4)]
    
    wait(tasks, return_when=ALL_COMPLETED)
    
    for future in as_completed(tasks):
        data = future.result()
        print(data)
    

    可以使用wait方法来使得程序的主线程阻塞。

    wait可以设置超时时间、指定等待某个task的完成后解除阻塞状态。

    wait的使用非常灵活,更多的可以去查看它的源码及其注释。

    多进程

    进程是操作系统资源分配的单元。一个进程的所有线程可以空享其进程的资源,但不同进程的资源是相互隔离的。

    由于python中存在GIL锁,所以多线程无法发挥多cpu的优势,但是多进程可以,每个进程中python虚拟机的资源是独立的。

    但是进程占用的资源比线程多,操作系统切换进程比切换线程要慢,所以多进程不一定比多线程快。

    多进程的简单实用

    from multiprocessing import Process
    import time
    
    
    def test(n):
        time.sleep(n)
        print("test over")
    
    
    if __name__ == '__main__':
        process = Process(target=test, args=(2,))
        process.start()
        print("main over")
    

    打印结果:

    main over
    test over
    

    上面代码可以看到,进程的使用和线程使用的接口几乎是一致的,主线程结束后,子线程并不会随之结束。

    与线程不同的是,线程可以在启动后主动退出,调用线程实例的kill方法即可。

    但在windows下,进程以及后面的进程池的使用必须放在if name == 'main'才可以。

    进程的使用很简单,更多的可以去查看multiprocessing的api

    进程池

    简单使用

    from multiprocessing import Pool
    import time
    
    
    def test(n):
        time.sleep(n)
        return f"test slept {n}s"
    
    
    if __name__ == '__main__':
        pool = Pool()
    
        task = pool.apply_async(test, args=(2,))
        pool.close()
    
        pool.join()
        print(task.get())  # get会阻塞到结果返回
        print("main over")
    

    代码很简单,看完上面的,这里的很容看懂,就不过多赘述了。

    其中注意的就是在调用join之前,一定要将进程池关闭。

    获取返回结果

    from multiprocessing import Pool
    import time
    
    
    def test(n):
        time.sleep(n)
        return f"test slept {n}s"
    
    
    if __name__ == '__main__':
        pool = Pool()
    
        for result in pool.map(test, range(3)): 
            print(result)
    
        for result in pool.imap(test, range(3)):
            print(result)
    
        for result in pool.imap_unordered(test, range(3)):
            print(result)
    

    上面介绍了3中方式来获取请求结果。他们差别在于对请求结果的排序返回,感兴趣的可以深入研究一下。

    使用ProcessPoolExecutor来创建线程池

    from concurrent.futures import ProcessPoolExecutor
    import time
    
    
    def test(n):
        time.sleep(n)
        return f"test slept {n}s"
    
    
    if __name__ == '__main__':
        executor = ProcessPoolExecutor()
    
        for data in executor.map(test, range(4)):
            print(data)
    

    ProcessPoolExecutor的接口几乎和ThreadPoolExecutor的一致,具体使用看上面叙述。

    进程间通信

    由于进程间资源是相互隔离的,所以通过一般的方式来共享全局变量的方式是不可行的

    from queue import Queue
    

    这种队列只适合线程间通信,进程间是行不通的。

    简单实例

    from multiprocessing import Queue, Process
    import time
    
    
    def test1(q):
        q.put("args")
    
    
    def test2(q):
        time.sleep(3)
        print(q.get())
    
    
    if __name__ == '__main__':
        queue = Queue()
    
        t1 = Process(target=test1, args=(queue,))
        t2 = Process(target=test2, args=(queue,))
        t1.start()
        t2.start()
    

    通过multiprocessing包提供的Queue可以完成进程间通信,Queue的api和前面的线程中的Queue使用基本相同。

    但是这个方式不能用于进程池间的通信。

    进程池通信

    from multiprocessing import Manager, Pool
    import time
    
    
    def test1(q):
        q.put("args")
    
    
    def test2(q):
        time.sleep(3)
        print(q.get())
    
    
    if __name__ == '__main__':
        queue = Manager().Queue()
    
        pool = Pool()
    
        t1 = pool.apply_async(test1, args=(queue,))
        t2 = pool.apply_async(test2, args=(queue,))
    
        pool.close()
        pool.join()
    

    将queue替换为Manager().Queue()即可。

    进程间通信的方式还可以通过管道,共享特殊的全局变量

    if __name__ == '__main__':
        from multiprocessing import Manager, Pipe
    
        share_dict = Manager().dict()
        pipe = Pipe()
    

    有兴趣的可以去了解一下,使用起来也很简单。

    并发、并行、同步、异步、阻塞、非阻塞

    并发和并行

    并发和并行是两个概念,经常会说高并发,但基本没人说高并行。

    • 并发:一段时间内,有几个程序在一个cpu上运行,但在该时间段内的一个时间点上只有一个程序在cpu上运行
    • 并行:在一个时间点上,有几个程序在几个cpu上同时运行。

    并行完全依赖物理资源,而并发对物理资源的依赖程序低一些。

    同步、异步

    • 同步:同步是指代码调用IO操作时,必须等待IO操作完成才能继续调用的方式。
    • 异步:指代码调用IO操作时,不用等待IO操作完成也能继续调用的方式。

    阻塞和非阻塞

    • 阻塞:调用函数的当前的线程被挂起
    • 非阻塞:调用函数的时候,当前线程不会被挂起,而是继续执行。

    协程和异步IO

    C10K问题

    如何在一颗1GHz CPU, 2G内存,1gbps的网络环境中为10000个客户端提供FTP服务。

    在这样的环境下,如果开启10000个线程来为客户端服务,显然是很难实现的。

    python著名的tornado框架就是为了解决C10K的问题而诞生的。

    IO多路复用

    SELECT、POLL、EPOLL

    SELECT 、POLL、EPOLL都是IO多路复用的机制。IO多路复用就是通过一种机制来监听多个描述符,一个某个描述符准备就绪(可读、可写),就能通知程序进行相应的操作。但是select、poll、epoll本质都是同步IO,因为他们都需要在读写时间就绪后自己负责进行读写,读写的过程是阻塞的,而异步IO则无须自己负责进行读写,异步IO的实现则会负责将数据从内核拷贝到用户空间。

    如果有go语言开发经验的话,会容易理解这个select, goselect可以监听多个chan,如果监听的chan中有非阻塞的,那么就去执行对应的case,否则就一定等待。

    同样的,这里的select可以监听多个描述符,当监听的对象的状态发生改变的时候,select就立即返回,否则就一直等待。select立即返回之后,需要遍历fdset来获取就绪的描述符。select单个线程能够监听的描述符的数量存在最大限制,在linux下为1024。

    select使用三个位图来表示三个fdset, 而poll使用一个pollfd的指针实现。

    pollfd没有最大限制,同select一样,poll返回之后,需要轮询pollfd来获取就绪的描述符。随之描述符的数量增长,其效率也会线性降低。

    epoll是在linux2.6内核提出的,是select和poll的增强版本。epoll没有描述符的限制。epoll使用一个描述符来管理多个描述符,将用户关系的描述符的事件存放到内核的一个时间表中,这样用户空间和内核空间的copy只需要一次。epoll查询内部使用红黑树的数据结构,查询起来特比快。

    但select并不代表比epoll差,分场景。

    在并发高,用户活跃度不高(web系统),epoll要优于select

    在并发不高,用户活跃度高 ( 游戏服务 ), select要优于epoll

    协程

    C10M

    如何利用8核心,64内存,在10gbps的网络上保持1000万的并发连接

    随着互联网技术日新月异的发展,c10k已经不能满足需求,c10m成为了一种挑战。

    但,同步变成的并发性不高,多线程编程需要线程同步,加锁会降低性能。线程的创建切换的开销大。

    现在希望可以使用写同步代码的方式去编写异步的代码,可以使用单线程去切换任务。

    但,线程的切换是有操作系统完成的,单线程的任务切换意味着开发者需要自己去调度任务。

    实例:

    def get_html(url):
        pass
    
    
    def parse_html(html):
        pass
    
    
    def get_next_url_list_from_html(url):
        html = get_html(url)  # io等待
        next_url_list = parse_html(html)
    
    
    get_next_url_list_from_html("your url")
    

    写过爬虫的对上面的代码一定不陌生。从一个网页中解析出需要爬取的url。

    但是从网络中获取html源码是一个IO操作,必须得等待html获取完成才可以继续执行html解析。假设,这个段代码循环10次,每次等待的时间为0.5秒,那么这段代码有5秒的时候cpu是空闲的。这时候我们希望,在执行get_html的时候,当前的函数可以暂停,等待IO操作执行完了再切换回来,中间等待的时候让cpu去执行别的任务。

    协程

    asyncio

    asyncio是python3.4引入的一个异步框架

    简单使用

    import asyncio
    
    
    async def test1():
        await asyncio.sleep(3)
        return "test1 done"
    
    
    if __name__ == '__main__':
        # 事件循环
        loop = asyncio.get_event_loop()
    
        # 给事件循环添加任务
        task = loop.create_task(test1())
    
        # 阻塞的方法,直到所有的任务全部完成!
        loop.run_until_complete(task)
    
        # 获取返回结果
        print(task.result())
    

    task返回的是一个asyncio提供的Future对象,它和线程的Future对象的接口使用起来类似。

    批量注册

    import asyncio
    
    
    async def test1():
        await asyncio.sleep(3)
        return "test1 done"
    
    
    if __name__ == '__main__':
        # 事件循环
        loop = asyncio.get_event_loop()
    
        # 给事件循环添加任务
        tasks = [test1() for i in range(10)]
    
        # 阻塞的方法,直到所有的任务全部完成!
        loop.run_until_complete(asyncio.wait(tasks))
    

    使用的是asyncio.sleep的休眠,还段代码是不会阻塞的,这个代码运行完毕使用了3秒多。

    import asyncio
    
    
    async def test1():
        await asyncio.sleep(3)
        return "test1 done"
    
    
    def callback(response):
        print(response)
    
    
    if __name__ == '__main__':
        # 事件循环
        loop = asyncio.get_event_loop()
    
        task = loop.create_task(test1())
    
        task.add_done_callback(callback)
    
        loop.run_until_complete(task)
    

    给任务添加回调函数

    关于loop还有很多高级的功能,这里只是简单演示一下

    import asyncio
    
    
    async def test1(time):
        await asyncio.sleep(time)
        print("test")
    
    
    if __name__ == '__main__':
        # 事件循环
        loop = asyncio.get_event_loop()
    
        tasks = [test1(i) for i in range(4)]
    
        try:
            loop.run_until_complete(asyncio.wait(tasks))
        except KeyboardInterrupt:
            all_tasks = asyncio.Task.all_tasks(loop)
            for task in all_tasks:
                print(task.cancel())
    
            # 这里一定要写上,不然会报错
            loop.stop()
            loop.run_forever()
        finally:
            loop.close()
    

    任务取消,在控制台运行代码,然后按ctrl+c停止就可以正常运行

    我的运行结果:

    test
    test
    False
    False
    True
    True
    True
    

    在协程中集成阻塞IO

    有时候在协程里调用别的api,但是该api是阻塞的,这时候就需要在协程里集成阻塞IO了

    from concurrent.futures import ThreadPoolExecutor
    import asyncio
    import time
    
    
    def get_html(url):
        time.sleep(2)
        return f'get html from {url} success'
    
    
    if __name__ == '__main__':
    
        start = time.time()
    
        loop = asyncio.get_event_loop()
        executor = ThreadPoolExecutor()
    
        # 单个任务
        tasks = loop.run_in_executor(executor, get_html, 'http://www.baidu.com')
    
        # 多个任务
        results = []
        for i in range(5):
            task = loop.run_in_executor(executor, get_html, 'http://www.baidu.com')
            results.append(task)
        loop.run_until_complete(asyncio.wait(results))
    
        print(time.time() - start)  # 2.0136215686798096
    

    asyncio的同步和通信

    asyncio这个框架提供了跟线程模块同样的同步机制,使用起来和线程模块的同步机制几乎没有区别。

    import asyncio
    
    lock = asyncio.Lock()
    
    results = []
    
    
    async def put():
        global lock, results
        await lock.acquire()
        results.append(1)
        await asyncio.sleep(3)
        lock.release()
    
    
    async def get():
        global lock, results
        async with lock:
            item = results.pop()
            print(item)
    
    
    tasks = [put(), get(), ]
    
    loop = asyncio.get_event_loop()
    
    loop.run_until_complete(asyncio.wait(tasks))
  • 相关阅读:
    Centos 7 安装配置
    日常问题
    Fluent_Python_Part1序幕,01-data-model, 数据模型
    计算机基础
    dist-packages vs site-packages
    斗地主 (NOIP2015 Day1 T3)
    字串变换 (2002 年NOIP全国联赛提高组)
    搜索
    关于动态最大子段和--线段树查询
    Caused by: org.apache.ibatis.reflection.ReflectionException: There is no getter for property named 'company' in 'class java.lang.String'
  • 原文地址:https://www.cnblogs.com/ivy-blogs/p/13405219.html
Copyright © 2011-2022 走看看