一、锁
当线程安全,多线程操作时,内部会让所有线程排队处理。如:list、dict、Queue
当线程不安全时,那么我们就需要:不安全线程 + 锁 =》 排队处理
例如:
a. 创建100个线程,在列表中追加8 : 线程安全
b. 创建100个线程
v = []
锁
- 把自己的添加到列表中
- 在读取列表中的最后一个
(锁住以后可以保证最后一个是自己放进去的那一个)
解锁
1. 锁:Lock(同步锁)
# 一次放一个 # 锁 Lock import threading import time v = [] lock = threading.Lock() def func(arg): lock.acquire() v.append(arg) time.sleep(0.1) m = v[-1] print(arg, m) lock.release() for i in range(10): t = threading.Thread(target=func, args=(i, )) t.start() time.sleep(0.5) print(v) >>> 0 0 [0] 1 1 [0, 1] 2 2 [0, 1, 2] 3 3 [0, 1, 2, 3] 4 4 [0, 1, 2, 3, 4] 5 5 [0, 1, 2, 3, 4, 5] 6 6 [0, 1, 2, 3, 4, 5, 6] 7 7 [0, 1, 2, 3, 4, 5, 6, 7] 8 8 [0, 1, 2, 3, 4, 5, 6, 7, 8] 9 9 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Lock多次“上锁”会造成死锁状态,不能继续执行(不会报错)
所谓死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。
v = [] lock = threading.Lock() def func(arg): lock.acquire() lock.acquire() # 多次锁 就会死锁 v.append(arg) time.sleep(arg) m = v[-1] print(arg, m) lock.release() lock.release() for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start()
2. 锁:RLock(递归锁)
RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。
# 锁 RLock v = [] lock = threading.RLock() def func(arg): lock.acquire() lock.acquire() # 可加多个锁 v.append(arg) time.sleep(0.1) m = v[-1] print(m, arg) lock.release() lock.release() # 用多少锁,就要释放几次 for i in range(10): t = threading.Thread(target=func, args=(i, )) t.start()
3. 锁:BoundedSemaphore(信号量)
一个工厂函数,返回一个新的有界信号量对象。有界信号量会确保他的值不会超过初始值;如果超出则会抛出ValueError异常。初始值默认为1。
# 一次放n个 v = [] lock = threading.BoundedSemaphore(3) # 一次放n个 def func(arg): lock.acquire() print(arg) time.sleep(1) lock.release() for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start()
4.锁:Condition(条件)
使得线程等待,只有满足某条件时,才释放n个线程
# 一次方指定个,并可循环指定 # notify通知其他线程,其他处于wait状态的线程接到通知后会重新判断条件 v = [] lock = threading.Condition() i = 0 def func(arg): ident = threading.current_thread() print(ident) lock.acquire() lock.wait() # 在此处加锁 print(arg) num = threading.get_ident() print(num) time.sleep(0.1) lock.release() for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start() # 主线程 while True: inp = int(input("请输入:")) lock.acquire() lock.notify(inp) lock.release() # wait进行条件判断 是否为真 v = [] lock = threading.Condition() def cond(): ct = threading.current_thread() # 获取当前线程 cn = ct.getName() # 获取当前线程名称 print("进入cond函数,线程%s准备好了!" %cn) input("按任意键继续! ") return True def func(arg): print("线程进来了") lock.wait_for(cond) # 在此处加锁 当函数返回为真 才能解锁 print(arg) time.sleep(1) for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start()
5.锁:Event(事件)
对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。
# 一次放所有 lock = threading.Event() def func(arg): ct = threading.current_thread() # 获取当前线程 print(ct.getName()) lock.wait() # 加锁 print(arg) for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start() input(">>>") lock.set() # 此处解锁 释放所有 # wait 条件加锁 lock = threading.Event() def func(arg): ct = threading.current_thread() # 获取当前线程 print(ct.getName()) lock.wait() # 加锁 print(arg) # 第一次循环还没有将锁解开,只能显示线程名称 for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start() input(">>>") lock.set() # 开锁 释放全部 time.sleep(3) # 睡3秒是为了给时间把上一步骤全部释放完再执行下一步 input("<<<") # 第二次执行的时候线程名称和结果同时出现,因为在上一步骤中set已经将锁打开 for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start() # 再次加锁 lock = threading.Event() def func(arg): print("线程进来了") lock.wait() # 加锁 print(arg) for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start() input(">>>") lock.set() time.sleep(1) lock.clear() # 再次加速 for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start() input(">>>") lock.set()
总结:
线程安全,列表和字典线程安全
为什么加锁?
- 非线程安全
- 控制一段代码
二、threading.local(本地线程)
- 不同的线程来了,内部会为不同的线程创建不同的空间用于存储
作用:
内部自动为每个线程维护一个空间(字典),用于当前线程存取属于自己的值。保证线程之间的数据隔离。
v = threading.local() def func(arg): # 内部会为当前吸纳从创建一个空间用于存储, phone = 自己的值 v.phone = arg time.sleep(2) print(v.phone, arg) # 去当前线程自己空间取值 for i in range(10): t = threading.Thread(target=func, args=(i,)) t.start()
三. 线程池
# 线程池` from concurrent.futures import ThreadPoolExecutor import time def task(a1,a2): time.sleep(1) print(a1, a2) # 创建一个线程池(最多3个) pool = ThreadPoolExecutor(3) for i in range(40): # 去线程池中申请一个线程,让线程执行task函数 pool.submit(task, i, i+1)
四. 生产者消费者模型
生产者
队列、栈
消费者
# 生产者消费者模型 import queue q = queue.Queue() q.put(3) q.put(2) q.put(1) while 1: v1 = q.get() print(v1) q = queue.Queue() # 线程安全 def producer(id): """ 生产者 :return: """ while True: time.sleep(2) q.put('包子') print("厨师%s生产了一个包子" %id) for i in range(1,4): t = threading.Thread(target=producer, args=(i,)) t.start() def consumer(id): """ 消费者 :return: """ while True: time.sleep(1) q.get() print("顾客%s 吃了了一个包子" %id) for i in range(1, 4): t = threading.Thread(target=consumer, args=(i,)) t.start()