进程池与线程池 开进程开线程都需要消耗资源,只不过两者比较的情况线程消耗的资源比较少 在计算机能够承受范围之内最大限度的利用计算机 什么是池? 在保证计算机硬件安全的情况下最大限度的利用计算机 池其实是降低了程序的运行效率 但是保证了计算机硬件的安全 (硬件的发展跟不上软件的速度) 用于存储线程/进程的容器 管理了,线程的创建于销毁,以及任务的分配 1.创建池子 2.submit 提交任务 3.shutdown 可以用于等待所有任务完成后销毁池

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import time pool = ThreadPoolExecutor(5) # 括号内可以传参数指定线程池内的线程个数 #也可以不传 不传默认是当前所在计算机的cpu个数乘5 def task(n): print(n) time.sleep(2) return n**3 """ 提交任务的方式 同步:提交任务后,还在原地等待任务的返回结果,期间不做任何事 异步:会交任务后,不等待任务的返回结果,直接去执行下一段代码 异步的返回结果怎么拿到??? 通过异步回调机制: 当异步提交的任务有返回结果后,会自动触发回调函数的执性 """ # pool.submit(task,1) # 向线程池中提交任务 提交方式为 异步提交 # 测试提交方式 若为同步,则会先打印1,等待2秒再打印主 # print('主') # 结果显示一块打印,说明提交方式为异步 # for i in range(10): # res = pool.submit(task,i) # print(res.result()) #原地等待任务的返回结果 # 获取对象 join方法 将并发变成了串行 t_list = [] for i in range(10): res = pool.submit(task,i) t_list.append(res) pool.shutdown() # 关闭池子 等待池子中所有的任务执行完毕后,代码才会往后走 for p in t_list: print('>>>>',p.result()) # 保证了任务是并发的提交数据
进程池

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os ,time # pool = ThreadPoolExecutor(5) pool = ProcessPoolExecutor() # 开进程需要放在__main__下 # 括号内可以传参数指定线程池内的线程个数 #也可以不传 不传默认是当前所在计算机的cpu个数乘5 def task(n): print(n) time.sleep(2) return n**3 """ 提交任务的方式 同步:提交任务后,还在原地等待任务的返回结果,期间不做任何事 异步:会交任务后,不等待任务的返回结果,直接去执行下一段代码 异步的返回结果怎么拿到??? 通过异步回调机制: 当异步提交的任务有返回结果后,会自动触发回调函数的执性 """ # pool.submit(task,1) # 向线程池中提交任务 提交方式为 异步提交 # 测试提交方式 若为同步,则会先打印1,等待2秒再打印主 # print('主') # 结果显示一块打印,说明提交方式为异步 # for i in range(10): # res = pool.submit(task,i) # print(res.result()) #原地等待任务的返回结果 # 获取对象 join方法 将并发变成了串行 if __name__ == '__main__': t_list = [] for i in range(10): res = pool.submit(task,i) t_list.append(res) pool.shutdown() # 关闭池子 等待池子中所有的任务执行完毕后,代码才会往后走 for p in t_list: print('>>>>',p.result()) # 保证了任务是并发的提交数据

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import time import os # pool = ThreadPoolExecutor(5) # 括号内可以传参数指定线程池内的线程个数 # # 也可以不传 不传默认是当前所在计算机的cpu个数乘5 pool = ProcessPoolExecutor() # 默认是当前计算机cpu的个数 """ 池子中创建的进程/线程创建一次就不会再创建了 至始至终用的都是最初的那几个 这样的话节省了反复开辟进程/线程的资源 """ def task(n): print(n,os.getpid()) # 查看当前进程号 time.sleep(2) return n**2 """ 提交任务的方式 同步:提交任务之后 原地等待任务的返回结果 期间不做任何事 异步:提交任务之后 不等待任务的返回结果(异步的结果怎么拿???) 直接执行下一行代码 """ # pool.submit(task,1) # 朝线程池中提交任务 异步提交 # print('主') if __name__ == '__main__': t_list = [] for i in range(20): res = pool.submit(task,i).add_done_callback(call_back) # 提交任务的时候 绑定一个回调函数 一旦该任务有结果 立刻执行对于的回调函数 # print(res.result()) # 原地等待任务的返回结果 t_list.append(res) # pool.shutdown() # 关闭池子 等待池子中所有的任务执行完毕之后 才会往下运行代码 # for p in t_list: # print('>>>:',p.result())
拿到结果的方法:
2.异步回调机制:add_done_callback(

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os ,time # pool = ThreadPoolExecutor(5) pool = ProcessPoolExecutor(5) #进程池不传,默认CPU的个数 # 开进程需要放在__main__下 # 括号内可以传参数指定线程池内的线程个数 #也可以不传 不传默认是当前所在计算机的cpu个数乘5 """ 池子中创建的进程/线程创建一次就不会再创建了 至始至终用的都是最初的那几个 这样的话节省了反复开辟进程/线程的资源 """ # os.getpid() 查看当前进程号 def task(n): print(n,os.getpid()) time.sleep(2) return n**3 def call_back(n): print('拿到了异步提交任务的返回结果:',n.result()) """ 提交任务的方式 同步:提交任务后,还在原地等待任务的返回结果,期间不做任何事 异步:会交任务后,不等待任务的返回结果,直接去执行下一段代码 异步的返回结果怎么拿到??? 通过异步回调机制: 当异步提交的任务有返回结果后,会自动触发回调函数的执性 """ # pool.submit(task,1) # 向线程池中提交任务 提交方式为 异步提交 # 测试提交方式 若为同步,则会先打印1,等待2秒再打印主 # print('主') # 结果显示一块打印,说明提交方式为异步 # for i in range(10): # res = pool.submit(task,i) # print(res.result()) #原地等待任务的返回结果 # 获取对象 join方法 将并发变成了串行 if __name__ == '__main__': t_list = [] for i in range(10): # 给每个对象添加异步回调机制 res = pool.submit(task,i).add_done_callback(call_back) # 提交任务的时候 绑定一个回调函数 一旦该任务有结果 立刻执行对于的回调函数 t_list.append(res) # pool.shutdown() # # 关闭池子 等待池子中所有的任务执行完毕后,代码才会往后走 # for p in t_list: # print('>>>>',p.result()) # 保证了任务是并发的提交数据
协程
协程 进程:资源单位 线程:执行单位 协程:单线程下实现并发 并发 切换+保存状态 ps:看起来像同时执行的 就可以称之为并发 协程:完全是程序员自己意淫出来的名词 单线程下实现并发 并发的条件? 多道技术 空间上的复用 时间上的复用 切换+保存状态 程序员自己通过代码自己检测程序中的IO 一旦遇到IO自己通过代码切换 给操作系统的感觉是你这个线程没有任何的IO ps:欺骗操作系统 让它误认为你这个程序一直没有IO 从而保证程序在运行态和就绪态来回切换 提升代码的运行效率 yield 保存上一次的结果 切换+保存状态就一定能够提升效率吗??? 当你的任务是iO密集型的情况下 提升效率 如果你的任务是计算密集型的 降低效率 解决高并发: 多进程 + 线程(协程)

import time def func1(): for i in range(1200000): i+1 def fun2(): for i in range(100000): i+1 start = time.time() func1() fun2() stop = time.time() print(stop - start)

import time def func1(): while True: yield def func2(): g = func1() for i in range(10000): i +1 next(g) start = time.time() func2() stop = time.time() print(stop - start)
需要找到一个能够识别IO的一个工具

from gevent import monkey;monkey.patch_all() from gevent import spawn import time """ 注意gevent模块没办法自动识别time.sleep等io情况 需要你手动再配置一个参数 """ def heng(): print('哼') time.sleep(2) print('哼') def ha(): print('哈') time.sleep(2) print('ha') def heihei(): print('嗨嗨') time.sleep(2) print('嘿嘿') start = time.time() # spawn会检测所有的任务 执行任务 a1 = spawn(heng) a2 = spawn(ha) a3 = spawn(heihei) spqaw 是一个列表,括号来一个,就把他放在括号里,再来,再放. 检测:当哼遇到IO时,就执行ha,立即执行无效果,得等待代码执行完毕后,在来执行代码.spawn有返回值,用值接收. a1.join() a2.join() a3.join() 得等待spawn代码运行完成后,在执行下面的代码 stop = time.time() print(stop - start)

import socket from gevent import monkey;monkey.patch_all() from gevent import spawn server = socket.socket() server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn): while True: try: data = conn.recv(1024) if not len(data): break print(data.decode('utf')) conn.send(data.upper()) except ConnectionResetError as a: print(a) break conn.close() def server11(): while True: conn, addr = server.accept() spawn(talk,conn) if __name__ == '__main__': g1 = spawn(server11) g1.join()
客户端

