zoukankan      html  css  js  c++  java
  • Python 之协程

    多线程并发、包括线程池,是操作系统控制的并发。如果是单线程,可以通过协程实现单线程下的并发。

    协程 又称微线程,是一种用户态的轻量级线程,由用户程序自己控制调度。

    python的线程属于内核级别的,由操作系统控制调度(如单线程遇到io或执行时间过长就会被迫交出cpu执行权限,切换其他线程运行)

    而单线程里开启协程,一旦遇到io,由用户自己控制调度。

    特点:

    1、单线程里并发

    2、修改共享数据不需枷锁

    3、用户程序里保存多个控制流的上下文栈

    4、附加:一个协程遇到IO操作自动切换到其他协程(yield,greenlet都无法实现检测IO,用gevent模块的select机制可以)

    并发:

    # 注意到consumer函数是一个generator(生成器):
    # 任何包含yield关键字的函数都会自动成为生成器(generator)对象
    def consumer():
        r = ''
        while True:
            # 3、consumer通过yield拿到消息,处理,又通过yield把结果传回;
            #    yield指令具有return关键字的作用。然后函数的堆栈会自动冻结(freeze)在这一行。
            #    当函数调用者的下一次利用next()或generator.send()或for-in来再次调用该函数时,
            #    就会从yield代码的下一行开始,继续执行,再返回下一次迭代结果。通过这种方式,迭代器可以实现无限序列和惰性求值。
            n = yield r
            if not n:
                return
            print('[CONSUMER] ←← Consuming %s...' % n)
            time.sleep(1)
            r = '200 OK'
    
    
    def produce():
        # 1、首先调用c.next()启动生成器
        c = consumer()
        next(c)
        n = 0
        while n < 5:
            n = n + 1
            print('[PRODUCER] →→ Producing %s...' % n)
            # 2、然后,一旦生产了东西,通过c.send(n)切换到consumer执行;
            cr = c.send(n)
            # 4、produce拿到consumer处理的结果,继续生产下一条消息;
            print('[PRODUCER] Consumer return: %s' % cr)
        # 5、produce决定不生产了,通过c.close()关闭consumer,整个过程结束。
        c.close()
    
    if __name__ == '__main__':
        start_time = time.time()
        # 6、整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,在一个线程里协作完成。
        res = produce()
        # consumer(res)
        end_time = time.time()
        print(end_time-start_time)

    串行:

    def produce():
        n = 0
        res = []
        while n < 5:
            n = n + 1
            print('[PRODUCER] →→ Producing %s...' % n)
            res.append(n)
        return res
    
    def consumer(res):
        pass
    
    if __name__ == '__main__':
        start_time = time.time()
        # 6、整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,在一个线程里协作完成。
        res = produce()
        consumer(res)
        end_time = time.time()
        print(end_time-start_time)

    greenlet模块,方便切换,但遇到IO时,不会自动切换到其他任务

    from greenlet import greenlet
    
    def eat(name):
        print('%s is eating 1'%name)
        gr2.switch('zhanwu')
        print('%s is eating 2' % name)
        gr2.switch()
    
    def play(name):
        print('%s is playing 1' % name)
        gr1.switch()
        print('%s is playing 2' % name)
    
    gr1 = greenlet(eat)
    gr2 = greenlet(play)
    gr1.switch('egon')  #切换的时候传参

    gevent模块,异步提交任务,遇到IO时可实现自动切换。

    异步提交任务后,主线程结束的话,会导致子线程任务完不成,通过sleep或join实现主线程不死直到子线程运行结束。

    from gevent import monkey
    monkey.patch_all()   #将下面的阻塞操作变为非阻塞操作,这是用gevent必须的
    def eat(name):
        print('%s is eating 1'%name)
        gevent.sleep(3)
        print('%s is eating 2' % name)
    
    def play(name):
        print('%s is playing 1' % name)
        gevent.sleep(4)
        print('%s is playing 2' % name)
    
    
    gevent.joinall(
        [gevent.spawn(eat,'egon'),
        gevent.spawn(play,'egon')]
    )

    基于gevent实现单线程下的并发。

    如果开多个进程,每个进程里开多个线程,每个线程里再开协程,会大大提升效率。
    
    server端
    from gevent import monkey,spawn;monkey.patch_all()
    from socket import *
    
    
    def talk(conn):
        while True:
            try:
                data = conn.recv(1024)
                if not data: break
                conn.send(data.upper())
            except ConnectionResetError:
                break
        conn.close()
    
    
    def server(ip,port): # 来一个客户端,起一个conn
        server = socket(AF_INET, SOCK_STREAM)
        server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
        server.bind((ip, port))
        server.listen(5)
    
        while True:
            conn,addr = server.accept()
            spawn(talk,conn)
        server.close()
    
    if __name__ == '__main__':
        g = spawn(server,'127.0.0.1',8087)
        g.join()
    
    client端:
    from socket import *
    from threading import Thread,currentThread
    def client():
        client = socket(AF_INET, SOCK_STREAM)
        client.connect(('127.0.0.1',8087))
    
        while True:
            client.send(('%s say hello'%currentThread().getName()).encode('utf8'))
            data = client.recv(1024)
            print(data.decode('utf-8'))
    
    if __name__ == '__main__':
        for i in range(500):
            t = Thread(target=client)
            t.start()
  • 相关阅读:
    Educational Codeforces Round 20 D. Magazine Ad
    Educational Codeforces Round 20 C. Maximal GCD
    紫书第三章训练2 暴力集
    Educational Codeforces Round 20 B. Distances to Zero
    Educational Codeforces Round 20 A. Maximal Binary Matrix
    紫书第三章训练1 D
    紫书第一章训练1 D -Message Decoding
    HAZU校赛 Problem K: Deadline
    Mutual Training for Wannafly Union #8 D
    紫书第三章训练1 E
  • 原文地址:https://www.cnblogs.com/stin/p/8549051.html
Copyright © 2011-2022 走看看