day27 线程同步
今日内容概要
- 互斥(Mutex)
- 死锁
- 信号量(Semaphore)
- 全局解释器锁(GIL)
- 同步和异步
- 协程
昨日回顾
- 进程池
- 多线程
今日内容详细
互斥锁(Mutex)
我们从前看到过,如果多个进程同时修改同一个共享数据的时候(例如一百万错误),如果不进行任何操作,会造成数据处理的混乱。为了避免类似的错误,我们就要进行同步控制。
线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是使用互斥锁。
互斥锁 保证每次只有一个线程修改数据,从而保证多线程情况下数据的正确性(原子性)。
原子性指的是,要么将线程中的代码全部执行完,要么就干脆不执行。每个线程是不可分割的,不可以执行这个线程还没有执行完,就执行下一个线程。
互斥锁为资源引入一组状态:锁定和非锁定。
当某个线程要更改公共的数据时,先将其锁定。此时资源的状态为锁定 ,其他线程不能对其进行修改。知道该线程释放资源,将资源的状态定为非锁定,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行修改数据的操作,从而保证了多线程情况下数据的安全性和正确性。
threading模块定义了Lock类,可以方便地处理锁定。
互斥锁主要用到三个方法:
# 创建锁
mutex = threading.Lock()
# 锁定
mutex.acquire()
# 释放,解锁
mutex.release()
我们可以使用互斥锁解决一百万问题:
import threading
import time
num = 0
def add_num1():
global num
mutex.acquire()
for i in range(1000000):
num += 1
mutex.release()
print('num in add_num1', num)
def add_num2():
global num
mutex.acquire()
for j in range(1000000):
num += 1
mutex.release()
print('num in add_num2', num)
mutex = threading.Lock()
th1 = threading.Thread(target=add_num1)
th2 = threading.Thread(target=add_num2)
th1.start()
th2.start()
time.sleep(1)
print(num)
上面的代码输出的结果为:
num in add_num1 1000000
num in add_num2 2000000
2000000
当线程一调用Lock对象的acquire()
方法获得锁时,这把锁就进入locked状态。因为每次只有一个线程可以获得锁。此时如果另一个线程二试图获得这个锁,将无法实现而进入同步阻塞状态。这个阻塞状态会一直持续,直到当线程一调用锁的release()
方法之后,该锁进入unlocked状态。线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行(running)状态。一个线程有锁时,别的线程只能在外面等着。
死锁
死锁是一种错误的情况,我们日后一定要尽量避免。
造成死锁的原因很好理解:想象一下我们把钥匙锁在了屋子里。我们需要进入屋子才能拿到钥匙,但是只有拿到钥匙我们才能进去屋子。这样进入屋子和拿到钥匙两个进程构成了死锁,两件事都不能完成。
另一个例子是张三想要李四的画,李四想要张三的书。但是张三和李四从前不认识对方,彼此不信任,他们都担心对方拿到自己的东西之后对方不要自己想要的东西交出来。于是张三要求,只有拿到李四的画才能交出自己手中的书;而李四要求,只有拿到张三的数才能把画给他。这样,他们就僵在了那里,交易无法进行。
我们现在可以写出一个双线程死锁的例子:
import threading
import time
class MyThread1(threading.Thread):
def run(self):
mutexA.acquire()
time.sleep(1)
print(self.name + '---do1---up---')
mutexB.acquire()
print(self.name + '---do1---down---')
mutexA.release()
class MyThread2(threading.Thread):
def run(self):
mutexB.acquire()
time.sleep(1)
print(self.name + '---do2---up---')
mutexA.acquire()
print(self.name + '---do2---down---')
mutexB.release()
mutexA = threading.Lock()
mutexB = threading.Lock()
th1 = MyThread1()
th2 = MyThread2()
th1.start()
th2.start()
上面的代码输出两行代码之后便不再继续执行,其输出的内容如下:
Thread-2---do2---up---
Thread-1---do1---up---
在多线程中,死锁问题很大一部分是由于线程同时获取多个锁造成的。如果一个线程获取了第一个所,然后再获取第二个锁的时候发生阻塞,那么这个线程就有可能阻塞其他线程的执行,从而导致整个程序假死。
信号量(Semaphore)
信号量的作用与互斥锁类似。不同的是,互斥锁限制同时只能有一个线程使用数据,但是信号量可以控制同时运行的线程数目。
信号量是用于控制一个时间点内线程数量的锁。信号量用来控制并发数。
使用场景举例:在读写文件的时候,一般只有一个线程再写,而读可以有多个线程同时进行。如果需要限制同时读文件的线程数目,就要用到信号量。因为如果使用互斥锁,同一时刻将只能有一个线程读取文件。
信号量用来将信号量限制在能够最大化效率的同时,又不会让计算机因过载而崩溃的数目下。
如果我们不做任何限制,下面代码中的100个线程将同时运行:
import time
import threading
def foo():
time.sleep(1)
print('ok', time.ctime())
for i in range(100):
th = threading.Thread(target=foo)
th.start()
上面的代码在等待大约1秒后几乎同时打印出来。100个程序虽然没有搞垮计算机,但可想而知,这个过程耗费了大量的内存。如果我们的任务更多,程序更复杂,计算机就未必吃得消了。
这时,我们就可以通过信号量来控制同时运行的线程数目:
import time
import threading
def foo():
sem.acquire()
time.sleep(1)
print('ok', time.ctime())
sem.release()
sem = threading.Semaphore(5)
for i in range(100):
th = threading.Thread(target=foo)
th.start()
上面的代码每隔一秒会打印出五条内容,明显降低了计算机的压力。
全局解释器锁(GIL)
全局解释器锁(Global Interpreter Lock,GIL)是CPython独有的锁。引入它的初衷是为了保证数据安全而牺牲效率。但GIL是一柄双刃剑,它带来优势的同时也带来很多问题。
GIL应该是Python最为人诟病的一个特性了:正是因为GIL的存在,Python中的单线程只能使用CPU的一个核心,即便使用的是多核处理器。这极大限制了Python多线程的应用场景。很多计算集中的代码只能用多进程来实现。而多进程因为数据不共享,还要用到队列来实现进程间的通信,这又增加了代码的复杂度。
要了解Python的全局解释器锁,我们首先要了解Python文件的执行过程:
- 操作系统将应用程序从硬盘加载到内存。Python文件运行时,会在内存开辟一个进程空间,将Python解释器以及py文件加载进去,解释器运行py文件
- Python解释器编译Python代码一共分成两个步骤:首先将代码编译成C的字节码,然后虚拟机将C的字节码编译成机器码。随后,操作系统会将机器码交给CPU去执行
- py文件中有一个主线程,主线程做的就是这个过程。如果开多线程,每个线程也都要进行上述的过程(Python --> C --> 机器码)
假设我们现在有三个线程,那么这三个线程在计算机中理想的,效率最高的运行情况应该是这样的:
我们有三个线程,就得到三组机器码。把他们交给操作系统。操作系统分配三个CPU分别执行这三组机器码。它们同时执行,最大限度地提高效率。
然而,CPython却不是这样执行的。CPython的多线程不能使用多核。
CPython在所有线程进入解释器前加了一个全局解释器锁(Global Interpreter Lock,GIL)。这个锁是互斥锁,是加在解释器上的。这就导致同一时间只有一个线程在执行,也就无法使用多核。而且我们没有任何办法能解掉这个锁。
既然GIL锁的负面效应这么大,为什么还要在Python的解释器上加一个GIL锁呢?
这是因为Python最开始编写的时候,CPU还只有一个核心。那时的人们没能预料到未来会有多核CPU出线的可能,也就没有专门针对多核预留优化的空间。对于单核CPU而言,加一个锁既保证了数据安全,又让编写Python解释器更加容易。
而现在,虽然人们意识到多线程使用多核处理器的重要性,但是因为解释器内部的管理全部是针对单线程写的。牵一发而动全身,要想去掉GIL锁,就要从根本上修改Python解释器的代码。
然而,经过这么多年的发展,Python已经是一个非常庞大的系统工程,修改代码谈何容易。再加上Python是开源免费的编程语言,Python组织的盈利能力十分有限。而修改源代码又是一项需要消耗巨大资源的事情,所以这么多年来,GIL一直没能被取消掉。
读者或许还会问,既然只有CPython有GIL锁,我们可不可以换一种解释器,比如不使用CPython,而是使用PyPy呢?
答案也是不合适的。首先,官方更推荐使用CPython。因为Python本就是个面向对象的编程语言,效率相对不高。不管是使用Java编写的JPython,还是使用Python编写的PyPy,其运行效率都要低于使用C语言编写的CPython。而且,其他解释器,比如PyPy,规则和漏洞都很多,转成PyPy还要重写代码。我们为了追求提高多核效率,却牺牲了运行效率并且带来了漏洞,还要重写代码,似乎有些得不偿失了。
那么Python就不能使用多核了吗?
也不是的。虽然多线程无法是用多核,多进程还是能用到多核的。如果我们进行一些计算密集型的任务,就可以使用多进程,使用多个CPU来提高运算效率。不过多进程的缺点是会开辟较多的内存空间,开销比较大。
还有一个问题,Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么还要用到互斥锁呢?
我们已经知道,使用锁的目的是为了保护共享的数据,确保同一时刻,只能有一个线程修改共享的数据,避免发生混乱。
但是我们会有不同类型的数据,这样我们就要使用不同的锁来分别对它们进行保护。
GIL和互斥锁是两把锁,保护的数据是不一样的。GIL是解释器级别的,保护的是解释器级别的数据,比如垃圾回收的数据;互斥锁则是保护程序员自己开发的应用程序数据。GIL负责保护宏观数据,互斥锁负责保护微观具体的数据。
这就好比一间屋子。屋子除了有一个正门锁,在屋子内部还有很多小房间也有各自的门和锁。正门锁,只要是我们的家人都可以进来,而坏人却都进不来(理想中的门锁)。但是对于家人来说,也会有一些私密的空间。比如我们自己的房间是不想让爸妈进来看的,于是我们给自己的房间也上了一把锁。正门锁是为了防止坏人进来,我们房间的门锁是为了保留一些小秘密。我们不能说,既然有了正门锁,就没有必要给小房间上锁了。正门锁和小房间的锁保护的东西不同,但都有必要。
在这个例子里面,正门锁就是GIL,小房间的锁就是我们创建的互斥锁。
同步和异步
同步调用:确定调用顺序。多个任务依此按照顺序执行,每次只执行一个任务,直到该任务结束,才会开始下一个任务。如果任务未结束,后面的任务将处于等待状态,而不会提前执行。
异步调用:不确定调用顺序。一次提交多个任务,这些任务同时处于就绪状态,共同执行。如果有一个任务没有结束,并不会影响其他任务的进展。
例如,我们现在要让三个人分配任务,让他们每人写一本书:
- 同步:先告知第一个人写书,等他写完之后,告知第二个人。然后等第二个人写完之后,在告知第三个人。等第三个人写完,任务结束。如果前面有一个人耽误了日期,后面的人也会一直等待下去。
- 异步:把三个任务告知三个人,让他们每人写一本书。三个人同时开始写书。即便他们中有一个人没能按时完成,另两个人的进度不会受影响。
再如,烧水泡茶的例子。我们要请客人喝茶,首先一共要分洗水壶、烧开水、洗茶壶、洗茶杯、拿茶叶和泡茶等六个步骤。其中,洗水壶要1分钟;烧开水要15分钟;洗茶壶要1分钟;洗茶杯要2分钟;拿茶叶要1分钟;泡茶要3分钟。
- 同步:按照顺序,先洗水壶,然后烧开水。水烧开之后洗茶壶,再洗茶杯,拿茶叶,最后泡茶。总共花费了23分钟
- 异步:先洗水壶。然后在烧开水的同时洗茶壶,洗茶杯,拿茶叶。最后等水烧开,泡茶。总共要花费19分钟
同步(sync)
同步意味着各个任务有序执行,它们处在统一的时间轴中。有两个典型的场景可能会用到同步:
- 任务的逻辑十分清晰,必须要先执行某一个任务。如果第一个任务发生了阻塞,会一直等待,直到这个任务完成,再执行第二个任务。这样才能协同步调,按照预定逻辑的先后次序进行,避免因果倒置,逻辑混乱,产生异常
- 一个任务的完成需要依赖另外一个任务。只有等待被依赖的任务完成后,依赖的任务才能开始执行。这时一种可靠的任务序列
我们可以通过设置互斥锁的方式来让多个线程同步有序执行:
import threading
import time
class Task1(threading.Thread):
def run(self):
while Ture:
mutex1.acquire()
print('Task1')
time.sleep(1)
mutex2.release()
class Task2(threading.Thread):
def run(self):
while True:
mutex2.acquire()
print('Task2')
time.sleep(1)
mutex3.release()
class Task3(threading.Thread):
def run(self):
while True:
mutex3.acquire()
print('Task3')
time.sleep(1)
mutex1.release()
th1 = Task1()
th2 = Task2()
th3 = Task3()
mutex1 = threading.Lock()
mutex2 = threading.Lock()
mutex3 = threading.Lock()
mutex2.acquire()
mutex3.acquire()
th1.start()
th2.start()
th3.start()
上述代码会依此循环输出Task1,Task2,Task3,每秒输出一个。
上面的例子中,三个线程之间相互关联,耦合性极强。这就会带来一个问题:如果我们修改其中一个任务函数,就有可能会影响到另外两个函数的顺利运行。这给日后的开发扩展带来很多不确定,埋下了隐患。
这时,我们就需要用到生产者消费者模式来解决这个问题。
在线程世界中,生产者就是生产数据的线程,消费者就是消费数据的线程(就好比做包子和吃包子)。经常会出现生产数据的速度大于消费数据的速度,或者生产速度跟不上消费速度的情况。
这就需要我们在生产者和消费者之间放一个容器,也就是缓冲区,来解决生产者和消费者强耦合的问题。生产者统一往容器中放入数据,消费者统一从容器中提取数据。但是生产者和消费者之间没有直接的数据交换。
生产者和消费者彼此间不直接通讯,他们之间通过容器进行数据交互。这个容器,就可以是阻塞队列。
Python的quene模块实现三种类型的队列可以实现线程同步:
- FIFO(先入先出)队列,Queue
- LIFO(后入先出)栈,LifoQueue
- 优先级队列,PriorityQueue
这三种队列的区别在于检索条目的顺序不同:
- Queue类会按照先进先出的顺序检索条目
- LifoQueue按照后进先出的顺序检索,类似于一个栈
- PriorityQueue则是通过使用heapq模块使得条目有序,最小值的条目最先被检索到
这些队列都实现了锁原语(可以理解为原子操作,要么不做,要么做完),能够在多线程中直接使用。
有了queue模块的Queue类,我们就可以构造一个队列对象。其用法为:
q = queque.Queue(maxsize=0)
其参数可以不写,默认为0。其参数表示队列的最大条目,可以用来限制内存的使用。当参数为0或负数时,队列大小为无穷大。
一旦队列被占满,插入操作将被阻塞,直到队列中存在空闲空间。
这样我们就可以根据生产者消费者模式写出以下代码:
import threading
import queue
import time
class Pro(threading.Thread):
def run(self):
count = 0
while True:
if q.qsize() < 100:
for i in range(3):
count += 1
msg = f'生产商品{count}'
q.put(count)
print(msg)
time.sleep(2)
class Con(threading.Thread):
def run(self):
while True:
if q.qsize() > 2:
for i in range(2):
msg = f'消费商品{q.get()}'
print(msg)
time.sleep(3)
q = queue.Queue()
for i in range(2):
pr = Pro()
pr.start()
for j in range(3):
co = Con()
co.start()
上面的代码运行过后,生产与消费交替进行。虽然生产者与消费者的进程数目并不相同,每个进程的生产速度与消费速度也不一致,但程序还是有序地运转着。这就是通过队列容器实现了生产者与消费者间的间接通信,从而降低了耦合性,提高了代码的灵活性。
异步(async)
异步意味着代码执行无序,各个线程拥有独立的时间轴,其目的是追求效率。
处理和调用一个任务之后,不会等待这个任务的处理结果,而是同时处理其他更多的任务。这些任务通过状态、回调等方式将处理结果反馈给调用者。
对于I/O相关的程序来说,异步编程可以大幅度提高任务的执行效率。因为在某个I/O操作的读写过程中,系统可以先去处理其他操作(通常是其他的I/O操作)。
用我们之前的烧水泡茶的例子来讲就是,在烧水的同时,我们可以洗茶壶、洗茶具、找茶叶,而不是等水少开。
异步执行任务是不确定执行顺序的,所以要注意判断当前任务是否适用于异步执行。
我们从前学的多线程或多进程,都是异步的例子:
import threading
import time
def th(num):
print('线程%s开始执行' % num)
time.sleep(1)
print('线程%s执行完毕' % num)
for i in range(0, 3)
th = threading.Thread(target=th, args=(i,))
th.start()
print('主方法执行完毕')
上面的代码执行的结果为:
线程0开始执行
线程1开始执行
线程2开始执行
主方法执行完毕
线程0执行完毕
线程1执行完毕
线程2执行完毕
前面四行先被打印出来,后面三行在一秒后才被打印。
但是我们发现,主方法要在每个线程结束前就已经结束。而且,线程间没有任何的数据交换,对于复杂些的任务,处理起来就不太适用了。
如果我们使用条件控制,就可以实现进程间的交互。避免出错的同时,最大限度提高效率:
import threading
import time
num = 0
def num_add_1():
global num
# 当参数为False时,如果锁不可得,不会阻塞,而是返回False,如果锁可得,返回True
while True:
if mutex.acquire(False):
for i in range(1000000):
num += 1
print('线程一执行完毕,此时num的值为:', num)
mutex.release()
break
else:
print('线程一该干嘛干嘛')
time.sleep(1)
def num_add_2():
global num
while True:
if mutex.acquire(False):
for i in range(1000000):
num += 1
print('线程二执行完毕,此时num的值为:', num)
mutex.release()
break
else:
print('线程二该干嘛干嘛')
time.sleep(1)
mutex = threading.Lock()
th1 = threading.Thread(target=num_add_1)
th2 = threading.Thread(target=num_add_2)
th1.start()
th2.start()
上面的代码运行结果为:
线程二该干嘛干嘛
线程一执行完毕,此时num的值为: 1000000
线程二执行完毕,此时num的值为: 2000000
通过条件控制线程的执行,虽然实现了线程间的通信,但是实现的方式比较复杂,也会增加线程间的耦合性,并不是一种理想的解决方案。
这就要用到回调机制来实现进程将执行结果反馈给调用者:
from multiprocessing import Pool
import random
import time
def download(name):
for i in range(1, 4):
print(f'{name}正在下载文件{i}')
time.sleep(random.randint(1, 3))
return f'{name}下载完成'
def alarm_user(msg):
print(msg)
if __name__ == '__main__':
p = Pool(3)
for i in range(1, 4):
p.apply_async(func=download, args=(f'线程{i}',), callback=alarm_user)
p.close()
p.join()
print('程序执行完毕')
上面的代码运行结果为:
线程1正在下载文件1
线程2正在下载文件1
线程3正在下载文件1
线程1正在下载文件2
线程3正在下载文件2
线程1正在下载文件3
线程2正在下载文件2
线程1下载完成
线程3正在下载文件3
线程2正在下载文件3
线程3下载完成
线程2下载完成
程序执行完毕
因为有随机数,所以每次运行的结果可能会不同。
当func执行完毕后,其返回值会传递给回调函数callback。
协程
协程时比线程更小的执行单元,也被称作微线程。
一个线程作为容器,里面可以放置多个协程。
只切换函数调用即可完成协程,从而减少CPU的切换,利于计算密集型的任务
协程自己主动让出CPU。
进程与线程的任务是由操作系统自行切换的,程序员无法控制
协程可以通过程序员编写的程序(代码)来进行切换,程序员可以对其进行控制和调配
greenlet
我们使用greenlet模块来实现协程。但是Python本没有内置greenlet,我们需要自己安装。
安装有很多种方法:
-
在PyCharm的终端,输入命令:
pip install greenlet
-
在菜单栏中找到
file
->settings
在interpreter选项中,会出现所有已安装的包。可以看到,并没有greenlet。我们可以点击右边的加号,添加包
在搜索框中输入greenlet,找到下面搜索到的greenlet包。注意勾选下面的把包安装到用户的包路径中,然后安装包。
-
当然也可以直接使用终端导入greenlet包:
将greenlet包安装好之后,我们就可以使用它实现协程了:
from greenlet import greenlet
import time
def func_a():
while True:
print('---A---')
gr2.switch()
time.sleep(1)
def func_b():
while True:
print('---B---')
gr1.switch() # 会从上次暂停的西方开始执行
time.sleep(1)
gr1 = greenlet(func_a)
gr2 = greenlet(func_b)
gr1.switch() # 执行协程gr1
与线程不同的是,协程是通过命令手动切换到CPU中执行的任务,而不是交给操作系统自动分配。
gevent
Python还有一个比greenlet更强大的模块,gevent。它可以自动切换任务。
其原理是,当一个greenlet遇到I/O(指的是input/output,输入输入)操作时,比如访问网络,就自动切换到其他的greenlet。等到I/O操作完成,再在适当的时候切换回来,继续执行。
gevent只有遇到模块能使变得I/O操作的时候,程序才会进行任务切换,实现并发的效果。如果所有程序都没有I/O操作,那么就基本属于串行执行了。
同样地gevent也需要我们安装gevent包。
安装好gevent包之后,就可以使用了:
import gevent
def func_a():
while True:
print('---A---')
gevent.sleep(1) # 模拟一个耗时操作
# gevent中,当一个协程遇到耗时的I/O操作时,会自动将CPU的使用权交给其他协程
def func_b():
while True:
print('---B---')
gevent.sleep(1) # 遇到耗时操作,转交CPU使用权
gt1 = gevent.spawn(func_a) # 创建gevent协程对象,此时协程已经开始执行
gt2 = gevent.spawn(func_b)
gt1.join() # 等待协程执行结束
gt2.join()
A和B同时输出,然后每过一秒,输出一组A和B。