为了保证线程的安全,和保证程序的有序执行,通过给线程加锁的方式来进行限制.
锁:控制线程
1 lock 锁(会有死锁现象):一次放一个
import threading import time v=[] lock = threading.Lock()#创建锁 def func(arg): lock.acquire()#添加锁 v.append(arg)#给列表添加元素 time.sleep(0.01)#等待0.01秒 m=v[-1]#取列表V 最后一位 print(arg,m)#打印列表中刚添加进去的值和最后一位的值 lock.release()#释放锁 for i in range(10):#循环10次 t=threading.Thread(target=func,args=(i,))#每循环一次创建一个线程 t.start()#告诉CPU准备就绪,可以运行 执行效果: 0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9
2 Rlock 锁(递归锁):一次放一个,与lock的使用方法一致,但是支持多次加锁
import threading import time v=[] lock=threading.Rlock() def func(arg): lock.acquire() lock.acquire() #二次加锁 v.apped(arg) time.sleep(0.01) m=v[-1] print(arg,m) lock.release() lock.release() #二次解锁 for i in range(10): t=threading.Thread(target=func,args=(i,)) t.start() 执行结果为: 0 0 1 1 2 2 3 3 4 4 5 5 6 6 7 7 8 8 9 9
3 semaphore 锁:一次放n个 (信号量)
import time import threading lock=threading.BoundedSemaphore(3) #引入锁的时候 已经进行了数量的限制,一次只允许三个线程运行,运行完这三个线程以后才可以继续向下走 def func(arg): lock.acquire() print(arg) time.sleep(1) lock.release() for i in range(10): t=threading.Thread(target=func,args=(i,) t.start()
4 Condition 锁:一次方法放n个(每循环一次可以修改放出的数量)
import time import threading lock= threading.Condition()# 创建Condition锁 =======方式一==== def func(arg): print("县城进来了") lock.acquire() lock.wait() print(arg) time.sleep(1) lock.release() for i in range(10): t = threading.Thread(target=func,args=(i,)) t.start() while True: inp=int(input("请设定一次输出线程数量:")) lock.acquire() lock.notify(inp) lock.release() ++++++++方式二++++++ def xxxx(): print("来执行函数了") input(">>>") ct=threading.current_thread() ct.getName() return True def func(arg): print("线程进来了") lock.wair_for(xxxx)#调用xxxx函数 print(arg) time.sleep(1) for i in range(10): t=treading.Thread(target=func,args=(i,)) t.start
5 Event 锁:一次放所有
import threading lock=threading.Event() def func(arg): print("线程来了") lock.wait() #红灯 print(arg) for i in range(10): t = threading.Thread(target=func,args=(i,)) t.start() input(">>>") lock.set() #绿灯 lock.clear() #再次变红灯 for i in range(10): t=threading.Thread(target=func,args=(i)) t.start() input(">>>") lock.set() #绿灯
threading.local:内部为当前线程创建一个空间用于存储相应的值
import time import threading v=threading.local() def func(arg): #内部会为当前线程创建一个空间用于存储:phone=自己的值 v.phone=arg time.sleep(2) print(v.phone,arg) #去当前空间取自己的值 for i in range(10): t=threading.Thread(target=func,args=(i,)) t.start()
import time import threading INFO = {} class Local(object): def __getattr__(self, item): ident = threading.get_ident() return INFO[ident][item] def __setattr__(self, key, value): ident = threading.get_ident() if ident in INFO: INFO[ident][key] = value else: INFO[ident] = {key:value} obj = Local() def func(arg): obj.phone = arg # 调用对象的 __setattr__方法(“phone”,1) time.sleep(2) print(obj.phone,arg) for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start()
# by luffycity.com import time import threading DATA_DICT = {} def func(arg): ident = threading.get_ident() #内存中存储地址编号 DATA_DICT[ident] = arg time.sleep(1) print(DATA_DICT[ident],arg) for i in range(10): t =threading.Thread(target=func,args=(i,)) t.start()
线程池:
from concurrent.futures import ThreadPOOLExecutor import time def task(a1,a2): time.sleep(2) print(a1,a2) pool=ThreadPoolExecutor(5) #创建一个线程池 最多5个线程 for i in range(40): pool.sunmit(task,i,8)
生产者消费者模型:
import time import queue import threading q = queue.Queue() # 线程安全 def producer(id): """ 生产者 :return: """ while True: time.sleep(2) q.put('包子') print('厨师%s 生产了一个包子' %id ) for i in range(1,4): t = threading.Thread(target=producer,args=(i,)) t.start() def consumer(id): """ 消费者 :return: """ while True: time.sleep(1) v1 = q.get() print('顾客 %s 吃了一个包子' % id) for i in range(1,3): t = threading.Thread(target=consumer,args=(i,)) t.start()