前言
多进程中讲到了锁/信号量/事件/进程池,同样多线程中也一样存在这些东西。
锁:Lock,多个进程中同一时间,只能排序执行程序,这里会多讲一个RLock递归锁。
信号量:多个进程中同一时间,同时被N个进程执行。
事件:Event,就好像红绿灯事件一样,通过一个信号控制多个进程,同时执行或阻塞。
线程池:一个池子里面同一时间只运行N个线程。
一、线程锁
1、Lock给线程加锁
# 1、Lock给线程加锁 from threading import Thread,Lock import time def func(l): l.acquire() # 拿钥匙 global num t=num time.sleep(0.1) num=t-1 l.release() # 还钥匙 num=5 t_lst=[] l=Lock() for i in range(5): t=Thread(target=func,args=(l,)) t.start() t_lst.append(t) for i in t_lst:i.join() print(num)
①未解锁前:按正常逻辑,num=5,每个线程执行都-1,执行了5次,最后打印结果为0才准确。
但是由于线程是对数据不安全的,在num未被修改前已经被另外的线程拿到了原来的值,所以永远num=4。
②加锁后:约束了每个线程必须还钥匙后才能另一线程进入。
2、递归锁,用来解决Lock互斥锁造成死锁问题。
①互斥锁形成的死锁:被堵塞了,原因多次acquire()造成。
# 2、递归锁,用来解决Lock互斥锁造成死锁问题 # ①死锁:被堵塞了 from multiprocessing import Lock rl=Lock() rl.acquire() rl.acquire() rl.acquire() rl.acquire() rl.acquire() print(6666)
②RLock:递归锁,可以在一个线程中acquire()多次。所以能直接打印出结果,而不像互斥锁被堵塞了。
# ②RLock:递归锁,可以在一个线程中acquire()多次。 from multiprocessing import RLock rl=RLock() # 递归锁:RLock rl.acquire() rl.acquire() rl.acquire() rl.acquire() rl.acquire() print(6666)
二、信号量
1、Semaphore的两个方法:acquire()、release()。
from threading import Semaphore,Thread import time def func(s): s.acquire() print(111) time.sleep(2) s.release() s=Semaphore(2) # 2个线程,既每次运行2个线程。 for i in range(10): t=Thread(target=func,args=(s,)) t.start()
三、事件
1、Event:事件,实例连接三次
from threading import Event,Thread import time,random def connect(e): count=0 while count<3: e.wait(0.5) # 如果is_set()为False,等待0.5s就结束。 if e.is_set()==True: print('连接成功!') break else: count+=1 print('第%s次连接失败~') else:print('TimeoutError 连接超时') def check(e): time.sleep(random.randint(0,3)) e.set() e=Event() t=Thread(target=connect,args=(e,)) t1=Thread(target=check,args=(e,)) t.start() t1.start()
2、Condition:条件
wait()等待notify,又数值就执行,但是一次性的。
# 2、Condition:条件 # wait()等待,钥匙是一次性的。 from threading import Thread,Condition def func(con,i): con.acquire() con.wait() # 等钥匙,不会归还,用了就没有了。 print('我是:',i) con.release() if __name__ == '__main__': con=Condition() for i in range(10): t=Thread(target=func,args=(con,i)) t.start() while 1: num=int(input('>>>')) con.acquire() con.notify(num) # 造钥匙数量 con.release()
3、Timer定时器
from threading import Timer def func(): print('等待了3s') Timer(3,func).start() # 异步执行,非阻塞。 print(111) print(222)
成为同步:Timer几秒执行,就等待几秒。
4、队列
# ①普通队列,先进先出 import queue q=queue.Queue() q.put(1) q.put(2) print(q.get()) print(q.get()) # ②栈:先进后出 q=queue.LifoQueue() q.put(1) q.put(2) print(q.get()) # get取到的是2,先进后出。 print(q.get()) # ③优先级队列 q=queue.PriorityQueue() q.put(11) q.put(10) q.put(20) print(q.get())
5、线程池与回调函数
①线程池:ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor import time def func(i): time.sleep(1) return i def func1(i): msg=i.result() # 拿返回值 print(msg+1) tp=ThreadPoolExecutor(max_workers=5) # 默认为计算机本身的cpu个数 t_lst=[] for i in range(10): t=tp.submit(func,i) # 提交异步任务执行 t_lst.append(t) tp.shutdown() # 相当于close+join:关闭接受任务,一次性返回结果 for t in t_lst:print(t.result()) # result 获取返回值
②回调函数:.add_done_callback(func1)
from concurrent.futures import ThreadPoolExecutor import time def func(i): time.sleep(1) return i def func1(i): msg=i.result() # 拿返回值 print(msg+1) tp=ThreadPoolExecutor(max_workers=5) # 默认为计算机本身的cpu个数 # 回调函数 for i in range(10): t=tp.submit(func,i).add_done_callback(func1)
欢迎来大家QQ交流群一起学习:482713805