一、greenlet模块
如果我们在单个线程内有20个任务,要想实现在多个任务之间切换,使用yield生成器的方式过于麻烦(需要先得到初始化一次的生成器,然后再调用send。。。非常麻烦),而使用greenlet模块可以非常简单地实现这20个任务直接的切换。
from greenlet import greenlet # greenlet能实现很方便的切换,但是不能实现检测到 I/O才切 # 吃一会返,玩一会手机,来回切换 def eat(name): print("%s eat 1." % name) g2.switch("托儿所") # 第一次切换 print("%s eat 2." % name) g2.switch() def play(name): print("%s play 1" % name) g1.switch() print("%s play 2" % name) g1 = greenlet(eat) g2 = greenlet(play) g1.switch("托儿所") # 启动/切换,可以在第一次switch时传入参数,以后都不需要。 """ 托儿所 eat 1. 托儿所 play 1 托儿所 eat 2. 托儿所 play 2 """
单纯的切换(在没有io的情况下或者没有重复开辟内存空间的操作),反而会降低程序的执行速度。
# 顺序执行 import time def f1(): res =1 for i in range(100000000): res += i def f2(): res = 1 for i in range(100000000): res *= i start = time.time() f1() f2() stop = time.time() print('run time is %s' % (stop-start)) # 10.985628366470337
# 切换 from greenlet import greenlet import time def f1(): res = 1 for i in range(100000000): res += i g2.switch() def f2(): res = 1 for i in range(100000000): res *= i g1.switch() start = time.time() g1 = greenlet(f1) g2 = greenlet(f2) g1.switch() stop = time.time() print('run time is %s' % (stop-start)) # 52.763017892837524
greenlet只是提供了一种比generator更加便捷的切换方式,当切到一个任务执行时如果遇到io,那就原地阻塞,仍然是没有解决遇到IO自动切换来提升效率的问题。
单线程里的这20个任务的代码通常会既有计算操作又有阻塞操作,我们完全可以在执行任务1时遇到阻塞,就利用阻塞的时间去执行任务2。。。。如此,才能提高效率,这就用到了Gevent模块。
二、gevent模块
Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet,它是以 C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
# 用法 # 创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的 g1 = gevent.spawn(func,1,2,3,x=4,y=5) g2 = gevent.spawn(func2) g1.join() # 等待g1结束 g2.join() # 等待g2结束 # 或者上述两步合作一步:gevent.joinall([g1,g2]) g1.value # 拿到func1的返回值
遇到IO阻塞时会自动切换任务:
import gevent import time def eat(name): print("%s eat 1." % name) gevent.sleep(3) # 与 time.sleep() 是一样的道理 print("%s eat 2." % name) def play(name): print("%s play 1" % name) gevent.sleep(4) print("%s play 2" % name) start = time.time() g1 = gevent.spawn(eat,"托儿所") # 提交任务,(任务名,参数) g2 = gevent.spawn(play,"托儿所") # 异步提交,不等待结果,提交完后,有可能这俩任务还没开起来,就结束了。 g1.join() # 等这俩任务执行, g2.join() # 或者gevent.joinall([g1,g2]) stop = time.time() print(start-stop) """ 托儿所 eat 1. 托儿所 play 1 托儿所 eat 2. 托儿所 play 2 -4.05228066444397 """
上例 gevent.sleep(3)模拟的是 gevent可以识别的 I/O阻塞,而 time.sleep(3)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了,
from gevent import monkey;monkey.patch_all()
必须放到被打补丁者的前面,如 time,socket模块之前,或者我们干脆记忆成:要用gevent,需要将
from gevent import monkey;monkey.patch_all() 放到文件的开头。
# 但凡要用 gevent模块实现一个检测 I/O操作,就需要在整个文件的开头写上下面两行代码。 from gevent import monkey;monkey.patch_all() # monkey.path_all() import gevent import time def eat(name): print("%s eat 1." % name) time.sleep(3) # 即便不是 gevent模块的 I/O操作,加上最上面的两行代码,也是能够实现监测的 print("%s eat 2." % name) def play(name): print("%s play 1" % name) time.sleep(4) print("%s play 2" % name) start = time.time() g1 = gevent.spawn(eat,"托儿所") g2 = gevent.spawn(play,"托儿所") g1.join() g2.join() stop = time.time() print(start-stop) """ 托儿所 eat 1. 托儿所 play 1 托儿所 eat 2. 托儿所 play 2 -4.036099433898926 如果串行的话,需要7s多, """
我们可以用 threading.current_thread().getName()来查看每个g1和g2,查看的结果为DummyThread-n,即假线程。
# 如果单线程下多个任务是计算密集型,gevent模块就没用了,因为gevent模块就是要监测 I/O, # 遇到 I/O才切,好利用第一个任务 I/O的时间把第二个任务顺便给做了,实现提升单线程的运行效率的效果。 # gevent模块的应用场景是:单线程下多个任务时,I/O密集型。
三、练习
通过gevent实现单线程下的socket并发(from gevent import monkey;monkey.patch_all()一定要放到导入socket模块之前,否则gevent无法识别socket的阻塞)
服务端:
# 基于 gevent模块实现 from gevent import monkey,spawn;monkey.patch_all() # spawn用它来提交线程对象 from socket import * def communicate(conn): while True: try: data = conn.recv(1024) if not data:continue conn.send(data.upper()) except ConnectionResetError: break conn.close() def server(ip,port): server = socket(AF_INET, SOCK_STREAM) server.bind((ip,port)) server.listen(5) while True: conn, addr = server.accept() spawn(communicate,conn) # 这里没必要加 join,死循环,不会结束, server.close() if __name__ == '__main__': g = spawn(server,"192.168.2.209",8900) # 异步提交,必须join g.join()
客户端:
from socket import * from threading import Thread,currentThread def client(): client = socket(AF_INET,SOCK_STREAM) client.connect(("192.168.2.209",8900)) while True: client.send(("%s say hello." % currentThread().getName()).encode("utf-8")) data = client.recv(1024) print("收到的数据:%s" % data.decode("utf-8")) client.close() if __name__ == '__main__': for i in range(10): t = Thread(target=client) t.start()