zoukankan      html  css  js  c++  java
  • python — 协程

    1. 协程

    1.1 协程基础

    1.协程 :能够在一个线程下的多个任务之间来回切换,那么每一个任务都是一个协程。

    2.协程的优点:

    • 1.一个线程中的阻塞都被其他的各种任务沾满了
    • 2.让操作系统觉得这个线程很忙,尽量的减少这个线程进入阻塞的状态,提高了单线程对CPU的利用率。
    • 3.多个任务在同一个线程中执行,也达到了一个并发的效果,规避了每一个任务的io操作,减少了线程的个数,减轻了操作系统的负担。

    3.协程是用户级别的,由我们自己写的python代码来控制切换的,是操作系统不可见的。
    4.在Cpython解释器下:

    • 由于多线程本身就不能利用多核,所以即便是开启了多个线程也只能轮流在一个CPU上执行,协程如果把所有任务的IO操作都规避掉,只剩下需要使用CPU的操作,就意味着协程就可以做到提高CPU利用率的效果。
    • 协程和线程都不能利用多核,都是在一个CPU上轮流执行

    5.多线程和协程的区别:

    • 线程:切换需要操作系统,开销大,操作系统不可控,给操作系统的压力大,操作系统对IO操作的感知更加灵敏。
    • 协程:切换需要python代码,开销小,用户操作可控,完全不会增加操作系统的压力,用户级别能够对IO操作的感知比较低。

    1.2 切换

    两种切换方式:

    • 原生python完成 — asyncio(基于yield)
    • C语言完成的python模块 —— gevent 或 greenlet
    1.2.1 gevent模块 / greenlet模块

    1.greenlet模块

    import time
    from  greenlet import greenlet
    
    def eat():
        print('wusir is eating')
        time.sleep(0.5)
        g2.switch()
        print('wusir finished eat')
    
    def sleep():
        print('小马哥 is sleeping')
        time.sleep(0.5)
        print('小马哥 finished sleep')
        g1.switch()
    
    g1 = greenlet(eat)  # (实例化一个对象)创造一个协程任务
    g2 = greenlet(sleep)
    g1.switch()   # 做切换
    

    2.gevent模块

    gevent是基于greenlet切换的

    <1.> 没有返回值

    import time
    import gevent
    def eat():
        print('wusir is eating')
        time.sleep(1)
        print('wusir finished eat')
    def sleep():
        print('小马哥 is sleeping')
        time.sleep(1)
        print('小马哥 finished sleep')
    
    g1 = gevent.spawn(eat)  # 创造一个协程任务
    gevent.sleep(1)  # 阻塞,切换出去去执行任务
    

    注意:

    • 在gevent中,gevent.sleep(1)与time.sleep(1)执行的效果不一样,

      gevent.sleep(1) -- 切换、切出去,time.sleep(1) -- 就是普通的睡,没有切换的效果。

    • 如果想让gevent认识time.sleep() -- 切出去,需要导入:

      from gevent import monkey
      monkey.patch_all()

      注意:

      • 此时的time已经不是python中内置函数time了,已经被 patch_all 重写了。(效果见下面示例)

    分辨gevent是否识别了我们写的代码中的io操作的方法:

    • 在patchall之前打印一下涉及到io操作的函数地址
    • 在patchall之后打印一下涉及到io操作的函数地址
    • 如果两个地址一致,说明gevent没有识别这个io,如果不一致说明识别了
    import time
    print('-->',time.sleep)
    import gevent
    from gevent import monkey
    monkey.patch_all()
    def eat():
        print('wusir is eating')
        print('in eat: ',time.sleep)
        time.sleep(1)
        print('wusir finished eat')
    
    def sleep():
        print('小马哥 is sleeping')
        time.sleep(1)
        print('小马哥 finished sleep')
    
    g1 = gevent.spawn(eat)  # 创造一个协程任务(发布任务)
    g2 = gevent.spawn(sleep)
    g1.join()   # 阻塞 直到g1任务完成为止(没有join,主程序不会做切换)
    g2.join()
    
    import time
    import gevent
    from gevent import monkey
    monkey.patch_all()
    def eat():
        print('wusir is eating')
        time.sleep(1)
        print('wusir finished eat')
    
    def sleep():
        print('小马哥 is sleeping')
        time.sleep(1)
        print('小马哥 finished sleep')
        
    # 方式一:
    # g1 = gevent.spawn(eat)
    # g2 = gevent.spawn(sleep)
    # # g1.join()
    # # g2.join()
    
    # gevent.joinall([g1,g2])  # gevent.joinall([g1,g2]) 相当于 g1.join() + g2.join()
    
    # 方式二:
    g_l = []
    for i in range(10):
        g = gevent.spawn(eat)
        g_l.append(g)
    gevent.joinall(g_l)
    

    <2.> 有返回值

    对象.value 接收返回值

    value 是一个属性,没有阻塞功能,需要使用join / joinall 阻塞切换出去

    import time
    import gevent
    from gevent import monkey
    monkey.patch_all()
    def eat():
        print('wusir is eating')
        time.sleep(1)
        print('wusir finished eat')
        return 'wusir***'
    
    def sleep():
        print('小马哥 is sleeping')
        time.sleep(1)
        print('小马哥 finished sleep')
        return '小马哥666'
    
    g1 = gevent.spawn(eat)
    g2 = gevent.spawn(sleep)
    gevent.joinall([g1,g2])
    print(g1.value)
    print(g2.value)
    

    ❤️.> 用于socket

    用于socket的 server端 时,遇到 io 操作就切换。

    传参:gevent.spawn(函数名,参数1,参数2,……)

    # server端
    import gevent
    from gevent import monkey
    monkey.patch_all()
    import socket
    
    def chat(conn):
        while True:
            msg = conn.recv(1024).decode('utf-8')
            conn.send(msg.upper().encode('utf-8'))
    
    sk = socket.socket()
    sk.bind(('127.0.0.1',9000))
    sk.listen()
    
    while True:
        conn,_ = sk.accept()
        gevent.spawn(chat,conn)
    
    # client端    
    import time
    import socket
    def client(i):
        sk = socket.socket()
        sk.connect(('127.0.0.1',9000))
    
        while True:
            sk.send('hello'.encode('utf-8'))
            print(i*'*',sk.recv(1024))
            time.sleep(0.5)
    
    from threading import Thread
    for i in range(500):
        Thread(target=client,args =(i,)).start()
    
    1.2.2 asyncio 模块

    asyncio 模块是基于yield机制切换的

    await asyncio.sleep() 异步 阻塞(做切换)

    await 所在的函数前面必须加 async,变成一个async函数。

    python原生的底层的协程模块:

    • 爬虫、webserver框架
    • 提高网络编程的效率和并发效果

    语法:

    • 1.await — async wait: 阻塞 协程函数这里要切换出去,还能保证一会儿再切回来

      await 必须写在async函数里,async函数是协程函数

    • loop 事件循环

      所有的协程的执行 调度 都离不开这个loop

    1.起一个任务

    import asyncio
    async def demo():   # 协程方法(表示是一个async函数)
        print('start')
        await asyncio.sleep(1)  # 阻塞
        print('end')
    
    loop = asyncio.get_event_loop()  # 创建一个事件循环
    loop.run_until_complete(demo())  # 把demo任务丢到事件循环中去执行
    

    2.启动多个任务,并且没有返回值

    import asyncio
    async def demo():   # 协程方法
        print('start')
        await asyncio.sleep(1)  # 阻塞
        print('end')
    
    loop = asyncio.get_event_loop()  # 创建一个事件循环
    wait_obj = asyncio.wait([demo(),demo(),demo()])
    loop.run_until_complete(wait_obj)
    

    3.启动多个任务并且有返回值

    import asyncio
    async def demo():   # 协程方法
        print('start')
        await asyncio.sleep(1)  # 阻塞
        print('end')
        return 123
    
    loop = asyncio.get_event_loop()
    t1 = loop.create_task(demo())
    t2 = loop.create_task(demo())
    tasks = [t1,t2]
    wait_obj = asyncio.wait([t1,t2])
    loop.run_until_complete(wait_obj)
    for t in tasks:
        print(t.result())
    

    4.谁先回来先取谁的结果

    import asyncio
    async def demo(i):   # 协程方法
        print('start')
        await asyncio.sleep(10-i)  # 阻塞
        print('end')
        return i,123
    
    async def main():
        task_l = []
        for i in range(10):
            task = asyncio.ensure_future(demo(i))
            task_l.append(task)
        for ret in asyncio.as_completed(task_l):
            res = await ret
            print(res)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    

    5.用asyncio做一个底层的爬虫示例

    import asyncio
    
    async def get_url():
        reader,writer = await asyncio.open_connection('www.baidu.com',80)
        writer.write(b'GET / HTTP/1.1
    HOST:www.baidu.com
    Connection:close
    
    ')
        all_lines = []
        async for line in reader:
            data = line.decode()
            all_lines.append(data)
        html = '
    '.join(all_lines)
        return html
    
    async def main():
        tasks = []
        for url in range(20):
            tasks.append(asyncio.ensure_future(get_url()))
        for res in asyncio.as_completed(tasks):
            result = await res
            print(result)
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main())  # 处理一个任务
    

    1.3 总结

    1.进程、线程、协程各自的特点:

    • 1.进程:开销大,数据隔离,能利用多核,数据不安全,操作系统控制
    • 2.线程:开销适中,数据共享,在cpython解释器下不能利用多核,数据不安全,操作系统控制
    • 3.协程:开销小,数据共享,不能利用多核,数据安全,用户控制

    2.ansyncio模块为我们提供了哪两个关键字?分别用什么作用?

    • async 标识一个协程函数
    • await 后面跟着一个asyncio模块提供的io操作的函数
    • loop 事件循环,负责在多个任务之间进行切换的

    3.关于读代码相关的问题:

    • 1.开启线程是几乎不需要事件的,start是一个异步非阻塞方法
    • 2.对于整数的 += 、-= 来说 异步的多线程数据不安全,如果是同步的数据就安全了
    • 3.对于列表的操作:无论是异步还是同步的 都是数据安全的

    4.有一个文件,这个文件中有20001行数据,开启一个线程池,为每100行创建一个任务,打印这100行数据。

    # 方法一:
    def print_line(lines):
        print(lines)
    
    from concurrent.futures import ThreadPoolExecutor
    tp = ThreadPoolExecutor(20)
    with open('file',encoding='utf-8') as f:
        lines = []
        for line in f:
            if len(lines) == 100:
                tp.submit(print_line,lines)
                lines.clear()
            lines.append(line)
        if lines:
            tp.submit(print_line, lines)
    # 方法二:
    def print_line(lines):
        print(lines)
    
    def read_file(filename):
        with open(filename, encoding='utf-8') as f:
            for line in f:
                yield line
    
    def submit_func(tp,line=None,end= False,lines = []):
        if line:
            lines.append(line)
        if len(lines) == 100 or end:
            tp.submit(print_line, lines)
            lines.clear()
    
    from concurrent.futures import ThreadPoolExecutor
    tp = ThreadPoolExecutor(20)
    for line in read_file('file'):
        submit_func(tp,line)
    submit_func(tp,end=True)
    
    1. print 和 文件的读、写都是io操作:

      上面这个题主要就是起线程在一个线程读文件的时候,利用这个线程读取文件的时间交给另一个线程来进行打印操作

    2. 使用线程可以有效的规避掉io操作的时间,提高程序的的效率

    3. 解耦程序的功能

    4. 默认参数是列表

  • 相关阅读:
    Java事务
    Mybatis二级缓存问题
    183.面试题 17.14. 最小K个数(快速排序)
    182. 跟着三叶学最短路径问题(存图方式)
    181. 差分数组学习
    AI大视觉(二十) | 小目标检测的tricks汇总
    CentOS7 上安装 mysql-5.7.26
    如何欺骗 Go Mod?
    .netcore docker常用命令-持续补充
    转载:登录后,用户配置被修改的处理方法
  • 原文地址:https://www.cnblogs.com/yangjie0906/p/11214134.html
Copyright © 2011-2022 走看看