一、回顾
1、线程池
队列:同一进程内的队列。(先进先出,后进后出,优先级队列)
2、线程池中的回调函数是谁在调用?
线程池中的回调函数是子线程调用的,和父线程没有关系。
进程池中的回调函数是父进程调用的,和子进程没有关系。
二、协程
1、yield实现状态保存
1 import time 2 3 def func(): 4 print(123) 5 sum = 0 6 print(6666) 7 yield sum 8 print(7777) 9 yield sum 10 print(8888) 11 yield sum 12 13 def fff(): 14 g = func() 15 print('这是在ffff函数中') 16 print(next(g)) 17 time.sleep(1) 18 print('这是在ffff函数中') 19 print(next(g)) 20 time.sleep(1) 21 print('这是在ffff函数中') 22 print(next(g)) 23 24 fff()
2、yield实现并发的假象
yield只能实现单纯的切换函数和保存函数状态的功能;不能实现当某一个函数遇到IO阻塞时,自动切换到另一个函数去执行。
但是,协程的本质还是主要依靠yield去实现的。
如果只拿yield去单纯的实现一个切换假象,你会发现根本没有程序串行执行效率高。
1 import time 2 3 # 在单线程中,如果存在多个函数,如果有某个函数发生IO操作,我想让程序马上切换到另一个函数去执行 4 # 以此来实现一个假的并发现象。 5 # 总结: 6 # yield 只能实现单纯的切换函数和保存函数状态的功能 7 # 不能实现:当某一个函数遇到io阻塞时,自动的切换到另一个函数去执行 8 # 目标是:当某一个函数中遇到IO阻塞时,程序能自动的切换到另一个函数去执行 9 # 如果能实现这个功能,那么每个函数都是一个协程 10 # 11 # 但是 协程的本质还是主要依靠于yield去实现的。 12 # 13 # 如果只是拿yield去单纯的实现一个切换的现象,你会发现,跟本没有程序串行执行效率高 14 15 def consumer(): 16 while 1: 17 x = yield 18 # print(x) 19 20 def producer(): 21 g = consumer() 22 next(g) 23 for i in range(100000000): 24 g.send(i) 25 26 start = time.time() 27 producer() 28 print('yield:',time.time() - start) 29 30 31 32 def consumer(l): 33 # for i in l: 34 # print(i) 35 pass 36 37 def producer(): 38 l = [] 39 for i in range(100000000): 40 l.append(i) 41 return l 42 43 start = time.time() 44 l = producer() 45 consumer(l) 46 print(time.time() - start)
3、协程
协程是一个比线程更加轻量级的单位,是组成线程的各个函数。
协程本身没有实体。
并发的本质:切换+保存状态
4、greenlet模块
能简单的实现函数与函数之间的切换,但是遇到IO操作,不能自动切换到其他函数中。
(1)注册一下函数func,将函数注册成一个对象f1
f1 = greenlet(func)
(2)调用func,使用f1.switch( ),如果func需要转参,就在switch这里传即可。
grenlet模块可以实现在某函数内部遇到IO操作,就自动的切换到其他函数内部去执行。
1 from greenlet import greenlet 2 import time 3 # greenlet 只是可以实现一个简单的切换功能,还是不能做到遇到IO就切换 4 # g1 = greenlet(func) 实例化一个对象 5 # g1.switch() 用这种方式去调用func函数 6 # 当使用switch调用func的时候,什么时候func会停止运行? 7 # 1 要么return 2 要么在func内部又遇到 switch 8 9 def eat(name): 10 print('%s吃炸鸡'%name) 11 time.sleep(2) 12 f2.switch('小雪2') 13 print('%s吃雪糕'%name) 14 f2.switch() 15 16 def drink(name): 17 print('%s喝啤酒'%name) 18 f1.switch() 19 print('%s喝可乐'%name) 20 21 22 f1 = greenlet(eat) 23 f2 = greenlet(drink) 24 f1.switch('小雪')
5、gevent模块
可以实现在某函数内部遇到IO操作,就自动的切换到其他函数内部去执行。
g = gevent(func)
gevent.join(g) 等待g指向的函数func执行完毕,如果在执行过程中,遇到IO,就切换
gevent.joinall([g1,g2,g3]) 等待g1 g2 g3指向的函数func执行完成。
(1)简单应用
1 import gevent 2 import time 3 # gevent 可以实现 当函数中遇到io操作时,就自动的切换到另一个函数 4 # g1 = gevent.spawn(func,参数) 5 # gevent.join(g1) 让func执行完毕 6 # gevent.joinall([g1,g2,g3,g4]) 7 # func停止的原因: 1 func执行完了 2 遇到IO操作了 8 9 # def func1(): 10 # print('1 2 3 4') 11 # # gevent.sleep(1) 12 # time.sleep(1) 13 # print('3 2 3 4') 14 # # gevent.sleep(1) 15 # 16 # def func2(): 17 # print('2 2 3 4') 18 # # gevent.sleep(1) 19 # time.sleep(1)# gevent不能识别其他的IO操作,只能识别自己认识的IO 20 # print('再来一次') 21 # 22 # g1 = gevent.spawn(func1) 23 # g2 = gevent.spawn(func2) 24 # g1.join()# 等待g1指向的任务执行结束 25 26 27 ######################################### 以下解决gevent不能识别其他IO操作的事情 28 from gevent import monkey 29 monkey.patch_all()# 可以让gevent识别大部分常用的IO操作 30 import time 31 32 def func1(): 33 print('1 2 3 4') 34 time.sleep(1) 35 print('3 2 3 4') 36 # gevent.sleep(1) 37 38 def func2(): 39 print('2 2 3 4') 40 time.sleep(1) 41 print('再来一次') 42 43 g1 = gevent.spawn(func1) 44 g2 = gevent.spawn(func2) 45 g1.join()# 等待g1指向的任务执行结束 46 g2.join()
(2)串行和并发的效率对比
1 from gevent import monkey 2 monkey.patch_all() 3 import gevent 4 import time 5 6 def func1(num): 7 time.sleep(1) 8 print(num) 9 # 串行 10 start = time.time() 11 for i in range(10): 12 func1(i) 13 print(time.time() - start) 14 15 # 并发 16 start = time.time() 17 l = [] 18 for i in range(10): 19 g = gevent.spawn(func1,i) 20 l.append(g) 21 gevent.joinall(l) 22 print(time.time() - start)
(3)爬虫示例
1 from gevent import monkey 2 monkey.patch_all() 3 import gevent 4 import time 5 import requests 6 7 def get_result(url):# 任务函数 8 res = requests.get(url) 9 print(url,res.status_code,len(res.text)) 10 11 url_l = ['http://www.baidu.com', 12 'https://www.jd.com', 13 'http://www.taobao.com', 14 'http://www.qq.com', 15 'http://www.mi.com', 16 'http://www.cnblogs.com'] 17 18 def sync_func(url_l): 19 '''同步调用''' 20 for url in url_l:# 串行执行任务函数 21 get_result(url) 22 23 def async_func(url_l): 24 '''异步''' 25 l = [] 26 for url in url_l: 27 g = gevent.spawn(get_result,url)# 使用gevent,协程去并发实现执行任务函数 28 # 当遇见请求某个网页发生比较大的网络延迟(IO),马上会切换到其他的任务函数 29 l.append(g) 30 gevent.joinall(l)# 等待所有任务函数执行结束 31 32 start = time.time() 33 sync_func(url_l) 34 print('sync:',time.time() - start) 35 36 start = time.time() 37 async_func(url_l) 38 print('async:',time.time() - start)
【问题】为什么要有协程?
因为想要在单线程内实现并发的效果。
因为Cpython有GIL锁,限制了在同一个时间点只能执行一个线程。
所以想要在执行一个线程的期间,充分的利用CPU的性能。
所以才有了想在线程内实现并发的效果。
6、IO多路复用(解决问题的思想)
(1)用非阻塞IO解决阻塞IO问题
1 import socket 2 3 sk = socket.socket() 4 sk.setblocking(False) 5 sk.bind(('127.0.0.1',8080)) 6 sk.listen() 7 8 l = [] 9 del_l = [] 10 while 1: 11 try: 12 conn,addr = sk.accept()# 如果是阻塞IO模型,在这里程序会一直等待。 13 l.append(conn)# 将每个请求连接的客户端的conn添加到列表中 14 except BlockingIOError: 15 for conn in l:# 去遍历所有客户端的conn,看看有没有客户端给我发送数据了 16 17 try: 18 info = conn.recv(1024).decode('utf-8')# 尝试接收,看看有没有客户端给我发数据 19 if not info:# 如果客户端正常执行了close,服务器会接收到一个空 20 del_l.append(conn)# 将已经结束的客户端的conn,添加到要删除的列表中 21 print('客户端正常退出了!') 22 conn.close()# 因为客户端已经主动close,所以服务器端的conn也要close 23 else: 24 print(info) 25 conn.send(info.upper().encode('utf-8')) 26 except BlockingIOError: 27 continue# 是没有接受到客户端发来的数据而报错 28 except ConnectionResetError: 29 pass# 是因为客户端强制退出而报错 30 if del_l: 31 for conn in del_l: 32 l.remove(conn) 33 del_l = []# 在删除完主动关闭的客户端的连接之后,应该把此列表清空,否则报错
1 import socket 2 sk = socket.socket() 3 sk.connect(('127.0.0.1',8080)) 4 5 while 1: 6 msg_s = input('>>>') 7 if not msg_s:continue 8 if msg_s == 'q':break 9 sk.send(msg_s.encode('utf-8')) 10 print(sk.recv(1024).decode('utf-8')) 11 sk.close()
【不推荐使用】
循环使用recv( )将大幅占用CPU占用率;任务完成的响应延迟增大了。
(2)多路IO复用解决阻塞IO问题
1 import select 2 import socket 3 4 sk = socket.socket() 5 sk.bind(('127.0.0.1',8888)) 6 sk.listen() 7 del_l = [] 8 rlist = [sk]# 是用来让select帮忙监听的 所有 接口 9 # select:windows/linux是监听事件有没有数据到来 10 # poll: linux 也可以做select的工作 11 # epoll: linux 也可以做类似的工作 12 while 1: 13 r,w,x = select.select(rlist,[],[])# 传参给select,当rlist列表中哪个接口有反应,就返回给r这个列表 14 if r: 15 for i in r:# 循环遍历r,看看有反应的接口到底是sk 还是conn 16 if i == sk: 17 # 如果是sk,那就表示有客户端的连接请求 18 '''sk有数据要接收,代表着有客户端要来连接''' 19 conn,addr = i.accept() 20 rlist.append(conn)# 把新的客户端的连接,添加到rlist,继续让select帮忙监听 21 else: 22 # 如果是conn,就表示有客户端给我发数据了 23 '''conn有数据要接收,代表要使用recv''' 24 try: 25 msg_r = i.recv(1024).decode('utf-8') 26 if not msg_r: 27 '''客户端执行了close,客户端主动正常关闭连接''' 28 del_l.append(i) 29 i.close() 30 else: 31 print(msg_r) 32 i.send(msg_r.upper().encode('utf-8')) 33 except ConnectionResetError: 34 pass 35 if del_l:# 删除那些主动断开连接的客户端的conn 36 for conn in del_l: 37 rlist.remove(conn) 38 del_l.clear()
1 import socket 2 sk = socket.socket() 3 sk.connect(('127.0.0.1',8888)) 4 5 while 1: 6 msg_s = input('>>>') 7 if not msg_s:continue 8 if msg_s == 'q':break 9 sk.send(msg_s.encode('utf-8')) 10 print(sk.recv(1024).decode('utf-8')) 11 sk.close()
(3)异步IOpython实现不了
(4)select与poll和epoll的区别
select和poll有一个共同的机制,都是采用轮询的方式询问内核,有没有数据准备好了。
select有一个最大监听事件的限制,32位机限制1024,64位机限制2048;poll没有,理论上poll可以开启无限大,1G内存大概够开启10W个事件去监听。
epoll是最好的,采用的是回调机制,解决了select和poll共同存在的问题,而且epoll理论上也可以开启无限多个监听事件。