三、协程
3.1协程概念
协程:又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程。
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程
协程的好处:
-
无需线程上下文切换的开销
-
无需原子操作锁定及同步的开销方便切换控制流,简化编程模型
"原子操作(atomic operation)是不需要synchronized",所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。
-
高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理。
协程的缺点:
-
无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
-
进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序
协程定义或标准(满足1,2,3就可称为协程):
-
必须在只有一个单线程里实现并发
-
修改共享数据不需加锁
-
用户程序里自己保存多个控制流的上下文栈
-
一个协程遇到IO操作自动切换到其它协程
“上下文”,指的是程序在执行中的一个状态。通常我们会用调用栈来表示这个状态——栈记载了每个调用层级执行到哪里,还有执行时的环境情况等所有有关的信息。
“上下文切换”,表达的是一种从一个上下文切换到另一个上下文执行的技术。而“调度”指的是决定哪个上下文可以获得接下去的CPU时间的方法。
与线程比较:
1. python的线程属于内核级别的,即由操作系统控制调度(如单线程一旦遇到io就被迫交出cpu执行权限,切换其他线程运行)
2. 单线程内开启协程,一旦遇到io,从应用程序级别(而非操作系统)控制切换
对比操作系统控制线程的切换,用户在单线程内控制协程的切换,优点如下:
1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
2. 单线程内就可以实现并发的效果,最大限度地利用cpu
用yield生成器函数实现单线程下保存程序的运行状态:
import time def consumer(): r = '' while True: n = yield r print('[CONSUMER] ←← Consuming %s...' % n) time.sleep(1) r = '200 OK' def produce(c): next(c) n = 0 while n < 5: n = n + 1 print('[PRODUCER] →→ Producing %s...' % n) cr = c.send(n) # cr="200 ok" print('[PRODUCER] Consumer return: %s' % cr) c.close() if __name__=='__main__': c=consumer() # c:生成器对象 produce(c)
3.2 greenlet类实现协程
greenlet机制的主要思想是:生成器函数或者协程函数中的yield语句挂起函数的执行,直到稍后使用next()或send()操作进行恢复为止。可以使用一个调度器循环在一组生成器函数之间协作多个任务。greentlet是python中实现我们所谓的"Coroutine(协程)"的一个基础库.
用greenlet类实现协程举例:
from greenlet import greenlet def test1(): print (12) gr2.switch() print (34) gr2.switch() def test2(): print (56) gr1.switch() print (78) gr1 = greenlet(test1) gr2 = greenlet(test2) gr1.switch() >>:12 56 34 78
3.3 基于greenlet类用 gevent模块实现协程
Python通过yield提供了对协程的基本支持,但是不完全。而第三方的gevent为Python提供了比较完善的协程支持。
gevent是第三方库,通过greenlet实现协程,其基本思想是:
当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。
由于切换是在IO操作时自动完成,所以gevent需要修改Python自带的一些标准库,这一过程在启动时通过monkey patch完成:
用gevent模块实现爬虫
from gevent import monkey monkey.patch_all() import requests,gevent,time def foo(url): respnse=requests.get(url) respnse_str=respnse.text print("GET data %s"%len(respnse_str)) s=time.time() gevent.joinall([gevent.spawn(foo,"https://itk.org/"), gevent.spawn(foo, "https://www.github.com/"), gevent.spawn(foo, "https://baidu.com/")]) print(time.time()-s)
上例中还可以用gevent.sleep(2)来模拟gevent可以识别的i/o阻塞
而time.sleep(2)或其他的阻塞 gevent是不能直接识别的,需要添加补丁,添加补丁代码如下:
from gevent import monkey
monkey.patch_all()
补丁代码必须放在导入其他模块之前,及放在文件开头
附:用进程池、多线程、协程爬虫时间比较
from gevent import monkey monkey.patch_all() import requests import re from multiprocessing import Pool import time,threading import gevent def getpage(res): response_str=requests.get(res) print('ecdoing is :',response_str.encoding) return response_str.text def js(ret): li=[] for item in ret: dic={'title':item[2],'date':item[1],'评论数':item[0]} li.append(dic) f=open('acfun.txt','a',encoding='utf-8') for i in li: f.write(str(i)) f.write(' ') f.close() def run(n): url='http://www.acfun.cn/v/list73/index_%s.htm'%n print(url) response=getpage(url) # response=response.encode('ISO-8859-1').decode('utf-8') obj=re.compile('<span class="a">(d+)</span>.*?<a href=.*? target=".*?" title="发布于 (.*?)" class="title">(.*?)</a>',re.S) # obj = re.compile(r'<img.*?src=.(S+.jpg).*?', re.S) ret=obj.findall(response) # print(ret) return js(ret) if __name__ == '__main__': start_time=time.time() #顺序执行 # start_time=time.time() # for j in range(1,100): # run(j) # #顺序执行cost time: 51.30734419822693 #多线程并发执行 # li=[] # for j in range(1,100): # j = threading.Thread(target=run, args=(j,)) # j.start() # li.append(j) # for obj in li: # obj.join() # 并发执行不使用join cost time: 0.20418000221252441 # 并发执行使用join cost time: 4.524945974349976 #使用进程池 # p = Pool(5) # for i in range(1,100): # p.apply_async(func=run,args=(i,)) # p.close() # p.join() #使用进程池cost time: 6.876262426376343 #使用协程 li = [] for i in range(1, 100): li.append(gevent.spawn(run, i)) gevent.joinall(li) #使用协程第一次cost time: 4.432950973510742 #使用协程第二次cost time: 30.864907264709473 #使用协程第三次cost time: 13.472567558288574 end_time=time.time() print('cost time:', end_time-start_time)
四、I/O模型
Linux环境下的network IO Model分为:
- blocking IO
- nonblocking IO
- IO multiplexing
- signal driven IO
- asynchronous IO
由于signal driven IO在实际中并不常用,所以我这只提及剩下的四种IO Model。
再说一下IO发生时涉及的对象和步骤。
对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。当一个read操作发生时,它会经历两个阶段:
- 等待数据准备 (Waiting for the data to be ready)
- 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)
记住这两点很重要,因为这些IO Model的区别就是在两个阶段上各有不同的情况。
4.1 blocking IO (阻塞IO)
在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:
当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。
blocking IO的特点:在IO执行的两个阶段都被block了,全程阻塞
4.2 non-blocking IO(非阻塞IO)
linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:
从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。所以,用户进程其实是需要不断的主动询问kernel数据好了没有。
优点:能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是 “后台” 可以有多个任务在同时执行)。
缺点:任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。
4.3 IO multiplexing(IO多路复用)
IO multiplexing这个词可能有点陌生,但是如果我说select,epoll,大概就都能明白了。有些地方也称这种IO方式为event driven IO。我们都知道,select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
这个图和blocking IO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,用select的优势在于它可以同时处理多个connection。
(所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)
在IO multiplexing Model中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。
结论: select的优势在于可以处理多个连接,不适用于单个连接
4.4 Asynchronous I/O(异步IO)
linux下的asynchronous IO其实用得很少。先看一下它的流程:
用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。
4.5 IO模型比较分析
各个IO Model的比较如图所示:
4.6 selectors模块
import selectors import socket sel = selectors.DefaultSelector() def accept(sock, mask): conn, addr = sock.accept() # Should be ready print('accepted', conn, 'from', addr) conn.setblocking(False) sel.register(conn, selectors.EVENT_READ, read) def read(conn, mask): data = conn.recv(1000) # Should be ready if data: print('echoing', repr(data), 'to', conn) conn.send(data) # Hope it won't block else: print('closing', conn) sel.unregister(conn) conn.close() sock = socket.socket() sock.bind(('localhost', 1234)) sock.listen(100) sock.setblocking(False) sel.register(sock, selectors.EVENT_READ, accept) while True: events = sel.select() for key, mask in events: callback = key.data callback(key.fileobj, mask)