协程
一、协程的本质:
单线程实现并发,在应用程序里控制多个任务的切换+保存状态
二、协程的目的:
- 想要在单线程下实现并发
- 并发指的是多个任务看起来是同时运行的
- 并发=切换+保存状态
三、补充:
- yiled可以保存状态,yield的状态保存与操作系统的保存线程状态很像,但是yield是代码级别控制的,更轻量级
- send可以把一个函数的结果传给另外一个函数,以此实现单线程内程序之间的切换
- 如何实现检测IO,yield、greenlet都无法实现,就用到了gevent模块(select机制)
四、优点
- 应用程序级别速度要远远高于操作系统的切换
五、缺点
- 多个任务一旦有一个阻塞没有切,整个线程都阻塞在原地,该线程内的其他的任务都不能执行了
- 一旦引入协程,就需要检测单线程下所有的IO行为,实现遇到IO就切换,少一个都不行,因为如果一个任务阻塞了,整个线程就阻塞了,其他的任务即便是可以计算,但是也无法运行了
注意:单纯地切换反而会降低运行效率
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
#并发执行 import time def producer(): g = consumer() next (g) for i in range ( 100 ): g.send(i) def consumer(): while True : res = yield start_time = time.time() producer() stop_time = time.time() print (stop_time - start_time) #串行 import time def producer(): res = [] for i in range ( 10000000 ): res.append(i) return res def consumer(res): pass start_time = time.time() res = producer() consumer(res) stop_time = time.time() print (stop_time - start_time) |
greenlet
greenlet只是提供了一种比generator更加便捷的切换方式,当切到一个任务执行时如果遇到io,那就原地阻塞,仍然是没有解决遇到IO自动切换来提升效率的问题。
注意:单纯的切换(在没有io的情况下或者没有重复开辟内存空间的操作),反而会降低程序的执行速度
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
#pip3 install greenlet from greenlet import greenlet import time def eat(name): print ( '%s eat 1' % name) time.sleep( 2 ) g2.switch( 'tom' ) print ( '%s eat 2' % name) g2.switch() def play(name): print ( '%s play 1' % name ) g1.switch() print ( '%s play 2' % name ) g1 = greenlet(eat) g2 = greenlet(play) g1.switch( 'tom' ) """ tom eat 1 tom play 1 tom eat 2 tom play 2 """ |
gevent
遇到IO阻塞时会自动切换任务
一、用法:
- g1=gevent.spawn(func,1,,2,3,x=4,y=5)创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的
- g2=gevent.spawn(func2)
- g1.join() #等待g1结束
- g2.join() #等待g2结束
- 或者上述两步合作一步:gevent.joinall([g1,g2])
- g1.value#拿到func1的返回值
二、补充:
- gevent.sleep(2)模拟的是gevent可以识别的io阻塞,
- 而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了
- from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块之前或者我们干脆记忆成:要用gevent,需要将from gevent import monkey;monkey.patch_all()放到文件的开头
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
#pip3 install gevent from gevent import monkey;monkey.patch_all() import gevent import time def eat(name): print ( '%s eat 1' % name) time.sleep( 3 ) print ( '%s eat 2' % name) def play(name): print ( '%s play 1' % name) time.sleep( 2 ) print ( '%s play 2' % name) start_time = time.time() g1 = gevent.spawn(eat, 'tom' ) g2 = gevent.spawn(play, 'rose' ) g1.join() g2.join() stop_time = time.time() print (stop_time - start_time) """ tom eat 1 rose play 1 rose play 2 tom eat 2 3.003171920776367 """ from gevent import monkey;monkey.patch_all() import gevent import time def eat(name): print ( '%s eat 1' % name) time.sleep( 3 ) print ( '%s eat 2' % name) def play(name): print ( '%s play 1' % name) time.sleep( 2 ) print ( '%s play 2' % name) g1 = gevent.spawn(eat, 'tom' ) g2 = gevent.spawn(play, 'rose' ) # g1.join() # g2.join() gevent.joinall([g1,g2]) |
三、通过gevent实现单线程下的socket并发
from gevent import monkey;monkey.patch_all()一定要放到导入socket模块之前,否则gevent无法识别socket的阻塞
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
""" 服务端 #基于gevent实现 """ from gevent import monkey,spawn;monkey.patch_all() from socket import * def communicate(conn): while True : try : data = conn.recv( 1024 ) if not data: break conn.send(data.upper()) except ConnectionResetError: break conn.close() def server(ip,port): server = socket(AF_INET, SOCK_STREAM) server.bind((ip,port)) server.listen( 5 ) while True : conn, addr = server.accept() spawn(communicate,conn) server.close() if __name__ = = '__main__' : g = spawn(server, '127.0.0.1' , 8090 ) g.join() """ 客户端 """ from socket import * from threading import Thread,currentThread def client(): client = socket(AF_INET,SOCK_STREAM) client.connect(( '127.0.0.1' , 8090 )) while True : client.send(( '%s hello' % currentThread().getName()).encode( 'utf-8' )) data = client.recv( 1024 ) print (data.decode( 'utf-8' )) client.close() if __name__ = = '__main__' : for i in range ( 500 ): t = Thread(target = client) t.start() |