import socket from threading import Thread,current_thread def client(): client = socket.socket() client.connect(('127.0.0.1',8080)) while True: n = 0 data = '%s %s'%(current_thread().name,n) client.send(data.encode('utf-8')) res = client.recv(1024) print(res.decode('utf-8')) n += 1 for i in range(400): t = Thread(target=client) t.start()

阻塞IO模型
我用一个用户名用来执行登陆操作,问题用户名需要用户输入,输入需要耗时, 如果输入没有完成,后续逻辑无法继续,所以默认的处理方式就是 等
将当前进程阻塞住,切换至其他进程执行,等到按下回车键,拿到了一个用户名,再唤醒刚才的进程,将状态调整为就绪态
以上处理方案 就称之为阻塞IO模型
存在的问题:
当执行到recv时,如果对象并没有发送数据,程序阻塞了,无法执行其他任务
解决方案:
多线程或多进程,
当客户端并发量非常大的时候,服务器可能就无法开启新的线程或进程,如果不对数量加以限制 服务器就崩溃了
线程池或进程池
首先限制了数量 保证服务器正常运行,但是问题是,如果客户端都处于阻塞状态,这些线程也阻塞了
协程:
使用一个线程处理所有客户端,当一个客户端处于阻塞状态时可以切换至其他客户端任务

