1.socket服务端实现并发
现在回来想网络编程服务端需要满足哪几点需求
-
固定的ip和port
-
24小时提供服务
-
能够实现并发
2.进程池线程池介绍
线程不可能无限制的开下去,总要消耗和占用资源
进程池线程池概念:硬件有极限,为了减轻硬件压力,所以有了池的概念
-
concurrent.futures模块导入
-
线程池创建(线程数=cpu核数*5左右)
-
submit提交任务(提交任务的两种方式)
-
异步提交的submit返回值对象
-
shutdown关闭池并等待所有任务运行结束
-
对象获取任务返回值
-
进程池的使用,验证进程池在创建的时候里面固定有指定的进程数
-
异步提交回调函数的使用
1.协程
-
进程:资源单位
-
线程:执行单位
-
协程:单线程下实现并发(能够在多个任务之间切换和保存状态来节省IO),这里注意区分操作系统的切换+保存状态是针对多个线程而言,而我们现在是想在单个线程下自己手动实现操作系统的切换+保存状态的功能
注意协程这个概念完全是程序员自己想出来的东西,它对于操作系统来说根本不存在。操作系统只知道进程和线程。并且需要注意的是并不是单个线程下实现切换+保存状态就能提升效率,因为你可能是没有遇到io也切,那反而会降低效率
再回过头来想上面的socket服务端实现并发的例子,单个线程服务端在建立连接的时候无法去干通信的活,在干通信的时候也无法去干连接的活。这两者肯定都会有IO,如果能够实现通信io了我就去干建连接,建连接io了我就去干通信,那其实我们就可以实现单线程下实现并发
将单个线程的效率提升到最高,多进程下开多线程,多线程下用协程>>> 实现高并发!!!
三者都是实现并发的手段
# yield能够实现保存上次运行状态,但是无法识别遇到io才切换 #串行执行 import time def func1(): for i in range(10000000): i+1 def func2(): for i in range(10000000): i+1 start = time.time() func1() func2() stop = time.time() print(stop - start) #基于yield并发执行 import time def func1(): while True: 10000000+1 yield def func2(): g=func1() for i in range(10000000): # time.sleep(100) # 模拟IO,yield并不会捕捉到并自动切换 i+1 next(g) start=time.time() func2() stop=time.time() print(stop-start)
yield并不能帮我们自动捕获到io行为才切换,那什么模块可以呢?
3.gevent模块
一个spawn就是一个帮你管理任务的对象
gevent模块不能识别它本身以外的所有的IO行为,但是它内部封装了一个模块,能够帮助我们识别所有的IO行为
from gevent import monkey;monkey.patch_all() # 检测所有的IO行为 from gevent import spawn,joinall # joinall列表里面放多个对象,实现join效果 import time def play(name): print('%s play 1' %name) time.sleep(5) print('%s play 2' %name) def eat(name): print('%s eat 1' %name) time.sleep(3) print('%s eat 2' %name) start=time.time() g1=spawn(play,'刘清正') g2=spawn(eat,'刘清正') # g1.join() # g2.join() joinall([g1,g2]) print('主',time.time()-start) # 单线程下实现并发,提升效率
4.协程实现服务端客户端通信
链接和通信都是io密集型操作,我们只需要在这两者之间来回切换其实就能实现并发的效果
服务端监测链接和通信任务,客户端起多线程同时链接服务端
# 服务端 from gevent import monkey;monkey.patch_all() from socket import * from gevent import spawn def communicate(conn): while True: try: data = conn.recv(1024) if len(data) == 0: break conn.send(data.upper()) except ConnectionResetError: break conn.close() def server(ip, port, backlog=5): server = socket(AF_INET, SOCK_STREAM) server.bind((ip, port)) server.listen(backlog) while True: # 链接循环 conn, client_addr = server.accept() print(client_addr) # 通信 spawn(comunicate,conn) if __name__ == '__main__': g1=spawn(server,'127.0.0.1',8080) g1.join() # 客户端 from threading import Thread, current_thread from socket import * def client(): client = socket(AF_INET, SOCK_STREAM) client.connect(('127.0.0.1', 8080)) n = 0 while True: msg = '%s say hello %s' % (current_thread().name, n) n += 1 client.send(msg.encode('utf-8')) data = client.recv(1024) print(data.decode('utf-8')) if __name__ == '__main__': for i in range(500): t = Thread(target=client) t.start() # 原本服务端需要开启500个线程才能跟500个客户端通信,现在只需要一个线程就可以扛住500客户端 # 进程下面开多个线程,线程下面再开多个协程,最大化提升软件运行效率
IO模型
-
阻塞IO
-
非阻塞IO(服务端通信针对accept用s.setblocking(False)加异常捕获,cpu占用率过高)
-
IO多路复用
在只检测一个套接字的情况下,他的效率连阻塞IO都比不上。因为select这个中间人增加了环节。
但是在检测多个套接字的情况下,就能省去wait for data过程
-
异步IO