摘录python核心编程
一般的,多线程代码中,总有一些特定的函数或者代码块不希望(或不应该)被多个线程同时执行(比如两个线程运行的顺序发生变化,就可能造成代码的执行轨迹或者行为不相同,或者产生不一致的数据),比如修改数据库、更新文件或其他会产生竞态条件的类似情况。此时就需要同步了。
同步:任意数量的线程可以访问临界区的代码,但在给定的时刻又只有一个线程可以通过时。
这里介绍两个基本的同步类型原语:锁/互斥、信号量
锁
锁有两种状态:锁定和未锁定。与之对应的是两个函数:获得锁和释放锁。
当多线程争夺锁时,允许第一个获得锁的线程进入临界区,并执行代码;所有之后到达的线程都将被阻塞,直到第一个线程执行结束,退出临界区,并释放锁。此时其他的线程可以获得锁并进入临界区。注意:那些被阻塞的线程是没有顺序的(并不是先到先得),意味着下一个获得锁的线程的顺序并不是确定的。
mtsleepF.py脚本中派生了随机数量的线程(没有使用锁):
from atexit import register from random import randrange from threading import Thread,currentThread from time import ctime,sleep #自定义一个集合对象,重写__str__方法 class CleanOutputSet(set): def __str__(self): return ', '.join(x for x in self) #列表生成式 randrange()用于生成一个随机数,range()返回一个列表 loops = (randrange(2,5) for x in range(randrange(3,7))) remaining = CleanOutputSet() def loop(nsec): myname = currentThread().name remaining.add(myname) print('[%s] 开始了 %s' % (ctime(),myname)) sleep(nsec) remaining.remove(myname) print('[%s] 结束了 %s (%s second)' % (ctime(),myname,nsec)) print(' (还存在:%s)' % (remaining or 'NONE')) def _main(): #创建3~6个线程,每个线程睡眠2~4秒 for pause in loops: Thread(target = loop,args = (pause,)).start() #装饰器,在脚本的最后执行 @register def _atexit(): print('所有的完成于:',ctime()) if __name__ == '__main__': _main()
正常情况下,执行的结果:
PS C:UsersWC> python E:Python3.6.3workspacemtsleepF.py [Mon Apr 16 17:47:31 2018] 开始了 Thread-1 [Mon Apr 16 17:47:31 2018] 开始了 Thread-2 [Mon Apr 16 17:47:31 2018] 开始了 Thread-3 [Mon Apr 16 17:47:31 2018] 开始了 Thread-4 [Mon Apr 16 17:47:33 2018] 结束了 Thread-1 (2 second) (还存在:Thread-4, Thread-3, Thread-2) [Mon Apr 16 17:47:33 2018] 结束了 Thread-2 (2 second) (还存在:Thread-4, Thread-3) [Mon Apr 16 17:47:34 2018] 结束了 Thread-3 (3 second) (还存在:Thread-4) [Mon Apr 16 17:47:34 2018] 结束了 Thread-4 (3 second) (还存在:NONE) 所有的完成于: Mon Apr 16 17:47:34 2018
我们多运行几次,有时会得到下面错乱的结果:
PS C:UsersWC> python E:Python3.6.3workspacemtsleepF.py [Mon Apr 16 17:50:09 2018] 开始了 Thread-1 [Mon Apr 16 17:50:09 2018] 开始了 Thread-2 [Mon Apr 16 17:50:09 2018] 开始了 Thread-3 [Mon Apr 16 17:50:12 2018] 结束了 Thread-3 (3 second) (还存在:Thread-2, Thread-1) [Mon Apr 16 17:50:13 2018] 结束了 Thread-1 (4 second) [Mon Apr 16 17:50:13 2018] 结束了 Thread-2 (4 second) (还存在:NONE) (还存在:NONE) 所有的完成于: Mon Apr 16 17:50:13 2018
我们发现输出存在部分混乱的情况(多个线程可能并行执行IO),还有就是两个线程修改同一个变量(剩余线程名集合)。IO和访问相同的数据结构都属于临界区,因此需要引入锁防止多个线程同时进入临界区。
下面是引入锁的脚本实例(mtsleepG.py):
# python 3.6 from atexit import register from random import randrange from threading import Thread,Lock,currentThread #2.6版本后重命名为current_thread() from time import ctime,sleep #自定义一个集合类,重写—__str__方法,将默认输出改变为将其所有元素按照逗号分隔的字符串 class CleanOutputSet(set): def __str__(self): return ', '.join(x for x in self) #三个全局变量 lock = Lock()#锁 loops = (randrange(2,5) for x in range(randrange(3,7)))#随机数量的线程(3~6个),每个线程暂停2~4秒 remaining = CleanOutputSet()#自定义集合类的实例 def loop(nsec): myname = currentThread().name#获得当前线程的名称 lock.acquire()#获取锁,阻止其他线程进入到临界区 remaining.add(myname)#将线程名添加到集合中 print('[%s] 开始 %s' % (ctime(),myname)) lock.release()#释放锁 sleep(nsec)#线程睡眠操作 lock.acquire()#重新获得锁 remaining.remove(myname)#从集合中删除当前线程 print('[%s] 完成 %s (%s secs)' % (ctime(),myname,nsec)) print(' (remaining: %s )' % (remaining or 'NONE')) lock.release()#最后释放锁 def _main(): #main函数前面添加‘_’是为了不在其他地方使用而导入。_main只能在命令行模式下才能执行 for pause in loops: Thread(target = loop,args = (pause,)).start() #循环派生并执行每个线程 #装饰器,注册_atexit()函数,使得解释器在脚本退出的时候执行此函数 @register def _atexit(): print('所有线程完成于:',ctime()) if __name__ == '__main__': _main()
多次执行,结果没有再出现混乱的情况:
PS C:UsersWC> python E:Python3.6.3workspacemtsleepG.py [Tue Apr 17 19:54:31 2018] 开始 Thread-1 [Tue Apr 17 19:54:31 2018] 开始 Thread-2 [Tue Apr 17 19:54:31 2018] 开始 Thread-3 [Tue Apr 17 19:54:31 2018] 开始 Thread-4 [Tue Apr 17 19:54:31 2018] 开始 Thread-5 [Tue Apr 17 19:54:31 2018] 开始 Thread-6 [Tue Apr 17 19:54:33 2018] 完成 Thread-1 (2 secs) (remaining: Thread-5, Thread-3, Thread-4, Thread-6, Thread-2 ) [Tue Apr 17 19:54:33 2018] 完成 Thread-5 (2 secs) (remaining: Thread-3, Thread-4, Thread-6, Thread-2 ) [Tue Apr 17 19:54:34 2018] 完成 Thread-3 (3 secs) (remaining: Thread-4, Thread-6, Thread-2 ) [Tue Apr 17 19:54:34 2018] 完成 Thread-2 (3 secs) (remaining: Thread-4, Thread-6 ) [Tue Apr 17 19:54:35 2018] 完成 Thread-4 (4 secs) (remaining: Thread-6 ) [Tue Apr 17 19:54:35 2018] 完成 Thread-6 (4 secs) (remaining: NONE ) 所有线程完成于: Tue Apr 17 19:54:35 2018
信号量
当情况更加复杂的时候,还可以考虑使用信号量这个同步原语来代替锁。
信号量(Semaphore),是一个计数器,当资源消耗时递减(调用acquire),计数器会减1;当资源释放是递增(调用release),计数器会加1。计数器的值不会小于0;当等于0的时候,再调用acquire会阻塞,直到其他线程调用release为止。可以认为信号量代表他们的资源可用或不可用。
两个函数简介如下:
acquire(blocking=布尔值,timeout=None):
- 本方法用于获得Semaphore
- blocking默认值是True,此时,如果内部计数器值大于0,则减一,并返回;如果等于0,则阻塞,等待其他线程调用release()以使计数器加1;本方法返回True,或无线阻塞
- 如果blocking=False,则不阻塞,如若获取失败,则返回False
- 当设定了timeout的值,最多阻塞timeout秒,如果超时,返回False。
release():
- 释放Semaphore,内部计数器加1,可以唤醒等待的线程
BoundedSemaphore正好和Semaphore相反:一个工厂函数,返回一个新的有界信号量对象。有界信号量会确保他的值不会超过初始值;如果超出则会抛出ValueError异常。初始值默认为1。
消耗资源使计数器递减的操作习惯上成为P(),也称为wait、try、acquire、pend、procure.
相对的,当一个线程对一个资源完成操作时,该资源需要返回资源池中,这种操作一般称为V(),也称为signal、increment、release、post、Vacate。
python简化了所有的命名,使用和锁的函数一样的名字:acquire和release。信号量比锁更加灵活,因为可以有多个线程,每个线程拥有有限资源的一个实例
下面,我们模仿一个简化的糖果机:该糖果机中只有5个可用的槽来保持库存(糖果),如果所有的槽都满了,糖果就不能再加到这个机器中了;相似的,如果每个槽都空了,消费者就不能再购买到了。我们使用信号量来追踪这些有限的资源(糖果槽)。
脚本实例candy.py:
#python 3.6 from atexit import register from random import randrange from threading import BoundedSemaphore,Lock,Thread#增加了信号量 from time import sleep,ctime #3个全局变量 lock = Lock()#锁 MAX = 5 #表示库存糖果最大值的常量 candytray = BoundedSemaphore(MAX)#‘糖果托盘’,一个信号量 #向库存中添加糖果。这段代码是一个临界区,输出用户的行动,并在糖果超过最大库存的时候给出警告 def refill(): lock.acquire() print('重装糖果……') try: candytray.release() except ValueError: print('满了,跳过') else: print('成功') lock.release() #购买糖果。也是一个临界区,效果和refill函数相反 def buy(): lock.acquire() print('购买糖果中………') #检查是否所有的资源都已经消费完。 #计数器的值不能小于0,所以这个调用一般会在计数器再次增加之前被阻塞。传入非阻塞标志False,让调用不再阻塞,而在应当阻塞的时候返回一个false,表示没有更多资源了。 if candytray.acquire(False): print('成功') else: print('空,跳过') lock.release() #模拟糖果机的所有者 def producer(loops): for i in range(loops): refill() sleep(randrange(3)) #模拟消费者 def consumer(loops): for i in range(loops): buy() sleep(randrange(3)) #_main表示从命令行执行 def _main(): print('开始于:',ctime()) nloops = randrange(2,6) print('糖果机(一共 %s 个槽)' % MAX) #创建消费者和所有者线程 #其中消费者线程中,增加了额外的操作,用于随机给出正偏差,使得消费者真正消费的糖果数可能会比供应者放入机器的更多;否则代码永远不会进入消费者尝试从空机器购买糖果的情况 Thread(target = consumer,args = (randrange(nloops,nloops+MAX+2),)).start()# Thread(target = producer,args = (nloops,)).start() #注册退出函数 @register def _atexit(): print('结束于:',ctime()) if __name__ == '__main__': _main()
执行结果类似:
PS C:UsersWC> python E:Python3.6.3workspacecandy.py 开始于: Wed Apr 18 19:56:19 2018 糖果机(一共 5 个槽) 购买糖果中……… 成功 购买糖果中……… 成功 重装糖果…… 成功 重装糖果…… 成功 重装糖果…… 满了,跳过 购买糖果中……… 成功 购买糖果中……… 成功 购买糖果中……… 成功 购买糖果中……… 成功 购买糖果中……… 成功 购买糖果中……… 空,跳过 购买糖果中……… 空,跳过 结束于: Wed Apr 18 19:56:27 2018