1.线程为什么要有锁:
全局解释器锁GIL 不能完全确保数据的安全(时间片轮转法)
线程之间等的数据安全问题:
+=,-=赋值操作不安全
不涉及赋值操作的数据是安全的
不安全:
from threading import Lock,Thread n = 1500000 def func(): global n for i in range(1500000): n -= 1 def func2(): global n for i in range(1500000): n += 1 if __name__ == '__main__': t_lst = [] for i in range(10): t = Thread(target=func) t2 = Thread(target=func2) t.start() t2.start() t_lst.append(t) t_lst.append(t2) for t in t_lst: t.join() print('>>>>',n)
加锁:
from threading import Lock,Thread n = 150000 def func(lock): global n for i in range(150000): lock.acquire() n -= 1 lock.release() def func2(lock): global n for i in range(150000): lock.acquire() n += 1 lock.release() if __name__ == '__main__': lock = Lock() t_lst = [] for i in range(10): t = Thread(target=func,args=(lock,)) t2 = Thread(target=func2,args=(lock,)) t.start() t2.start() t_lst.append(t) t_lst.append(t2) for t in t_lst: t.join() print('>>>>',n)
2:互斥锁与递归锁
死锁现象:
两把锁
异步的
操作的时候,一个线程抢到一把锁之后还要再去抢第二把锁
一个线程抢到一把锁
另一个线程抢到了另一把锁
3:递归锁可以解决互斥锁的问题
互斥锁:
两把锁
多个线程抢
递归锁:
一把钥匙
多个线程抢
递归锁好不好?
递归锁并不是一个好的解决方案
死锁现象的发生不是互斥锁的问题
而是程序员的逻辑有问题导致的
递归锁能够快速的解决死锁问题
递归锁:
迅速恢复服务 ,递归锁替换互斥锁
在接下来的时间中慢慢把递归锁替换成互斥锁
完善代码的逻辑
提高代码的效率
多个线程之间,用完一个资源再用另一个资源
先释放一个资源,再去获取一个资源的锁
死锁现象:
import time from threading import Thread,Lock noodle_lock = Lock() fork_lock = Lock() def eat1(name): noodle_lock.acquire() print('%s拿到面条了' % name) fork_lock.acquire() print('%s拿到叉子了' % name) print('%s吃面' % name) time.sleep(0.3) fork_lock.release() print('%s放下叉子' % name) noodle_lock.release() print('%s放下面' % name) def eat2(name): fork_lock.acquire() print('%s拿到叉子了' % name) noodle_lock.acquire() print('%s拿到面条了' % name) print('%s吃面' % name) time.sleep(0.3) noodle_lock.release() print('%s放下面' % name) fork_lock.release() print('%s放下叉子' % name) if __name__ == '__main__': name_list = ['liming','lining'] name_list2 = ['lijing', 'liling'] for name in name_list: Thread(target=eat1,args=(name,)).start() for name in name_list2: Thread(target=eat2,args=(name,)).start()
递归锁
from threading import Thread,RLock rlock = RLock() def func(name): rlock.acquire() print(name,1) rlock.acquire() print(name,2) rlock.acquire() print(name,3) rlock.release() rlock.release() rlock.release() for i in range(10): Thread(target=func,args=('liming%s'%i,)).start()
解决死锁问题,互斥锁修改位递归锁
import time from threading import Thread,RLock#Lock # noodle_lock = Lock() # fork_lock = Lock() fork_lock = noodle_lock = RLock() #互斥锁改为递归锁 def eat1(name): noodle_lock.acquire() print('%s拿到面条了' % name) fork_lock.acquire() print('%s拿到叉子了' % name) print('%s吃面' % name) time.sleep(0.3) fork_lock.release() print('%s放下叉子' % name) noodle_lock.release() print('%s放下面' % name) def eat2(name): fork_lock.acquire() print('%s拿到叉子了' % name) noodle_lock.acquire() print('%s拿到面条了' % name) print('%s吃面' % name) time.sleep(0.3) noodle_lock.release() print('%s放下面' % name) fork_lock.release() print('%s放下叉子' % name) if __name__ == '__main__': name_list = ['liming','lining'] name_list2 = ['lijing', 'liling'] for name in name_list: Thread(target=eat1,args=(name,)).start() for name in name_list2: Thread(target=eat2,args=(name,)).start()
4:信号量
import time
from threading import Semaphore,Thread
def func(index,sem):
sem.acquire()
print(index)
time.sleep(1)
sem.release()
if __name__ == '__main__':
sem = Semaphore(3)
for i in range(30):
Thread(target=func,args=(i,sem)).start()
5.事件:检测数据库连接
wait() 等待事件内的信号变为True
set() 把信号变为True
clear() 把信号变为False
is_set 查看信号是否Ture
import time import random from threading import Event,Thread def check(): print('开始检测数据库连接') time.sleep(random.randint(1,5)) #模拟,真实测试时替换 e.set() # def connect(e): for i in range(3): e.wait(0.5) if e.is_set(): print('数据库连接成功') else: print('尝试连接数据库%s次失败' % (i+1)) else: raise TimeoutError e = Event() Thread(target=connect,args=(e,)).start() Thread(target=check,args=(e,)).start()
6.条件
notify 控制流量, 通知有多少人可以通过了
wait 在门口等待的所有人
wait 使用前后都需要加锁
notify 使用前后都要加锁
from threading import Condition,Thread def func(con,index): print('%s在等待' % index) con.acquire() con.wait() print('%sdo something' % index) con.release() con = Condition() for i in range(100): t = Thread(target=func,args=(con,i)) t.start() count = 100 while count > 0: num = int(input('>>>:')) con.acquire() con.notify(num) count -= num con.release() # con.acquire() # con.notify_all() #一次放完 # con.release()
7.定时器:
from threading import Timer def func(): print('执行了') t = Timer(5,func) #5妙后执行func这个函数 t.start() print('主线程')
8.队列:
#线程是安全 队列 :排队相关的逻辑(先进先出) import queue q = queue.Queue() q.put() q.get() q.put_nowait() q.get_nowait()
qps 每秒接受的请求数:
帮助维持程序相应的顺序
后进先出
from queue import LifoQueue #后进先出 #栈 完成算法 lq = LifoQueue() lq.put(1) lq.put(2) lq.put(3) lq.put('a') lq.put('b') print(lq.get()) print(lq.get()) print(lq.get())
优先级队列:应用VIP.
#优先级队列, from queue import PriorityQueue pq = PriorityQueue() pq.put((15,'abc')) pq.put((5,'ghj')) pq.put((25,'ftr')) pq.put((25,'atr')) print(pq.get()) print(pq.get()) print(pq.get()) print(pq.get())
from queue import PriorityQueue pq = PriorityQueue() pq.put(35) pq.put(15) pq.put(25) print(pq.get()) print(pq.get()) print(pq.get())
from queue import PriorityQueue pq = PriorityQueue() pq.put('c') pq.put('a') pq.put('b') print(pq.get()) print(pq.get()) print(pq.get())