一、锁
Lock(1次放1个)
什么时候用到锁:
线程安全,多线程操作时,内部会让所有线程排队处理。如:list、dict、queue
线程不安全,
import threading import time v = [] lock = threading.Lock() #实例化了一个对象****** def func(arg): lock.acquire() #加锁 v.append(arg) time.sleep(0.5) m = v[-1] print(arg,m) lock.release() #加锁就要有解锁对应 for i in range(1,11): t = threading.Thread(target=func,args=(i,)) t.start()
RLock(1次放1个)
与Lock用法一致,但是RLock可以锁多次(必须有响应的解锁次数),Lock只能锁一次
import threading import time v = [] lock = threading.RLock() def func(arg): lock.acquire() #锁了两次 lock.acquire() v.append(arg) time.sleep(0.1) m = v[-1] print(arg,m) lock.release() #解锁两次 lock.release() for i in range(1,11): t = threading.Thread(target=func,args=(i,)) t.start()
BoundedSemaphore(1次方固定个数个)
import time import threading lock = threading.BoundedSemaphore(3) #参数是多少就一次放过去多少个线程 def func(arg): lock.acquire() print(arg) time.sleep(1) lock.release() for i in range(1,11): t = threading.Thread(target=func,args=(i,)) t.start()
Condition(1次放N个)
import time import threading lock = threading.Condition() def func(arg): print("start") lock.acquire() lock.wait() #**** print(arg) time.sleep(1) lock.release() for i in range(1,11): t = threading.Thread(target=func,args=(i,)) t.start() while 1: num = int(input(">>>>>")) #输入多少本次就会放多少个线程 lock.acquire() #**** lock.notify(num) lock.release() #****
#也可以通过函数逻辑判断的返回值 def xx(): print("来执行函数了") input(">>>>") return True def func(arg): print("线程来了") lock.wait_for(xx) print(arg) time.sleep(1) for i in range(1,11): t = threading.Thread(target=func,args=(i,)) t.start()
Event(1次放所有)
import threading lock = threading.Event() def func(arg): print("线程来了") lock.wait()#加锁 print(arg) for i in range(1,11): t = threading.Thread(target=func,args=(i,)) t.start() input(">>>>") lock.set() #解锁,如果后面不加锁上面的wait就失效了 input(">>>>") lock.clear() #再次上锁 for i in range(1,11): t = threading.Thread(target=func,args=(i,)) t.start() input(">>>>") lock.set()
总结:
线程安全,列表和字典线程安全
为什么要加锁:
非线程安全
控制一段代码
二、threading.local
作用:
内部自动为每个线程维护一个空间(字典),用于当前存取属于自己的值。保证线程之间的数据隔离
{
线程id:{......}
线程id:{......}
线程id:{......}
}
import time import threading v = threading.local() def func(arg): # 内部会为当前线程创建一个空间用于存储:phone=自己的值 v.phone = arg time.sleep(1) print(v.phone,arg) # 去当前线程自己空间取值 for i in range(1,11): t = threading.Thread(target=func,args=(i,)) t.start()
import time import threading DATA_DICT = {} def func(arg): ident = threading.get_ident() DATA_DICT[ident] = arg time.sleep(1) print(DATA_DICT[ident],arg) for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start()
import time import threading INFO = {} class Local(object): def __getattr__(self, item): ident = threading.get_ident() return INFO[ident][item] def __setattr__(self, key, value): ident = threading.get_ident() if ident in INFO: INFO[ident][key] = value else: INFO[ident] = {key:value} obj = Local() def func(arg): obj.phone = arg #对象.xx="xxx" 调用了__setattr__方法 time.sleep(1) print(obj.phone,arg) #对象.xx 调用了__getattr__方法 for i in range(1,11): t = threading.Thread(target=func,args=(i,)) t.start()
三、线程池
from concurrent.futures import ThreadPoolExecutor import time def func(a1,a2): time.sleep(1) print(a1,a2) #创建了一个线程池(最多5个线程) pool = ThreadPoolExecutor(5) for i in range(1,21): #去线程池中申请一个线程 pool.submit(func,i,"a")
四、生产者消费者模型
三部件:
生产者
队列,先进先出
栈,后进先出
消费者
生产者消费者模型解决了什么问题:不用一直等待的问题
import time import threading import queue q = queue.Queue()#线程安全 def producer(id): while 1: time.sleep(2) q.put("包子") print("厨师%s生产了一个包子"%id) for i in range(1,3): t = threading.Thread(target=producer,args=(i,)) t.start() def consumer(id): while 1: time.sleep(1) q.get("包子") print("顾客%s吃了一个包子"%id) for i in range(1,4): t = threading.Thread(target=consumer,args=(i,)) t.start()