一 : 死锁现象与递归锁
死锁现象: 根本就走不了,因为堵住啦
from threading import Thread
from threading import Lock
import time
lock_A = Lock()
lock_B = Lock()
class MyThread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
lock_A.acquire()
print(f"{self.name}拿到了 A 锁")
lock_B.acquire()
print(f"{self.name}拿到了 B锁")
lock_B.release()
lock_A.release()
def f2(self):
lock_B.acquire()
print(f"{self.name}拿到了 B 锁")
lock_A.acquire()
print(f"{self.name}拿到了A锁")
time.sleep(0.1)
lock_A.release()
lock_B.release()
if __name__ == '__main__':
for i in range(3):
t = MyThread()
t.start()
过程分析 : t1 先抢到了 A锁,此时t2 t3 也想抢 A锁,但是没有办法,只能等着.等待 t1 释放 A锁,t1 又抢了 B 锁,此时 t1 有 A B 两把锁,没有释放,t2 t3 继续等待
当 t1 依次释放 BA 锁时,线程 t2 t3 争抢 A锁,t1 争抢 B锁,按照结果分析,t2 抢到了 A 锁,t1 拿到了 B 锁,但是接下来,t1 睡了0.1秒,t1 拥有 B 锁,想要 A 锁,但是 t2 拥有 A 锁,想要 B 锁就矛盾了
这样就造成了死锁的现象
于是引出了下面的递归锁,解决这一个问题
递归锁
应用 : 可以解决死锁问题,业务需要锁时,先要考虑递归锁
from threading import Thread
from threading import RLock
import time
lock_A = lock_B = RLock()
class MyThread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
lock_A.acquire()
print(f"{self.name}拿到了 A 锁")
lock_B.acquire()
print(f"{self.name}拿到了 B锁")
lock_B.release()
lock_A.release()
def f2(self):
lock_B.acquire()
print(f"{self.name}拿到了 B 锁")
lock_A.acquire()
print(f"{self.name}拿到了A锁")
time.sleep(0.1)
lock_A.release()
lock_B.release()
if __name__ == '__main__':
for i in range(3):
t = MyThread()
t.start()
**原因分析 : 递归锁有一个计数功能,原数字为 0,上一次锁,计数加一,释放一次锁,计数-1,只要递归锁上面的数字不为 0,其他线程就不能抢锁 **
本质的原因还是在于,只能有一个执行一个线程,只要存在,其他的进不来,坤坤说的
二 : 信号量(不重要)
也是一种锁,控制并发数量
就像是上厕所,有一个坑,就会被下一个占用
from threading import Thread ,Semaphore,current_thread
import time
import random
sem = Semaphore(5)
def task():
sem.acquire()
print(f"{current_thread().name}厕所 ing")
time.sleep(random.randint(1,3))
sem.release()
if __name__ == '__main__':
for i in range(20):
t = Thread(target = task)
t.start()
三 : GIL 全局解释器锁(重要,好好的看一下)
好多自称大神的说,GIL 锁就是 Python 的知名缺陷,Python 不能多核,并发不行等等其他问题
一个线程的执行流程
这里我口述一下:当你打开一个 py 文件,我们首先在cpython 中打开一个进程空间,进程空间包括 Python 解释器和文件,Python 解释器又包含虚拟机和编译器.
编译器:将 python 语言转化为 C语言能识别的字节码
虚拟机:字节码转成机器码,交由 CPU 去运行
理论上来说:单个进程的多线程可以利用多核(CPU)
但是开发 Cpython 解释器,给解释器加了一把锁
CPython 规定:同一个时刻,只能允许一个线程进解释器
为什么加锁??
- 当时都是单核时代,而且 CPU 的价格非常贵
- 如果不加全局解释器锁,开发 Cpython 解释器的程序员就会在源码内部各种主动加锁,解锁,非常麻烦,各种死锁现象等等,他为了省事,直接进入解释器时给线程加了一个锁.
优点:保证了 Cpython 解释器的数据资源的安全
缺点:单个进程的多线程不能利用多核
Jython 没有 GIL 锁
pypy 也没有 GIL 锁
提问:现在多核时代,我将 cpython 的 GIL 锁去掉行吗?
不可以,因为 cpython 解释器所有的业务逻辑都是围绕单个线程去实现的,去掉这个 GIL锁,几乎不可能
单个进程的多线程可以并发,但是不能利用多核,不能并发
多个进程可以并发,并行
**IO 密集型 : **单个进程的多线程合适,并发执行
**计算密集型 : **多进程的并行
四 : GIL 与 lock 锁的区别
相同点:
都是同种锁,互斥锁
**不同点 : **
- GIL 全局解释器锁,保护解释器内部的资源数据安全
- GIL 锁, 上锁 释放无需动手
- 自己代码中定义的互斥锁保护资源数据的安全
- 自己定义的互斥锁必须自己手动上锁,释放锁
五 : 验证计算密集型 IO 密集型的效率
IO 密集型
计算密集型
代码方的实现 :
计算密集型: 单个进程的多线程并发 VS 多个进程的并发执行
from threading import Thread
from multiprocessing import Process
import time
import flask
import random
def task():
count = 1
for i in range(1,100000):
count += 1
if __name__ == '__main__':
###多进程的并发 并行
start_time = time.time()
# l1 = []
# for i in range(4):
# p = Process(target = task)
# l1.append(p)
# p.start()
# for i in l1:
# i.join()
# print(f"执行效率:{time.time() - start_time}") # 执行效率:0.023988962173461914
# 多线程的并发
l1 = []
for i in range(4):
p = Thread (target = task)
l1.append(p)
p.start()
for p in l1:
p.join()
print(f"执行效率:{time.time() - start_time}") # 执行效率:0.026134014129638672
由上方代码可知,计算密集型采用多进程的并发效率更快
IO 密集型 : 多进程的并发,并行 VS 多线程的并发
多进程的并发执行
def task():
count = 0
time.sleep(random.randint(1,3))
count += 1
if __name__ == '__main__':
start_time = time.time()
l1 = []
for i in range(50):
p = Process(target = task,)
l1.append(p)
p.start()
for p in l1:
p.join()
print(f"执行效率:{time.time() - start_time}") #执行效率:3.0838990211486816
多线程的并发执行
def task():
count = 0
time.sleep(random.randint(1,3))
count += 1
if __name__ == '__main__':
start_time = time.time()
l1 = []
for i in range(50):
p = Thread(target = task,)
l1.append(p)
p.start()
for p in l1:
p.join()
print(f"执行效率:{time.time() - start_time}") #执行效率:3.009322166442871
两个对比结果可以得出 对于 IO 密集型来说 单个进程的多线程的效率更好一些
六 : 多进程实现 socket 通信
无论是多线程还是多进程,如果按照上面的写法,来一个客户端请求,我就开一个线程,来一个请求开一个线程
应该是这样 : 你的计算机允许范围内,开启线程进程数量越多越好
服务端
import socket
from threading import Thread
def communicate(conn,addr):
while 1:
try:
from_client_data = conn.recv(1024)
print(f"来自{addr[1]}的消息:{from_client_data.decode('utf-8')}")
to_client_data = input(">>>>")
conn.send(to_client_data.encode('utf-8'))
except Exception :
break
conn.close()
def _accept():
server = socket.socket()
server.bind(('127.0.0.1',8848))
server.listen(5)
while 1:
conn,addr = server.accept()
t = Thread(target = communicate,args = (conn,addr))
t.start()
if __name__ == '__main__':
_accept()
客户端
import socket
client = socket.socket()
client.connect(('127.0.0.1',8848))
while 1:
try:
to_server_data = input(">>>>").strip()
client.send(to_server_data.encode('utf-8'))
from_server_data = client.recv(1024)
print(f"来自服务端的消息:{from_server_data.decode('utf-8')}")
except Exception :
break
client.close()
七 : 进程池,线程池
线程池 : 一个容器,这个容器限制住你开启线程的数量,比如 4 个,第一次肯定只能并发的处理 4 个任务,只要有任务完成,线程就会接下一个任务
又一个以时间换空间的例子
进程池:放置进程的一个容器
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os
import time
import random
def task(n):
print(f"{os.getpid()}接客")
time.sleep(random.randint(1,3))
if __name__ == '__main__':
p = ProcessPoolExecutor() 默认不写,进程池里面的进程数与cpu个数相等
for i in range(20):
p.submit(task,i)
线程池 : 当是线程的一个容器,用来限制系统中线程的数量
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os
import time
import random
def task(n):
print(f"{os.getpid()}接客") # 线程来说 pid 都是一样的
time.sleep(random.randint(1,3))
if __name__ == '__main__':
t = ThreadPoolExecutor() # 默认不写, cpu个数*5 线程数
for i in range(20):
t.submit(task,i)
线程即使开销小,电脑也不能无限的开线程,我们应该对线程和进程的数量做限制,在计算机能承受最大的情况下,尽可能的多开线程和进程
今日总结
递归锁:RLock 同一把锁,引用一次计数加一,释放一次计数减一,只要计数不为 0 ,其他线程进程就抢不到,他能解决死锁问题
信号量:同一时刻,可以设置抢锁的线程,或者进程数量
GIL 锁:全局解释器锁,进入解释器的时候,同一时刻只能有一个线程CPython 解释器录入的
优点: 保证了解释器的资源数据安全
缺点:单进程的多线程不能利用多核优势
GIL 锁与自定义锁的异同:
GIL 锁保证了解释器数据的安全
自定义锁保证了进程线程中的数据安全线程池,进程池:控制开启线程或者 进程的数量
IO 密集型:利用单进程的多线程并发
计算密集型:多进程的并发或者并行