什么是池?
- 保证计算机硬件安全的情况下最大限度利用计算机
- 降低了程序的运行效率,但保证了硬件的安全
- 受限于硬件的物理极限,硬件的发展跟不上软件的速度,迫不得已提出了池的概念
进程池、线程池
提交任务的方式:
同步:提交任务之后,原地等待任务的返回结果,期间不做任何事
异步:提交任务之后,不等待任务的返回结果(异步的结果怎么拿???),直接执行其他代码 (线程池、进程池)
异步回调机制(重点):当异步提交的任务有返回结果之后,会自动触发回调函数的执行。
pool.submit(task, 1).add_done_callback(callback回调函数)
线程池、进程池
使用方式大同小异,
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
#创建池
pool = ThreadPoolExecutor(5) # 5 线程个数,也可以不传,默认是CPU个数乘以5
#pool = ProcessPoolExecutor()
#进程池不传参数,默认是当前cpu的个数
def task(n):
print(n)
time.sleep()
#提交任务:异步提交
# pool.submit(task, 1) 朝线程池提交任务
#回调机制:pool.submit(task, 1).add_done_callback(callback回调函数)
#定义一个列表,实现线程的异步提交的并发
t_list = []
# 线程池只有五个,所以五个五个的接,类似缓冲区域
for i in range(20):
res = pool.submit(task, i) # res 是任务 task 的返回值
print(res.result()) # 原地等待任务返回结果,将并发变成了串行。
t_list.append(res)
#需求,让20个线程运行结束,再拿结果
pool.shutdown() # 关闭池子,等待所有任务完成之后,才会往下运行。
for t in t_list:
print('>>>', t.result())
池子中创建的进程创建一次就不会再创建了,至始至终用的都是最初的那几个,节省了反复开辟进程的资源。
进程线程都是一样的。
协程
进程:资源单位
线程:执行单位
协程:单线程下实现并发(程序员意淫出来的名词)
并发:切换+保存状态
看起来像同时执行,就可以称之为并发
遇到IO,程序员自己检测并通过代码切换,欺骗操作系统,让他误以为你这个程序没有IO
并发的条件?
多道技术:
空间上的复用
时间上的复用:切换+保存状态
什么时候需要用到切换+保存状态:
1、遇到IO 2、占用cpu时间过长
切换+保存状态一定能提升效率吗?
当你的任务是IO密集型的,提升效率
当你的任务是计算密集型的,降低效率
使用spawn实现单线程并发
from gevent import monkey;monkey.patch_all()
from gevent import spawn
import time
def fun1():
print('fun1 start')
time.sleep(2)
print('fun1 end')
def fun2():
print('fun2 start')
time.sleep(1)
print('fun2 end')
def fun3():
print('fun3 start')
time.sleep(3)
print('fun3 end')
start = time.time()
g1 = spawn(fun1)
g2 = spawn(fun2)
g3 = spawn(fun3)
g1.join()
g2.join()
g3.join()
print(time.time() - start) # 3.0008487701416016
TCP单线程实现并发:
服务端:
# gevent没法自动识别time.sleep等io情况,所以需要配monkey
from gevent import monkey;monkey.patch_all() # 由于该模块经常被使用,所以建议写成一行
from gevent import spawn
from socket import SOL_SOCKET, SO_REUSEADDR
import socket
server = socket.socket()
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8081))
server.listen(5)
def my_server():
while True:
conn, addr = server.accept()
spawn(talk, conn)
def talk(conn):
while True:
try:
data = conn.recv(1024)
if len(data) == 0: break
print(data.decode('utf-8'))
conn.send(b'收到')
except ConnectionRefuseError as e:
print(e)
break
conn.close()
if __name__ == '__main__':
g = spawn(my_server)
g.join()
客户端:
from threading import Thread, current_thread
import socket
import time
def connect():
client = socket.socket()
client.connect(('127.0.0.1', 8081))
n = 0
while True:
data = '{} {}'.format(current_thread().name, n)
client.send(b'hello')
res = client.recv(1024)
print(res.decode('utf-8'))
n += 1
for i in range(400):
#创建线程
t = Thread(target=connect)
t.start()
如何并发?多进程下开多线程,多线程下再开多协程