多线程(二)
1.死锁现象与递归锁
死锁: 是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁
from threading import Thread,Lock
import time
lock_A = Lock()
lock_B = Lock()
class MyThread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
lock_A.acquire()
print(f'{self.name}拿到了A锁')
lock_B.acquire()
print(f'{self.name}拿到了B锁')
lock_B.release()
lock_A.release()
def f2(self):
lock_B.acquire()
print(f'{self.name}拿到了B锁')
time.sleep(0.1)
lock_A.acquire()
print(f'{self.name}拿到了A锁')
lock_A.release()
lock_B.release()
if __name__ == '__main__':
for i in range(3):
t = MyThread()
t.start()
#Thread-1拿到了A锁
#Thread-1拿到了B锁
#Thread-1拿到了B锁
#Thread-2拿到了A锁
解决方法,递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。
这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁:
from threading import Thread,RLock
import time
lock_A = lock_B = RLock()
#递归锁有一个计数的功能,原数字为0,上一次锁,计数+1,释放一次锁,计数-1
#只要递归锁上面的数字不为0,其他线程就不能抢锁
class MyThread(Thread):
def run(self):
self.f1()
self.f2()
def f1(self):
lock_A.acquire()
print(f'{self.name}拿到了A锁')
lock_B.acquire()
print(f'{self.name}拿到了B锁')
lock_B.release()
lock_A.release()
def f2(self):
lock_B.acquire()
print(f'{self.name}拿到了B锁')
time.sleep(0.1)
lock_A.acquire()
print(f'{self.name}拿到了A锁')
lock_A.release()
lock_B.release()
if __name__ == '__main__':
for i in range(3):
t = MyThread()
t.start()
#Thread-1拿到了A锁
#Thread-1拿到了B锁
#Thread-1拿到了B锁
#Thread-1拿到了A锁
#Thread-2拿到了A锁
#Thread-2拿到了B锁
#Thread-2拿到了B锁
#Thread-2拿到了A锁
#Thread-3拿到了A锁
#Thread-3拿到了B锁
#Thread-3拿到了B锁
#Thread-3拿到了A锁
2.信号量
Semaphore管理一个内置的计数器,
每当调用acquire()时内置计数器-1;
调用release() 时内置计数器+1;
计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
实例:(同时只有5个线程可以获得semaphore,即可以限制最大连接数为5):
from threading import Thread, Semaphore, current_thread
import time
import random
sem = Semaphore(5)
def task():
sem.acquire()
print(f'{current_thread().name} 厕所ing')
time.sleep(random.randint(1,3))
sem.release()
if __name__ == '__main__':
for i in range(20):
t = Thread(target=task,)
t.start()
3.GIL全局解释器锁
理论上:单个进程的多线程可以利用多核,但是Cpython解释器给进入解释器的线程加了锁
加锁的原因:
1.当时处于单核时代,且cpu价格昂贵
2.如果不加全局解释器锁,则程序员需要在源码内部各种主动加锁,解锁,出现死锁现象,则直接在进入解释器时给线程加了锁
优点:保证了Cpython解释器的数据资源的安全
缺点:单个进程的多线程不能利用多核
结论:在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势
首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython,PyPy就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL
GIL介绍:
GIL本质就是一把互斥锁,既然是互斥锁,所有互斥锁的本质都一样,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。
可以肯定的一点是:保护不同的数据的安全,就应该加不同的锁。
要想了解GIL,首先确定一点:每次执行python程序,都会产生一个独立的进程。例如python test.py,python aaa.py,python bbb.py会产生3个不同的python进程
#验证python test.py只会产生一个进程
#test.py内容
import os,time
print(os.getpid())
time.sleep(1000)
python3 test.py
#在windows下
tasklist |findstr python
#在linux下
ps aux |grep python
在一个python的进程内,不仅有test.py的主线程或者由该主线程开启的其他线程,还有解释器开启的垃圾回收等解释器级别的线程,总之,所有线程都运行在这一个进程内,毫无疑问
#1 所有数据都是共享的,这其中,代码作为一种数据也是被所有线程共享的(test.py的所有代码以及Cpython解释器的所有代码)
例如:test.py定义一个函数work(代码内容如下图),在进程内所有线程都能访问到work的代码,于是我们可以开启三个线程然后target都指向该代码,能访问到意味着就是可以执行。
#2 所有线程的任务,都需要将任务的代码当做参数传给解释器的代码去执行,即所有的线程要想运行自己的任务,首先需要解决的是能够访问到解释器的代码。
综上:
如果多个线程的target=work,那么执行流程是
多个线程先访问到解释器的代码,即拿到执行权限,然后将target的代码交给解释器的代码去执行
解释器的代码是所有线程共享的,所以垃圾回收线程也可能访问到解释器的代码而去执行,这就导致了一个问题:对于同一个数据100,可能线程1执行x=100的同时,而垃圾回收执行的是回收100的操作,解决这种问题没有什么高明的方法,就是加锁处理,如下图的GIL,保证python解释器同一时间只能执行一个任务的代码
IO密集型:适合单个进程的多线程,并发执行
计算密集型:适合多进程的并行
4.GIL锁与lock锁的区别
GIL保护的是解释器级的数据,保护用户自己的数据则需要自己加锁处理
相同点:都是同种锁(互斥锁)
不同点:
1.GIL全局解释器锁保护解释器内部的资源数据安全
2.GIL锁的上锁释放无需手动操作
3.自己代码中定义的互斥锁保护进程中的资源数据的安全
4.自己定义的互斥锁必须手动上锁和释放
5.验证计算密集型与IO密集型的效率
计算密集型:
from threading import Thread
from multiprocessing import Process
import time,random
def task():
count = 0
for i in range(100000000):
count += 1
#多进程的并发,并行
if __name__ == '__main__':
start_time = time.time()
l1 = []
for i in range(4):
p = Process(target = task)
l1.append(p)
p.start()
for i in l1:
p.join()
print(f"执行效率:{time.time() - start_time}")
#执行效率:6.620556354522705
#多线程的并发
if __name__ == '__main__':
start_time = time.time()
l1 = []
for i in range(4):
p = Thread(target = task)
l1.append(p)
p.start()
for i in l1:
p.join()
print(f"执行效率:{time.time() - start_time}")
#执行效率:17.61694884300232
总结:计算密集型:多进程的并发并行执行效率高
IO密集型:
from threading import Thread
from multiprocessing import Process
import time,random
def task():
count = 0
time.sleep(random.randint(1,3))
count += 1
#多进程的并发,并行
if __name__ == '__main__':
start_time = time.time()
l1 = []
for i in range(50):
p =Process(target = task)
l1.append(p)
p.start()
for i in l1:
p.join()
print(f"执行效率:{time.time() - start_time}")
#执行效率:3.101606607437134
if __name__ == '__main__':
start_time = time.time()
l1 = []
for i in range(50):
p =Thread(target = task)
l1.append(p)
p.start()
for i in l1:
p.join()
print(f"执行效率:{time.time() - start_time}")
#执行效率:2.010244846343994
总结:IO密集型:单个进程的多线程的并发执行效率高
6.多线程实现socket通信
server端:
import socket
from threading import Thread
def communicate(conn,addr):
while 1:
try:
from_client_data = conn.recv(1024)
print(f'来自客户端{addr[1]}的消息: {from_client_data.decode("utf-8")}')
to_client_data = input('>>>').strip()
conn.send(to_client_data.encode('utf-8'))
except Exception:
break
conn.close()
def _accept():
server = socket.socket()
server.bind(('127.0.0.1', 8848))
server.listen(5)
while 1:
conn, addr = server.accept()
t = Thread(target=communicate,args=(conn,addr))
t.start()
if __name__ == '__main__':
_accept()
client端:
import socket
client = socket.socket()
client.connect(('127.0.0.1',8848))
while 1:
try:
to_server_data = input('>>>').strip()
client.send(to_server_data.encode('utf-8'))
from_server_data = client.recv(1024)
print(f'来自服务端的消息: {from_server_data.decode("utf-8")}')
except Exception:
break
client.close()
7.进程池和线程池
创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务(高级一些的进程池可以根据你的并发量,搞成动态增加或减少进程池中的进程数量的操作),不会开启其他进程,提高操作系统效率,减少空间的占用等。
线程池: 一个容器,这个容器限制住你开启线程的数量,比如4个,第一次肯定只能并发的处理4个任务,只要有任务完成,线程马上就会接下一个任务.
#以时间换空间
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time,random
def task():
print(f"{os.getpid()}工作")
time.sleep(random.randint(1,3))
#开启进程池(并发或并行)
if __name__ == '__main__':
p = ProcessPoolExecutor() #默认不写,进程池里面的进程数与本机cpu数量相等
for i in range(20):
p.submit(task)
#开启线程池(并发)
if __name__ == '__main__':
t = ThreadPoolExecutor() #默认不写,线程池里的线程数为本机cpu个数*5
for i in range(20):
t.submit(task)