zoukankan      html  css  js  c++  java
  • Python 之协程

    多线程并发、包括线程池,是操作系统控制的并发。如果是单线程,可以通过协程实现单线程下的并发。

    协程 又称微线程,是一种用户态的轻量级线程,由用户程序自己控制调度。

    python的线程属于内核级别的,由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)

    而单线程里开启协程,一旦遇到io,由用户自己控制调度。

    特点:

    1、单线程里并发

    2、修改共享数据不需枷锁

    3、用户程序里保存多个控制流的上下文栈

    4、附加:一个协程遇到IO操作自动切换到其他协程(yield,greenlet都无法实现检测IO,用gevent模块的select机制可以)

    并发:

    # 注意到consumer函数是一个generator(生成器):
    # 任何包含yield关键字的函数都会自动成为生成器(generator)对象
    def consumer():
        r = ''
        while True:
            # 3、consumer通过yield拿到消息,处理,又通过yield把结果传回;
            #    yield指令具有return关键字的作用。然后函数的堆栈会自动冻结(freeze)在这一行。
            #    当函数调用者的下一次利用next()或generator.send()或for-in来再次调用该函数时,
            #    就会从yield代码的下一行开始,继续执行,再返回下一次迭代结果。通过这种方式,迭代器可以实现无限序列和惰性求值。
            n = yield r
            if not n:
                return
            print('[CONSUMER] ←← Consuming %s...' % n)
            time.sleep(1)
            r = '200 OK'
    
    
    def produce():
        # 1、首先调用c.next()启动生成器
        c = consumer()
        next(c)
        n = 0
        while n < 5:
            n = n + 1
            print('[PRODUCER] →→ Producing %s...' % n)
            # 2、然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
            cr = c.send(n)
            # 4、produce拿到consumer处理的结果,继续生产下一条消息;
            print('[PRODUCER] Consumer return: %s' % cr)
        # 5、produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
        c.close()
    
    if __name__ == '__main__':
        start_time = time.time()
        # 6、整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,在一个线程里协作完成。
        res = produce()
        # consumer(res)
        end_time = time.time()
        print(end_time-start_time)

    串行:

    def produce():
        n = 0
        res = []
        while n < 5:
            n = n + 1
            print('[PRODUCER] →→ Producing %s...' % n)
            res.append(n)
        return res
    
    def consumer(res):
        pass
    
    if __name__ == '__main__':
        start_time = time.time()
        # 6、整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,在一个线程里协作完成。
        res = produce()
        consumer(res)
        end_time = time.time()
        print(end_time-start_time)

    greenlet模块,方便切换,但遇到IO时,不会自动切换到其他任务

    from greenlet import greenlet
    
    def eat(name):
        print('%s is eating 1'%name)
        gr2.switch('zhanwu')
        print('%s is eating 2' % name)
        gr2.switch()
    
    def play(name):
        print('%s is playing 1' % name)
        gr1.switch()
        print('%s is playing 2' % name)
    
    gr1 = greenlet(eat)
    gr2 = greenlet(play)
    gr1.switch('egon')  #切换的时候传参

    gevent模块,异步提交任务,遇到IO时可实现自动切换。

    异步提交任务后,主线程结束的话,会导致子线程任务完不成,通过sleep或join实现主线程不死直到子线程运行结束。

    from gevent import monkey
    monkey.patch_all()   #将下面的阻塞操作变为非阻塞操作,这是用gevent必须的
    def eat(name):
        print('%s is eating 1'%name)
        gevent.sleep(3)
        print('%s is eating 2' % name)
    
    def play(name):
        print('%s is playing 1' % name)
        gevent.sleep(4)
        print('%s is playing 2' % name)
    
    
    gevent.joinall(
        [gevent.spawn(eat,'egon'),
        gevent.spawn(play,'egon')]
    )

    基于gevent实现单线程下的并发。

    如果开多个进程,每个进程里开多个线程,每个线程里再开协程,会大大提升效率。
    
    server端
    from gevent import monkey,spawn;monkey.patch_all()
    from socket import *
    
    
    def talk(conn):
        while True:
            try:
                data = conn.recv(1024)
                if not data: break
                conn.send(data.upper())
            except ConnectionResetError:
                break
        conn.close()
    
    
    def server(ip,port): # 来一个客户端,起一个conn
        server = socket(AF_INET, SOCK_STREAM)
        server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        server.bind((ip, port))
        server.listen(5)
    
        while True:
            conn,addr = server.accept()
            spawn(talk,conn)
        server.close()
    
    if __name__ == '__main__':
        g = spawn(server,'127.0.0.1',8087)
        g.join()
    
    client端:
    from socket import *
    from threading import Thread,currentThread
    def client():
        client = socket(AF_INET, SOCK_STREAM)
        client.connect(('127.0.0.1',8087))
    
        while True:
            client.send(('%s say hello'%currentThread().getName()).encode('utf8'))
            data = client.recv(1024)
            print(data.decode('utf-8'))
    
    if __name__ == '__main__':
        for i in range(500):
            t = Thread(target=client)
            t.start()
  • 相关阅读:
    git操作说明书
    python之routes入门
    python inspect库
    Python SMTP发送邮件
    Python深入:setuptools进阶
    Python打包之setuptools
    python graphviz的使用(画图工具)
    pathlib的使用
    python tempfile 创建临时目录
    python flake8 代码扫描
  • 原文地址:https://www.cnblogs.com/stin/p/8549051.html
Copyright © 2011-2022 走看看