zoukankan      html  css  js  c++  java
  • Python

    目录

    • 协程介绍
    • greenlet模块
    • gevent模块

    1,协程介绍

    • 协程:是单线程下的并发,又称微线程,纤程。英文名Coroutine。协程是一种用户态的轻量级线程,即协程是由用户程序自己控制调度的。

    • 并发的本质:切换+保存状态

    • 在操作系统中

      • 进程:是资源分配的最小单位
      • 线程:是CPU调度的最小单位
      • 协程:是单线程内实现并发切换执行任务的
    • 使用yield也可以实现在一个主线程中切换执行

    # 生成器 yield
    def func1():
        print(1)
        yield
        print(3)
        yield
    
    def func2():
        g = func1()   # 生成器函数在被调用时不会立即执行,除非next(g)触发才可以
        next(g)       # 开始执行func1()函数,但是遇到yield就会停止
        print(2)
        next(g)
        print(4)
    
    func2()
    
    # 结果呈现
    1
    2
    3
    4
    
    def consumer():    # 消费者模型
        while True:
            n = yield     # yield接收g.send()的结果,然后赋值给n
            print("消费了包子 %s" % n)
    
    def producer():   # 生产者模型
        g = consumer()    # 调用生成器函数,并不会理解执行生成器函数内部的代码(除非next()进行触发)
        next(g)   # 开始执行生成器函数
        for i in range(10):
            print("生产了包子 %s" % i)
            g.send(i) # send()给生成器函数yield处接收
    
    producer()
    
    # 结果呈现
    生产了包子 0
    消费了包子 0
    生产了包子 1
    消费了包子 1
    生产了包子 2
    消费了包子 2
    生产了包子 3
    消费了包子 3
    生产了包子 4
    消费了包子 4
    生产了包子 5
    消费了包子 5
    生产了包子 6
    消费了包子 6
    生产了包子 7
    消费了包子 7
    生产了包子 8
    消费了包子 8
    生产了包子 9
    消费了包子 9
    
    • 在yield切换中,在任务一遇到io情况下,切到任务二去执行,这样就可以利用任务一阻塞的时间完成任务二的计算,效率的提升就在于此。
    • 需要强调的是:
      • 1 python的线程属于内核级别的,即由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)
      • 2 单线程内开启协程,一旦遇到io,就会从应用程序级别(而非操作系统)控制切换,以此来提升效率(!!!非io操作的切换与效率无关)
    • 对比操作系统控制线程的切换,用户在单线程内控制协程的切换
      • 优点:
        • 1 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
        • 2 单线程内就可以实现并发的效果,最大限度地利用CPU
      • 缺点:
        • 1 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程开启多个线程,每个线程内开启协程
        • 2 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
    • 总结协程特点
      • 1 必须在只有要给单线程里实现并发
      • 2 修改共享数据不需要加锁
      • 3 用户程序里自己保持多个控制的上下文栈
      • 4 附加:一个协程遇到IO操作自动切换到其它协程(如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制))

    2,greenlet模块

    2.1 greenlet 实现同一线程内切换

    import time
    from greenlet import greenlet   # 在单线程中切换状态的模块
    def eat1():
        print("吃鸡腿")
        g2.switch()     # 切换执行eat2
        time.sleep(5)         # greenlet进行切换时,并不会规避掉IO时间(也就是切换回来时还是需要等待2秒在执行)
        print("吃鸡翅")
        g2.switch()     # 切换执行eat2
    def eat2():
        print("吃饺子")
        g1.switch()     # 切换执行eat1
        time.sleep(3)         # greenlet进行切换时,并不会规避掉IO时间(也就是切换回来时还是需要等待2秒在执行)
        print("白切鸡")
    
    g1 = greenlet(eat1)
    g2 = greenlet(eat2)
    g1.switch()    # 切换执行eat1
    
    # 结果呈现
    吃鸡腿
    吃饺子
    吃鸡翅
    白切鸡
    
    • 如果在同一个程序有IO的情况下,才切换会让效率提高很多,但是yield greenlet均不会在切换时规避掉IO时间

    2.2 greenlet 实现 效率对比

    #顺序执行
    import time
    def f1():
        res=1
        for i in range(100000000):
            res+=i
    
    def f2():
        res=1
        for i in range(100000000):
            res*=i
    
    start=time.time()
    f1()
    f2()
    stop=time.time()
    print('run time is %s' %(stop-start)) #10.985628366470337
    
    #切换
    from greenlet import greenlet
    import time
    def f1():
        res=1
        for i in range(100000000):
            res+=i
            g2.switch()
    
    def f2():
        res=1
        for i in range(100000000):
            res*=i
            g1.switch()
    
    start=time.time()
    g1=greenlet(f1)
    g2=greenlet(f2)
    g1.switch()
    stop=time.time()
    print('run time is %s' %(stop-start)) # 52.763017892837524
    

    3,gevent模块

    • gevent 就是当遇到gevent.sleep() IO 时会自动切换;
    # gevent 内部封装了greenlet模块
    import gevent
    def eat(name):
        print('%s eat 1' %name)
        gevent.sleep(2)        # gevent可以在gevevt.sleep()自己认识的IO操作切换
        print('%s eat 2' %name)
    
    def play(name):
        print('%s play 1' %name)
        gevent.sleep(1)
        print('%s play 2' %name)
    
    
    g1=gevent.spawn(eat,'egon')
    g2=gevent.spawn(play,name='egon')
    # g1.join()
    # g2.join()
    gevent.joinall([g1,g2])    # 相当于上面的g1.join()  g2.join()
    
    # 结果呈现
    egon eat 1
    egon play 1
    egon play 2
    egon eat 2
    
    • gevent()对普通的IO (比如time模块的sleep,socket 以及urllib request等网络请求)是无法切换的:
    import gevent,time
    def eat(name):
        print('%s eat 1' %name)
        time.sleep(2)
        print('%s eat 2' %name)
    
    def play(name):
        print('%s play 1' %name)
        time.sleep(1)
        print('%s play 2' %name)
    
    
    g1=gevent.spawn(eat,'egon')
    g2=gevent.spawn(play,name='egon')
    gevent.joinall([g1,g2])
    
    # 结果呈现
    egon eat 1
    egon eat 2
    egon play 1
    egon play 2
    
    • 上例gevent.sleep(2)模拟的是gevent可以识别的io阻塞,而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了
    • from gevent import monkey;monkey.patch_all() 必须放到被打补丁者的前面,如time,socket模块之前
    • 我们可以用 threading.current_thread().getName() 来查看每个g1和g2,查看的结果为DummyThread-n,即假线程
    from gevent import monkey;monkey.patch_all()    # 加上这句话,gevent遇到其他模块(time,socket等IO操作)的IO 需要等待时 就会切换协程
    from threading import current_thread
    import gevent,time
    def func1():
        print(current_thread().name)    # 打印当前线程名(其实协程并不是线程,多个协程是在同一个线程内完成的)
        print(123)
        time.sleep(1)
        print(456)
    
    def func2():
        print(current_thread().name)
        print(789)
        time.sleep(1)
        print(101112)
    
    g1 = gevent.spawn(func1)    # 遇见它认识的io会自动切换的模块
    g2 = gevent.spawn(func2)
    # g1.join()
    # g2.join()
    gevent.joinall([g1,g2])
    
    # 结果呈现
    DummyThread-1
    123
    DummyThread-2
    789
    456
    101112
    

    3.1 Gevent之同步与异步

    • 测试有IO操作时,使用多个协程与开单线程单步执行多个任务执行效率的对比
    from gevent import monkey;monkey.patch_all()
    import gevent,time
    
    def task(args):
        time.sleep(1)
        print(args)
    
    def sync_fucn():    # 同步
        for i in range(10):
            task(i)
    
    def async_func():   # 异步
        g_lst = []
        for i in range(10):
            g_lst.append(gevent.spawn(task,i))    # 发起协程任务,传参数
        gevent.joinall(g_lst)
    
    start = time.time()
    sync_fucn()
    print(time.time() - start)
    
    start = time.time()
    async_func()
    print(time.time() - start)
    
    # 结果呈现
    0
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10.011728048324585
    0
    1
    2
    3
    4
    5
    6
    7
    8
    9
    1.0025279521942139
    

    3.2 开多个协程去爬取多个网页与单线程单步执行爬取网页的效率对比

    # 爬去网页信息的例子
    from gevent import monkey;monkey.patch_all()
    import gevent,requests,time
    
    # 协程函数发起10个网页的爬取任务
    def get_url(url):
        res = requests.get(url)
        print(url,res.status_code,len(res.text))    # 返回爬取网页的信息(requests.get(url).text----获取网页源代码; requests.get(url).status_code----获取网页状态码)
    
    url_lst = [
        "http://www.sohu.com",
        "http://www.baidu.com",
        "http://www.qq.com",
        "http://www.python.org",
        "http://www.cnblogs.com",
        "http://www.mi.com",
        "http://www.apache.org"
    ]
    
    g_lst = []
    start = time.time()
    for url in url_lst:
        g = gevent.spawn(get_url,url)
        g_lst.append(g)
    gevent.joinall(g_lst)
    print(time.time() - start)
    
    start = time.time()
    for url in url_lst:
        get_url(url)
    print(time.time() - start)
    
    # 结果呈现
    http://www.baidu.com 200 2381
    http://www.sohu.com 200 178923
    http://www.qq.com 200 205793
    http://www.mi.com 200 312788
    http://www.cnblogs.com 200 41063
    http://www.apache.org 200 62019
    http://www.python.org 200 49235
    1.198430061340332
    http://www.sohu.com 200 178923
    http://www.baidu.com 200 2381
    http://www.qq.com 200 205793
    http://www.python.org 200 49235
    http://www.cnblogs.com 200 41043
    http://www.mi.com 200 312788
    http://www.apache.org 200 62019
    2.1779263019561768
    
    • 协程在响应一个网页时,有网络延时,它就可能利用这个时间去打开其他网页了,也就是时间复用,有可能利用第一个网页等待时间,把剩下所有网页的请求都发出去了;
    • 同步单步执行时,每执行一个网页就会等待网络延时,串行的;而协程就是在发送一个网页时,不等,因为它直到有网络延时,所以直接执行下一个任务;

    3.3 使用协程完成server端和client端的通信

    • 测试连通性
    # server
    import socket
    
    sk = socket.socket()
    sk.bind(("127.0.0.1",8080))
    sk.listen()
    
    conn,addr = sk.accept()
    ret = conn.recv(1024).decode("utf-8")
    print(ret)
    conn.send(ret.upper().encode("utf-8"))
    conn.close()
    sk.close()
    
    # client
    import socket
    sk = socket.socket()
    sk.connect(("127.0.0.1",8080))
    sk.send(b"hi")
    ret = sk.recv(1024).decode("utf-8")
    print(ret)
    sk.close()
    
    • 客户端并发连接服务端
    # server
    from gevent import monkey;monkey.patch_all()
    import socket,gevent
    
    def talk(conn):
        while True:
            ret = conn.recv(1024).decode("utf-8")
            print(ret)
            conn.send(ret.upper().encode("utf-8"))
        conn.close()
    
    sk = socket.socket()
    sk.bind(("127.0.0.1",8080))
    sk.listen()
    
    while True:
        conn,addr = sk.accept()
        gevent.spawn(talk,conn)
    sk.close()
    
    # client
    from gevent import monkey;monkey.patch_all()
    import socket,gevent,time,threading
    
    def my_client():
        sk = socket.socket()
        sk.connect(("127.0.0.1",8080))
        while True:
            sk.send(b"hi")
            ret = sk.recv(1024).decode("utf-8")
            print(ret)
            time.sleep(1)
        sk.close()
    for i in range(500):
        threading.Thread(target=my_client).start()
    
  • 相关阅读:
    安装kafka
    linux安装jdk
    rabbitmq
    企业级docker镜像仓库----Harbor高可用部署
    kubernetes基础概念理解
    kubeadm安装kubernetes集群v1.14.3
    salt-stack深入学习
    salt-stack的数据系统Pillars
    salt-stack的数据系统Grains
    salt-stack
  • 原文地址:https://www.cnblogs.com/xiaoqshuo/p/9878021.html
Copyright © 2011-2022 走看看