阻塞IO模型在执行recv 和 accept 时 都需要经历wait_data
非阻塞IO即 在执行recv 和accept时 不会阻塞 可以继续往下执行
如何使用:
将server的blocking设置为False 即设置非阻塞
存在的问题 :
这样一来 你的进程 效率 非常高 没有任何的阻塞
很多情况下 并没有数据需要处理,但是我们的进程也需要不停的询问操作系统 会导致CPU占用过高
而且是无意义的占用
案例:

import socket import time server = socket.socket() server.bind(("192.168.13.103",1688)) server.listen() server.setblocking(False) # 默认为阻塞 设置为False 表示非阻塞 # 用来存储客户端的列表 clients = [] # 连接客户端的循环 while True: try: client,addr = server.accept() # 接受三次握手信息 # print("来了一个客户端了.... %s" % addr[1]) # 有人链接成功了 clients.append(client) except BlockingIOError as e: # print("还没有人连过来.....") # time.sleep(0.5) # 服务你的客人去 for c in clients[:]: try: # 可能这个客户端还没有数据过来 # 开始通讯任务 data = c.recv(2048) c.send(data.upper()) except BlockingIOError as e: print("这个客户端还不需要处理.....",) except ConnectionResetError: # 断开后删除这个客户端 clients.remove(c) print(len(clients))

import socket client = socket.socket() client.connect(('127.0.0.1',1266)) while True: msg = input('msg') if not msg: continue client.send(msg.encode('utf-8')) print(client.recv(2048).decode('utf-8'))
多路指的是:多个socket对象 一个socket就是一个传输通道 复用:意思是指使用同一个线程处理所有socket 原理: 在非阻塞IO模型中我们需要自己不断的询问操作系统是否有数据需要处理 多路复用,使用select来监测是否有socket可以被使用

import socket import select server = socket.socket() server.bind(('127.0.0.1', 1688)) server.listen() # select最多检测1024个socket,超出直接报错,这是socket自身的问题,最终解决方案epoll rlist = [server] # 将需要检测(是否可读recv)的socket对象放到该列表中 # accept也是一个读数据的操作,默认也会阻塞,也需要select检测 wlist = [] # 将需要检测(是否可写send)的socket对象放到该列表中 msgs = [] while True: r_list, w_list, _ = select.select(rlist, wlist, []) # 会阻塞,等到有一个或者多个socket,可以被处理 print(r_list, w_list) for soc in r_list: if soc == server: client, addr = server.accept() rlist.append(client) else: try: data = soc.recv(2048) if not data: soc.close() rlist.remove(soc) continue msgs.append((soc,data)) except ConnectionResetError as e: soc.close() rlist.remove(soc) print('这个客户端下线') # client.recv() for soc in w_list: for i in msgs[:]: if i[0] == soc: soc.send(i[1]) msgs.remove(i) wlist.remove(soc) 使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。 缺点: 因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄
用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。