线程池与进程池
池:容器
- 用来避免频繁的创建和销毁(进程/线程)所带来的资源开销
- 可以限制同时存在的线程数量,以保证服务器不会因为资源不足而导致崩溃
- 帮我们管理了线程的生命周期
- 管理了任务的分配
import os
import time
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
from threading import activeCount,enumerate,currentThread
# # 创建一个线程池,默认是(CPU核心数或者1) * 5
# pool = ThreadPoolExecutor(2) # 这里指定最多可以容纳两个线程
#
# def task():
# print(currentThread().name)
#
# # 提交任务到池子中
# pool.submit(task) # 传参直接在后面接逗号然后填参数就可以
# pool.submit(task)
# pool.shutdown() # 关闭这个线程池,会一直阻塞到所有任务执行完毕
def task():
time.sleep(1)
print(os.getpid())
if __name__ == '__main__':
# 创建一个进程池,默认是CPU核心数或者1
pool = ProcessPoolExecutor(2) # 这里指定最多可以容纳两个进程
pool.submit(task)
pool.submit(task)
pool.submit(task)
思考:不用线程池,创造线程池
如果进程不结束,那么进程池/线程池中的进程/线程就会一直存活
同步异步
名称 | 类型 |
---|---|
阻塞 非阻塞 | 程序的状态 |
串行 并发 并行 | 程序处理任务的方式 |
同步 异步 | 程序提交任务的方式 |
同步
- 提交任务后必须在原地等待,直到任务结束
- 程序卡住了,但不是阻塞,因为没有进入IO,而是在进行大量的计算
异步
- 提交任务后不需要在原地等待 可以继续往下执行代码 (进程/线程)
异步效率高于同步 ,异步任务将导致一个问题,就是任务的发起方不知道任务何时处理完毕
解决方法
- 轮询:每隔一段时间就问一次
- 效率低,无法及时获得结果,不推荐
- 让任务的执行方主动通知(异步回调)
- 可以及时拿到任务的结果,推荐
- 异步回调案例
# 异步回调
from threading import Thread
# 具体的任务
def task(callback):
print("run")
for i in range(100000000):
1+1
callback("ok") # 这样写很灵活
#回调函数 参数为任务的结果
def finished(res):
print("任务完成!",res)
print("start")
t = Thread(target=task,args=(finished,))
t.start() #执行task时 没有导致主线程卡主 而是继续运行
print("over")
线程池中回调的使用
from
# 使用案例:
def task(num):
time.sleep(1)
print(num)
return "hello python"
def callback(obj):
print(obj.result())
pool = ThreadPoolExecutor()
res = pool.submit(task, 123)
# print(res.result()) # result是阻塞的,会直到拿到返回值才会释放
res.add_done_callback(callback) # 为这个函数绑定回调函数
print("over")
Event事件
线程间状态同步
把一个任务丢到子线程中,这个任务将异步执行,那如何获取到这个任务的执行状态
(执行状态和执行结果不是一个概念)
如果需要拿到执行结果,可以采用异步回调
假设
一个线程 负责启动服务器 **启动服务器需要花一定的时间 **
另一个线程作为客户端 要连接服务器 **必须保证服务器已经启动 **
要获取状态可以采用轮询的方法 但是浪费了CPU资源 而且可能会造成延迟 不能立即获取状态
就可以使用事件来完成状态同步
事件本质就是 一个标志 可以是False 或是True
特殊之处在于 其包含一个wait函数 可以阻塞当前线程 直到状态从False变为True
- 轮询
from threading import Thread
import time
# 轮询
is_bool = False # 轮询
def start_server():
global is_bool
print("starting server......")
time.sleep(3)
print("server started!")
is_bool = True # 修改事件的值为Tru
def connect_server():
while True:
if is_bool:
print('连接服务器成功')
break
else:
print('失败,服务器未启动')
# 死循环一直循环占用CPU资源,所以sleep一小会。
# 不sleep又占用资源,sleep了又不能立即获取状态
time.sleep(0.5)
t1 = Thread(target=start_server)
t2 = Thread(target=connect_server)
t1.start()
t2.start()
- 异步回调
from threading import Thread,Event
import time
e = Event() # 刚创建时flag为False
def start_server():
print("starting server......")
time.sleep(3)
print("server started!")
e.set() # 将flag变成True,另外clear是把flag变成False
def connect_server():
e.wait() # 阻塞,等待事件从False 变为true
if e.is_set(): # 获取当前flag状态
print("连接服务器成功!")
t1 = Thread(target=start_server)
t2 = Thread(target=connect_server)
t1.start()
t2.start()