2.线程篇¶
在线预览:https://github.lesschina.com/python/base/concurrency/3.并发编程-线程篇.html
示例代码:https://github.com/lotapp/BaseCode/tree/master/python/5.concurrent/Thread
终于说道线程了,心酸啊,进程还有点东西下次接着聊,这周4天外出,所以注定发文少了+_+
用过Java或者Net的重点都在线程这块,Python的重点其实在上篇,但线程自有其独到之处~比如资源共享(更轻量级)
这次采用循序渐进的方式讲解,先使用,再深入,然后扩展,最后来个案例
,呃.呃.呃.先这样计划~欢迎纠正错误
2.1.入门篇¶
官方文档:https://docs.python.org/3/library/threading.html
进程是由若干线程组成的(一个进程至少有一个线程)
2.1.1.线程案例¶
用法和Process
差不多,咱先看个案例:Thread(target=test, args=(i, ))
import os
from threading import Thread, current_thread
def test(name):
# current_thread()返回当前线程的实例
thread_name = current_thread().name # 获取线程名
print(f"[编号:{name}],ThreadName:{thread_name}
PID:{os.getpid()},PPID:{os.getppid()}")
def main():
t_list = [Thread(target=test, args=(i, )) for i in range(5)]
for t in t_list:
t.start() # 批量启动
for t in t_list:
t.join() # 批量回收
# 主线程
print(f"[Main]ThreadName:{current_thread().name}
PID:{os.getpid()},PPID:{os.getppid()}")
if __name__ == '__main__':
main()
输出:(同一个进程ID)
[编号:0],ThreadName:Thread-1
PID:20533,PPID:19830
[编号:1],ThreadName:Thread-2
PID:20533,PPID:19830
[编号:2],ThreadName:Thread-3
PID:20533,PPID:19830
[编号:3],ThreadName:Thread-4
PID:20533,PPID:19830
[编号:4],ThreadName:Thread-5
PID:20533,PPID:19830
[Main]ThreadName:MainThread
PID:22636,PPID:19830
注意一点:Python里面的线程是Posix Thread
2.1.2.指定线程名¶
如果想给线程设置一个Div的名字呢?:
from threading import Thread, current_thread
def test():
# current_thread()返回当前线程的实例
print(f"ThreadName:{current_thread().name}")
def main():
t1 = Thread(target=test, name="小明")
t2 = Thread(target=test)
t1.start()
t2.start()
t1.join()
t2.join()
# 主线程
print(f"[Main],ThreadName:{current_thread().name}")
if __name__ == '__main__':
main()
输出:(你指定有特点的名字,没指定就使用默认命令【联想古时候奴隶名字都是编号,主人赐名就有名了】)
ThreadName:小明
ThreadName:Thread-1
[Main],ThreadName:MainThread
类的方式创建线程
from threading import Thread
class MyThread(Thread):
def __init__(self, name):
# 设个坑,你可以自行研究下
super().__init__() # 放在后面就报错了
self.name = name
def run(self):
print(self.name)
def main():
t = MyThread(name="小明")
t.start()
t.join()
if __name__ == '__main__':
main()
输出:(和Thread初始化的name冲突了【变量名得注意哦】)
小明
2.1.3.线程池案例¶
from multiprocessing.dummy import Pool as ThreadPool, current_process
def test(i):
# 本质调用了:threading.current_thread
print(f"[编号{i}]{current_process().name}")
def main():
p = ThreadPool()
for i in range(5):
p.apply_async(test, args=(i, ))
p.close()
p.join()
print(f"{current_process().name}")
if __name__ == '__main__':
main()
输出:
[编号0]Thread-3
[编号1]Thread-4
[编号3]Thread-2
[编号2]Thread-1
[编号4]Thread-3
MainThread
微微扩展一下¶
对上面代码,项目里面一般都会这么优化:(并行这块线程后面会讲,不急)
from multiprocessing.dummy import Pool as ThreadPool, current_process
def test(i):
# 源码:current_process = threading.current_thread
print(f"[编号{i}]{current_process().name}")
def main():
p = ThreadPool()
p.map_async(test, range(5))
p.close()
p.join()
print(f"{current_process().name}")
if __name__ == '__main__':
main()
输出:
[编号0]Thread-2
[编号1]Thread-4
[编号2]Thread-3
[编号4]Thread-2
[编号3]Thread-1
MainThread
代码改动很小(循环换成了map)性能提升很明显(密集型操作)
2.1.4.其他扩展¶
Thread初始化参数:
- daemon:是否为后台线程(主线程退出后,后台线程就退出了)
Thread实例对象的方法:
- isAlive(): 返回线程是否活动的
- getName(): 返回线程名
- setName(): 设置线程名
- isDaemon():是否为后台线程
- setDaemon(True):设置后台线程
threading模块提供的一些方法:
- threading.currentThread(): 返回当前的线程实例
- threading.enumerate(): 返回一个包含正在运行的线程List(线程启动后、结束前)
- threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果
看一个小案例:
import time
from threading import Thread, active_count
def test1():
print("test1")
time.sleep(1)
print("test1 ok")
def test2():
print("test2")
time.sleep(2)
print("test2 ok")
def main():
t1 = Thread(target=test1)
t2 = Thread(target=test2, daemon=True)
t1.start()
t2.start()
t1.join()
print(active_count())
print(t1.is_alive)
print(t2.is_alive)
# 除非加这一句才等daemon线程,不然主线程退出的时候后台线程就退出了
# t2.join()
if __name__ == '__main__':
main()
下次就以multiprocessing.dummy
模块为例了,API和threading
几乎一样,进行了一些并发的封装,性价比更高
2.2.加强篇¶
其实以前的Linux中
是没有线程这个概念的,Windows
程序员经常使用线程,这一看~方便啊,然后可能是当时程序员偷懒了,就把进程模块改了改(这就是为什么之前说Linux下的多进程编程其实没有Win下那么“重量级”),弄了个精简版进程==>线程
(内核是分不出进程和线程
的,反正PCB
个数都是一样)
多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享(全局变量和堆 ==> 线程间共享。进程的栈 ==> 线程平分而独占)
还记得通过current_thread()
获取的线程信息吗?难道线程也没个id啥的?一起看看:(通过ps -Lf pid 来查看LWP
)
回顾:进程共享的内容:(回顾:http://www.cnblogs.com/dotnetcrazy/p/9363810.html)
- 代码(.text)
- 文件描述符(fd)
- 内存映射(mmap)
2.2.1.线程同步~互斥锁Lock¶
线程之间共享数据的确方便,但是也容易出现数据混乱的现象,来看个例子:
from multiprocessing.dummy import threading
num = 0 # def global num
def test(i):
print(f"子进程:{i}")
global num
for i in range(100000):
num += 1
def main():
p_list = [threading.Thread(target=test, args=(i, )) for i in range(5)]
for i in p_list:
i.start()
for i in p_list:
i.join()
print(num) # 应该是500000,发生了数据混乱,结果少了很多
if __name__ == '__main__':
main()
输出:(应该是500000
,发生了数据混乱,只剩下358615
)
子进程:0
子进程:1
子进程:2
子进程:3
子进程:4
452238
Lock案例¶
共享资源+CPU调度==>数据混乱==解决==>线程同步
这时候Lock
就该上场了
互斥锁是实现线程同步最简单的一种方式,读写都加锁(读写都会串行)
先看看上面例子怎么解决调:
from multiprocessing.dummy import threading, Lock
num = 0 # def global num
def test(i, lock):
print(f"子进程:{i}")
global num
for i in range(100000):
with lock:
num += 1
def main():
lock = Lock()
p_list = [threading.Thread(target=test, args=(i, lock)) for i in range(5)]
for i in p_list:
i.start()
for i in p_list:
i.join()
print(num)
if __name__ == '__main__':
main()
输出:time python3 1.thread.2.py
子进程:0
子进程:1
子进程:2
子进程:3
子进程:4
500000
real 0m2.846s
user 0m1.897s
sys 0m3.159s
优化下¶
lock设置为全局或者局部,性能几乎一样。循环换成map后性能有所提升(测试案例在Code中)
from multiprocessing.dummy import Pool as ThreadPool, Lock
num = 0 # def global num
lock = Lock()
def test(i):
print(f"子进程:{i}")
global num
global lock
for i in range(100000):
with lock:
num += 1
def main():
p = ThreadPool()
p.map_async(test, list(range(5)))
p.close()
p.join()
print(num)
if __name__ == '__main__':
main()
输出:
time python3 1.thread.2.py
子进程:0
子进程:1
子进程:3
子进程:2
子进程:4
500000
real 0m2.468s
user 0m1.667s
sys 0m2.644s
本来多线程访问共享资源的时候可以并行,加锁后就部分串行了(没获取到的线程就阻塞等了)
【项目中可以多次加锁,每次加锁只对修改部分加(尽量少的代码) 】(以后会说协程和Actor模型)
补充:以前都是这么写的,现在支持with
托管了(有时候还会用到,所以了解下):【net是直接lock大括号包起来
】
#### 以前写法:
lock.acquire() # 获取锁
try:
num += 1
finally:
lock.release() # 释放锁
#### 等价简写
with lock:
num += 1
扩展知识:(GIL在扩展篇会详说)
- GIL的作用:多线程情况下必须存在资源的竞争,GIL是为了保证在解释器级别的线程唯一使用共享资源(cpu)。
- 同步锁的作用:为了保证解释器级别下的自己编写的程序唯一使用共享资源产生了同步锁
- lock.locked():判断 lock 当前是否上锁,如果上锁,返回True,否则返回False【上锁失败时候的处理】
2.2.2.线程同步~可重入锁RLock¶
看个场景:小明欠小张2000,欠小周5000,现在需要同时转账给他们:(规定:几次转账加几次锁)
小明啥也没管,直接撸起袖子就写Code了:(错误Code示意)
from multiprocessing.dummy import Pool as ThreadPool, Lock
xiaoming = 8000
xiaozhang = 3000
xiaozhou = 5000
def test(lock):
global xiaoming
global xiaozhang
global xiaozhou
# 小明想一次搞定:
with lock:
# 小明转账2000给小张
xiaoming -= 2000
xiaozhang += 2000
with lock:
# 小明转账5000给小周
xiaoming -= 5000
xiaozhou += 5000
def main():
print(f"[还钱前]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}")
lock = Lock()
p = ThreadPool()
p.apply_async(test, args=(lock, ))
p.close()
p.join()
print(f"[还钱后]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}")
if __name__ == '__main__':
main()
小明写完代码就出去了,这可把小周和小张等急了,打了N个电话来催,小明心想啥情况?
一看代码楞住了,改了改代码,轻轻松松把钱转出去了:
from multiprocessing.dummy import Pool as ThreadPool, Lock
xiaoming = 8000
xiaozhang = 3000
xiaozhou = 5000
# 小明转账2000给小张
def a_to_b(lock):
global xiaoming
global xiaozhang
with lock:
xiaoming -= 2000
xiaozhang += 2000
# 小明转账5000给小周
def a_to_c(lock):
global xiaoming
global xiaozhou
with lock:
xiaoming -= 5000
xiaozhou += 5000
def main():
print(f"[还钱前]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}")
lock = Lock()
p = ThreadPool()
p.apply_async(a_to_b, args=(lock, ))
p.apply_async(a_to_c, args=(lock, ))
p.close()
p.join()
print(f"[还钱后]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}")
if __name__ == '__main__':
main()
输出:
[还钱前]小明8000,小张3000,小周5000
[还钱后]小明1000,小张5000,小周10000
就这么算了吗?不不不,不符合小明性格,于是小明研究了下,发现~还有个递归锁RLock
呢,正好解决他的问题:
from multiprocessing.dummy import Pool as ThreadPool, RLock # 就把这边换了下
xiaoming = 8000
xiaozhang = 3000
xiaozhou = 5000
def test(lock):
global xiaoming
global xiaozhang
global xiaozhou
# 小明想一次搞定:
with lock:
# 小明转账2000给小张
xiaoming -= 2000
xiaozhang += 2000
with lock:
# 小明转账5000给小周
xiaoming -= 5000
xiaozhou += 5000
def main():
print(f"[还钱前]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}")
lock = RLock() # 就把这边换了下
p = ThreadPool()
p.apply_async(test, args=(lock, ))
p.close()
p.join()
print(f"[还钱后]小明{xiaoming},小张{xiaozhang},小周{xiaozhou}")
if __name__ == '__main__':
main()
RLock内部维护着一个Lock和一个counter
变量,counter记录了acquire
的次数,从而使得资源可以被多次require
。直到一个线程所有的acquire都被release
,其他的线程才能获得资源
2.2.3.死锁引入¶
1.多次获取导致死锁¶
小明想到了之前说的(互斥锁Lock
读写都加锁)就把代码拆分研究了下:
print("[开始]小明转账2000给小张")
lock.acquire() # 获取锁
xiaoming -= 2000
xiaozhang += 2000
print("[开始]小明转账5000给小周")
lock.acquire() # 获取锁(互斥锁第二次加锁)
xiaoming -= 5000
xiaozhou += 5000
lock.release() # 释放锁
print("[结束]小明转账5000给小周")
lock.release() # 释放锁
print("[开始]小明转账2000给小张")
输出发现:(第二次加锁的时候,变成阻塞等了【死锁】)
[还钱前]小明8000,小张3000,小周5000
[开始]小明转账2000给小张
[开始]小明转账5000给小周
这种方式,Python提供的RLock就可以解决了
2.常见的死锁¶
看个场景:小明和小张需要流水帐,经常互刷~小明给小张转账1000,小张给小明转账1000
一般来说,有几个共享资源就加几把锁(小张、小明就是两个共享资源,所以需要两把Lock
)
先描述下然后再看代码:
正常流程 小明给小张转1000:小明自己先加个锁==>小明-1000==>获取小张的锁==>小张+1000==>转账完毕
死锁情况 小明给小张转1000:小明自己先加个锁==>小明-1000==>准备获取小张的锁。可是这时候小张准备转账给小明,已经把自己的锁获取了,在等小明的锁(两个人相互等,于是就一直死锁了)
代码模拟一下过程:
from time import sleep
from multiprocessing.dummy import Pool as ThreadPool, Lock
xiaoming = 5000
xiaozhang = 8000
m_lock = Lock() # 小明的锁
z_lock = Lock() # 小张的锁
# 小明转账1000给小张
def a_to_b():
global xiaoming
global xiaozhang
global m_lock
global z_lock
with m_lock:
xiaoming -= 1000
sleep(0.01)
with z_lock:
xiaozhang += 1000
# 小张转账1000给小明
def b_to_a():
global xiaoming
global xiaozhang
global m_lock
global z_lock
with z_lock:
xiaozhang -= 1000
sleep(0.01)
with m_lock:
xiaoming += 1000
def main():
print(f"[还钱前]小明{xiaoming},小张{xiaozhang}")
p = ThreadPool()
p.apply_async(a_to_b)
p.apply_async(b_to_a)
p.close()
p.join()
print(f"[还钱后]小明{xiaoming},小张{xiaozhang}")
if __name__ == '__main__':
main()
输出:(卡在这边了)
[转账前]小明5000,小张8000
项目中像这类的情况,一般都是这几种解决方法:(还有其他解决方案,后面会继续说)
- 按指定顺序去访问共享资源
- trylock的重试机制(
Lock(False)
) - 在访问其他锁的时候,先把自己锁解了
- 得不到全部锁就先放弃已经获取的资源
比如上面的情况,我们如果规定,不管是谁先转账,先从小明开始,然后再小张,那么就没问题了。或者谁钱多就谁(权重高的优先)
from time import sleep
from multiprocessing.dummy import Pool as ThreadPool, Lock
xiaoming = 5000
xiaozhang = 8000
m_lock = Lock() # 小明的锁
z_lock = Lock() # 小张的锁
# 小明转账1000给小张
def a_to_b():
global xiaoming
global xiaozhang
global m_lock
global z_lock
# 以上次代码为例,这边只修改了这块
with z_lock: # 小张权重高,大家都先获取小张的锁
xiaozhang += 1000
sleep(0.01)
with m_lock:
xiaoming -= 1000
# 小张转账1000给小明
def b_to_a():
global xiaoming
global xiaozhang
global m_lock
global z_lock
with z_lock:
xiaozhang -= 1000
sleep(0.01)
with m_lock:
xiaoming += 1000
def main():
print(f"[转账前]小明{xiaoming},小张{xiaozhang}")
p = ThreadPool()
p.apply_async(a_to_b)
p.apply_async(b_to_a)
p.close()
p.join()
print(f"[转账后]小明{xiaoming},小张{xiaozhang}")
if __name__ == '__main__':
main()
输出:
[转账前]小明5000,小张8000
[转账后]小明5000,小张8000
PS:lock.locked()
:判断 lock 当前是否上锁,如果上锁,返回True,否则返回False【上锁失败时候的处理】
2.2.4.线程同步~条件变量Condition¶
条件变量一般都不是锁,能阻塞线程,从而减少不必要的竞争,Python内置了RLock
(不指定就是RLock)
看看源码:
class Condition:
"""
实现条件变量的类。
条件变量允许一个或多个线程等到另一个线程通知它们为止
如果给出了lock参数而不是None,那必须是Lock或RLock对象作底层锁。
否则,一个新的RLock对象被创建并用作底层锁。
"""
def __init__(self, lock=None):
if lock is None:
lock = RLock()
self._lock = lock
# 设置lock的acquire()和release()方法
self.acquire = lock.acquire
self.release = lock.release
再看看可不可以进行with托管:(支持)
def __enter__(self):
return self._lock.__enter__()
def __exit__(self, *args):
return self._lock.__exit__(*args)
看个生产消费者的简单例子:(生产完就通知消费者)
from multiprocessing.dummy import Pool as ThreadPool, Condition
s_list = []
con = Condition()
def Shop(i):
global con
global s_list
# 加锁保护共享资源
for x in range(5):
with con:
s_list.append(x)
print(f"[生产者{i}]生产商品{x}")
con.notify_all() # 通知消费者有货了
def User(i):
global con
global s_list
while True:
with con:
if s_list:
print(f"列表商品:{s_list}")
name = s_list.pop() # 消费商品
print(f"[消费者{i}]消费商品{name}")
print(f"列表剩余:{s_list}")
else:
con.wait()
def main():
p = ThreadPool()
# 两个生产者
p.map_async(Shop, range(2))
# 五个消费者
p.map_async(User, range(5))
p.close()
p.join()
if __name__ == '__main__':
main()
输出:(list之类的虽然可以不加global标示,但是为了后期维护方便,建议加上)
[生产者0]生产商品0
[生产者0]生产商品1
列表商品:[0, 1]
[消费者0]消费商品1
列表剩余:[0]
列表商品:[0]
[消费者0]消费商品0
列表剩余:[]
[生产者0]生产商品2
列表商品:[2]
[消费者1]消费商品2
列表剩余:[]
[生产者0]生产商品3
[生产者1]生产商品0
[生产者0]生产商品4
列表商品:[3, 0, 4]
[消费者1]消费商品4
列表剩余:[3, 0]
[生产者1]生产商品1
[生产者1]生产商品2
[生产者1]生产商品3
[生产者1]生产商品4
列表商品:[3, 0, 1, 2, 3, 4]
[消费者2]消费商品4
列表剩余:[3, 0, 1, 2, 3]
列表商品:[3, 0, 1, 2, 3]
[消费者0]消费商品3
列表剩余:[3, 0, 1, 2]
列表商品:[3, 0, 1, 2]
[消费者1]消费商品2
列表剩余:[3, 0, 1]
列表商品:[3, 0, 1]
[消费者3]消费商品1
列表剩余:[3, 0]
列表商品:[3, 0]
[消费者3]消费商品0
列表剩余:[3]
列表商品:[3]
[消费者3]消费商品3
列表剩余:[]
通知方法:
- notify() :发出资源可用的信号,唤醒任意一条因 wait()阻塞的进程
- notifyAll() :发出资源可用信号,唤醒所有因wait()阻塞的进程
2.2.5.线程同步~信号量Semaphore(互斥锁的高级版)¶
记得当时在分析multiprocessing.Queue
源码的时候,有提到过(点我回顾)
同进程的一样,semaphore
管理一个内置的计数器,每当调用acquire()
时内置函数-1
,每当调用release()
时内置函数+1
通俗讲就是:在互斥锁的基础上封装了下,实现一定程度的并行
举个例子,以前使用互斥锁的时候:(厕所就一个坑位,必须等里面的人出来才能让另一个人上厕所)
使用信号量之后:厕所坑位增加到5个(自己指定),这样可以5个人一起上厕所了==>实现了一定程度的并发
举个例子:(Python在语法这点特别爽,不用你记太多异同,功能差不多基本上代码也就差不多)
from time import sleep
from multiprocessing.dummy import Pool as ThreadPool, Semaphore
sem = Semaphore(5) # 限制最大连接数为5
def goto_wc(i):
global sem
with sem:
print(f"[线程{i}]上厕所")
sleep(0.1)
def main():
p = ThreadPool()
p.map_async(goto_wc, range(50))
p.close()
p.join()
if __name__ == '__main__':
main()
输出:
可能看了上节回顾的会疑惑:源码里面明明是BoundedSemaphore
,搞啥呢?
其实BoundedSemaphore
就比Semaphore
多了个在调用release()
时检查计数器的值是否超过了计数器的初始值,如果超过了将抛出一个异常
以上一个案例说事:你换成BoundedSemaphore
和上面效果一样==>sem = BoundedSemaphore(5)
Semaphore
补充¶
之前有人问Semaphore
信号量在项目中有什么应用?(⊙o⊙)…额
,这个其实从概念就推出场景了,控制并发嘛~举个例子:
- 比如说我们调用免费API的时候经常看见单位时间内限制并发数在30以内,想高并发==>给钱
( ⊙ o ⊙ )捂脸
- 再比如我们去爬数据的时候控制一下爬虫的并发数(
避免触发反爬虫的一种方式
,其他部分后面会逐步引入)
这些虚的说完了,来个控制并发数的案例,然后咱们就继续并发编程的衍生了:
import time
from multiprocessing.dummy import threading, Semaphore
class MyThread(threading.Thread):
def __init__(self, id, sem):
super().__init__()
self.__id = id
self.__sem = sem
def run(self):
self.__sem.acquire() # 获取
self.api_test()
def api_test(self):
"""模拟api请求"""
time.sleep(1)
print(f"id={self.__id}")
self.__sem.release() # 释放
def main():
sem = Semaphore(10) # 控制并发数
t_list = [MyThread(i, sem) for i in range(1000)]
for t in t_list:
t.start()
for t in t_list:
t.join()
if __name__ == '__main__':
main()
输出图示:
运行分析:
性能全图:
锁专题扩展¶
1.加锁机制¶
在多线程程序中,死锁问题很大一部分是由于线程同时获取多个锁造成的,eg:一个线程获取了第一个锁,然后在获取第二个锁的 时候发生阻塞,那么这个线程就可能阻塞其他线程的执行,从而导致整个程序假死。
解决死锁问题的一种方案是为程序中的每一个锁分配一个唯一的id,然后只允许按照升序规则来使用多个锁,当时举了个小明小张转账的简单例子,来避免死锁,这次咱们再看一个案例:(这个规则使用上下文管理器非常简单)
先看看源码,咱们怎么使用:
# 装饰器方法
def contextmanager(func):
"""
方法格式
@contextmanager
def some_generator(<arguments>):
<setup>
try:
yield <value>
finally:
<cleanup>
然后就可以直接使用with托管了
with some_generator(<arguments>) as <variable>:
<body>
"""
@wraps(func)
def helper(*args, **kwds):
return _GeneratorContextManager(func, args, kwds)
return helper
翻译成代码就是这样了:(简化)
from contextlib import contextmanager # 引入上下文管理器
@contextmanager
def lock_manager(*args):
# 先排个序(按照id排序)
args = sorted(args, key=lambda x: id(x))
try:
for lock in args:
lock.acquire()
yield
finally:
# 先释放最后加的锁(倒序释放)
for lock in reversed(args):
lock.release()
基础忘记了可以点我(lambda)
以上面小明小张转账案例为例子:(不用再管锁顺序之类的了,直接全部丢进去:with lock_manager(...)
)
from contextlib import contextmanager # 引入上下文管理器
from multiprocessing.dummy import Pool as ThreadPool, Lock
@contextmanager
def lock_manager(*args):
# 先排个序(按照id排序)
args = sorted(args, key=lambda x: id(x))
try:
for lock in args:
lock.acquire()
yield
finally:
# 先释放最后加的锁(倒序释放)
for lock in reversed(args):
lock.release()
xiaoming = 5000
xiaozhang = 8000
m_lock = Lock() # 小明的锁
z_lock = Lock() # 小张的锁
# 小明转账1000给小张
def a_to_b():
global xiaoming
global xiaozhang
global m_lock
global z_lock
print(f"[转账前]小明{xiaoming},小张{xiaozhang}")
with lock_manager(m_lock, z_lock):
xiaoming -= 1000
xiaozhang += 1000
print(f"[转账后]小明{xiaoming},小张{xiaozhang}")
# 小张转账1000给小明
def b_to_a():
global xiaoming
global xiaozhang
global m_lock
global z_lock
print(f"[转账前]小明{xiaoming},小张{xiaozhang}")
with lock_manager(m_lock, z_lock):
xiaozhang -= 1000
xiaoming += 1000
print(f"[转账后]小明{xiaoming},小张{xiaozhang}")
def main():
print(f"[互刷之前]小明{xiaoming},小张{xiaozhang}")
p = ThreadPool()
for _ in range(5):
p.apply_async(a_to_b)
p.apply_async(b_to_a)
p.close()
p.join()
print(f"[互刷之后]小明{xiaoming},小张{xiaozhang}")
if __name__ == '__main__':
main()
输出:
[互刷之前]小明5000,小张8000
[转账前]小明5000,小张8000
[转账前]小明5000,小张8000
[转账后]小明4000,小张9000
[转账前]小明5000,小张8000
[转账后]小明5000,小张8000
[转账前]小明5000,小张8000
[转账前]小明4000,小张9000
[转账后]小明4000,小张9000
[转账后]小明5000,小张8000
[转账前]小明5000,小张8000
[转账后]小明4000,小张9000
[转账前]小明4000,小张9000
[转账前]小明4000,小张9000
[转账后]小明5000,小张8000
[转账前]小明5000,小张8000
[转账后]小明4000,小张9000
[转账后]小明5000,小张8000
[转账前]小明5000,小张8000
[转账后]小明4000,小张9000
[转账后]小明5000,小张8000
[互刷之后]小明5000,小张8000
再来个验证,在他们互刷的过程中,小潘还了1000元给小明
from time import sleep
from contextlib import contextmanager # 引入上下文管理器
from multiprocessing.dummy import Pool as ThreadPool, Lock
@contextmanager
def lock_manager(*args):
# 先排个序(按照id排序)
args = sorted(args, key=lambda x: id(x))
try:
for lock in args:
lock.acquire()
yield
finally:
# 先释放最后加的锁(倒序释放)
for lock in reversed(args):
lock.release()
xiaopan = 9000
xiaoming = 5000
xiaozhang = 8000
m_lock = Lock() # 小明的锁
z_lock = Lock() # 小张的锁
p_lock = Lock() # 小潘的锁
# 小明转账1000给小张
def a_to_b():
global xiaoming
global xiaozhang
global m_lock
global z_lock
print(f"[转账前]小明{xiaoming},小张{xiaozhang}")
with lock_manager(m_lock, z_lock):
xiaoming -= 1000
xiaozhang += 1000
print(f"[转账后]小明{xiaoming},小张{xiaozhang}")
# 小张转账1000给小明
def b_to_a():
global xiaoming
global xiaozhang
global m_lock
global z_lock
print(f"[转账前]小明{xiaoming},小张{xiaozhang}")
with lock_manager(m_lock, z_lock):
xiaozhang -= 1000
xiaoming += 1000
print(f"[转账后]小明{xiaoming},小张{xiaozhang}")
# 小潘还1000给小明
def c_to_a():
global xiaoming
global xiaopan
global m_lock
global p_lock
print(f"[转账前]小明{xiaoming},小潘{xiaopan}")
with lock_manager(m_lock, p_lock):
xiaopan -= 1000
xiaoming += 1000
print(f"[转账后]小明{xiaoming},小潘{xiaopan}")
def main():
print(f"[互刷之前]小明{xiaoming},小张{xiaozhang},小潘{xiaopan}")
p = ThreadPool()
for _ in range(5):
p.apply_async(a_to_b)
# 在他们互刷的过程中,小潘还了1000元给小明
if _ == 3:
p.apply_async(c_to_a)
p.apply_async(b_to_a)
p.close()
p.join()
print(f"[互刷之后]小明{xiaoming},小张{xiaozhang},小潘{xiaopan}")
if __name__ == '__main__':
main()
输出:
[互刷之前]小明5000,小张8000,小潘9000
[转账前]小明5000,小张8000
[转账前]小明5000,小张8000
[转账后]小明4000,小张9000
[转账前]小明5000,小张8000
[转账前]小明4000,小张9000
[转账后]小明5000,小张8000
[转账前]小明5000,小张8000
[转账后]小明4000,小张9000
[转账后]小明5000,小张8000
[转账前]小明5000,小张8000
[转账后]小明4000,小张9000
[转账前]小明4000,小张9000
[转账前]小明4000,小潘9000 # 注意下这个
[转账后]小明5000,小张8000
[转账前]小明5000,小张8000
[转账后]小明4000,小张9000
[转账后]小明5000,小潘8000 # 注意下这个
[转账前]小明5000,小张9000
[转账后]小明6000,小张8000
[转账后]小明5000,小张9000
[转账前]小明6000,小张8000
[转账后]小明6000,小张8000
[互刷之后]小明6000,小张8000,小潘8000
上下文管理器进一步完善¶
from contextlib import contextmanager
from multiprocessing.dummy import threading # or import threading
# ThreadLocal 下节会说
_local = threading.local()
@contextmanager
def acquire(*args):
# 以id将锁进行排序
args = sorted(args, key=lambda x: id(x))
# 确保不违反以前获取的锁顺序
acquired = getattr(_local, 'acquired', [])
if acquired and max(id(lock) for lock in acquired) >= id(args[0]):
raise RuntimeError('锁顺序有问题')
# 获取所有锁
acquired.extend(args)
_local.acquired = acquired # ThreadLocal:每个线程独享acquired
# 固定格式
try:
for lock in args:
lock.acquire()
yield
finally:
# 逆向释放锁资源
for lock in reversed(args):
lock.release()
# 把释放掉的锁给删了
del acquired[-len(args):]
2.哲学家吃面¶
先看看场景:五个外国哲学家到中国来吃饭了,因为不了解行情,每个人只拿了一双筷子,然后点了一大份的面。碍于面子,他们不想再去拿筷子了,于是就想通过脑子来解决这个问题。
每个哲学家吃面都是需要两只筷子的,这样问题就来了:(只能拿自己两手边的筷子)
- 如果大家都是先拿自己筷子,再去抢别人的筷子,那么就都等着饿死了(死锁)
- 如果有一个人打破这个常规,先拿别人的筷子再拿自己的,那么肯定有一个人可以吃到面了
- 5个筷子,意味着最好的情况 ==> 同一时刻有2人在吃(0人,1人,2人)
把现实问题转换成代码就是:
- 哲学家--线程
- 筷子--资源(几个资源对应几把锁)
- 吃完一口面就放下筷子--lock的释放
有了上面基础这个就简单了,使用死锁避免机制解决哲学家就餐问题的实现:(不用再操心锁顺序了)
from contextlib import contextmanager # 引入上下文管理器
from multiprocessing.dummy import Pool as ThreadPool, Lock, current_process as current_thread
# 使用简化版,便于你们理解
@contextmanager
def lock_manager(*args):
# 先排个序(按照id排序)
args = sorted(args, key=lambda x: id(x))
try:
# 依次加锁
for lock in args:
lock.acquire()
yield
finally:
# 先释放最后加的锁(倒序释放)
for lock in reversed(args):
lock.release()
#########################################
def eat(l_lock, r_lock):
while True:
with lock_manager(l_lock, r_lock):
# 获取当前线程的名字
print(f"{current_thread().name},正在吃面")
sleep(0.5)
def main():
resource = 5 # 5个筷子,5个哲学家
locks = [Lock() for i in range(resource)] # 几个资源几个锁
p = ThreadPool(resource) # 让线程池里面有5个线程(默认是cup核数)
for i in range(resource):
# 抢左手筷子(locks[i])和右手的筷子(locks[(i + 1) % resource])
# 举个例子更清楚:i=0 ==> 0,1;i=4 ==> 4,0
p.apply_async(eat, args=(locks[i], locks[(i + 1) % resource]))
p.close()
p.join()
if __name__ == '__main__':
main()
输出图示:
自行拓展¶
1.银行家算法¶
PS:这个一般都是操作系统的算法,了解下就可以了,上面哲学家吃面用的更多一点(欢迎投稿~)
我们可以把操作系统看作是银行家,操作系统管理的资源相当于银行家管理的资金,进程向操作系统请求分配资源相当于用户向银行家贷款。 为保证资金的安全,银行家规定:
- 当一个顾客对资金的最大需求量不超过银行家现有的资金时就可接纳该顾客;
- 顾客可以分期贷款,但贷款的总数不能超过最大需求量;
- 当银行家现有的资金不能满足顾客尚需的贷款数额时,对顾客的贷款可推迟支付,但总能使顾客在有限的时间里得到贷款;
- 当顾客得到所需的全部资金后,一定能在有限的时间里归还所有的资金.
操作系统按照银行家制定的规则为进程分配资源,当进程首次申请资源时,要测试该进程对资源的最大需求量,如果系统现存的资源可以满足它的最大需求量则按当前的申请量分配资源,否则就推迟分配。当进程在执行中继续申请资源时,先测试该进程本次申请的资源数是否超过了该资源所剩余的总量。若超过则拒绝分配资源,若能满足则按当前的申请量分配资源,否则也要推迟分配。
通俗讲就是:当一个进程申请使用资源的时候,银行家算法通过先试探分配给该进程资源,然后通过安全性算法判断分配后的系统是否处于安全状态,若不安全则试探分配作废,让该进程继续等待。
参考链接:
https://www.cnblogs.com/chuxiuhong/p/6103928.html
https://www.cnblogs.com/Lynn-Zhang/p/5672080.html
https://blog.csdn.net/qq_33414271/article/details/80245715
https://blog.csdn.net/qq_37315403/article/details/82179707
2.读写锁¶
Python里面没找到读写锁,这个应用场景也是有的,先简单说说这个概念,你可以结合RLock
实现读写锁(了解下,用到再研究)
读写锁(一把锁):
- 读共享:A加读锁,B、C想要加读锁==>成功(并行操作)
- 写独占:A加写锁,B、C想要读(写)==>阻塞等
- 读写不能同时(写优先级高):A读,B要写,C要读,D要写==>A读了,B在写,C等B写完读,D等C读完写(读写不能同时进行)
扩展参考:
https://www.jb51.net/article/82999.htm
扩展:线程安全¶
上次说了锁相关,把问题稍微汇聚提炼一下~重点在思想,语言无差别
1.安全终止线程¶
正常执行线程任务没什么好说的,可以通过isAlive
判断当前线程状态,对于耗时操作可以设置超时时间t.join(timeout=1)
+重试机制
但是后台线程Thread(daemon=True)
就没那么好控制了:这些线程会在主线程终止时自动销毁。除了如上所示的两个操作,并没有太多可以对线程做的事情(无法结束一个线程,无法给它发送信号,无法调整它的调度,也无法执行其他高级操作)
通用:寻常线程¶
比如说,如果你需要在不终止主线程的情况下杀死线程,那么这个线程就不能通过daemon
的方式了,必须通过编程在某个特定点轮询来退出:
from time import sleep
from multiprocessing.dummy import threading
class MyThread(threading.Thread):
def __init__(self):
self.__running = True
super().__init__()
def terminate(self):
self.__running = False
def run(self):
# 轮询方式必须根据业务来,不然没有意义
while self.__running:
print("do something")
sleep(2)
def main():
t = MyThread()
t.start()
t.terminate() # 调用的时候可以通过`terminate`来结束线程
t.join()
# t.join(timeout=1) # 超时时间
print("over")
if __name__ == '__main__':
main()
输出:(再提醒一下,轮循必须根据业务来,不管是重试机制还是其他,这边只是举个例子)
do something
over
上面这种方式,比较好理解,但是比较依赖threading.Thread
,项目里面一般这么改下:
from time import sleep
from multiprocessing.dummy import threading
class ShutdownTask(object):
def __init__(self):
self.__running = True
def terminate(self):
self.__running = False
def run(self):
# 轮询方式必须根据业务来,不然没有意义
while self.__running:
print("do something")
sleep(2)
def main():
task = ShutdownTask()
t = threading.Thread(target=task.run)
t.start()
task.terminate() # 结束线程
t.join()
print("over")
if __name__ == '__main__':
main()
输出:(ShutdownTask
就解耦了,不依赖threading
库了,你放在进程中使用也没事了)
do something
over
通用:阻塞线程¶
是不是心想着现在都妥妥的了?但是遗憾的是~如果遇到了IO阻塞的情况,轮循形同虚设,这时候就需要超时时间来解决了:
- join(timeout)是一种方式
- socket的超时机制也是一种方式(后面会探讨)
伪代码实现:(加上重试机制更完善)
class IOTask:
def __init__(self):
self.__running = True
def terminate(self):
self.__running = False
def run(self, socket):
socket.settimeout(3) # 设置超时时间
while self.__running:
try:
print("正在忙.....")
socket.recv(8192)
sleep(1)
break
except Exception:
print("超时处理")
break
由于全局解释锁(GIL)的原因,Python 的线程被限制到同一时刻只允许一个线程执行这样一个执行模型。所以,Python 的线程更适用于处理I/O和其他需要并发执行的阻塞操作(比如等待I/O、等待从数据库获取数据等等),而不是需要多处理器并行的计算密集型任务。【这也是为什么我说Python和其他语言并发编程的重点不一样:进程+协程
】
特有:进程安全退出¶
Python进程Process
可以通过:terminate()
or signal
的方式终止:(点我回顾)
terminate
联合signal
进行退出前处理:
from time import sleep
from signal import signal, SIGTERM
from multiprocessing import Process
# 可以释放锁、记录日记之类的操作
def save_data(signalnum, frame):
print(f"[退出前处理]signalnum:{signalnum},frame:{frame}")
exit(0)
def test():
# 信号处理
signal(SIGTERM, save_data)
print("subProcess start")
sleep(2)
print("subProcess over")
def main():
p = Process(target=test)
p.start()
sleep(1)
p.terminate() # 进程结束
p.join()
print("mainProcess over")
if __name__ == '__main__':
main()
输出:
subProcess start
[退出前处理]signalnum:15,frame:<frame object at 0x7f27df6c6210>
mainProcess over
还有一种方式,通过进程间状态共享(点我回顾),实现优雅的退出子进程
2.线程共享安全¶
这块上面说很多了,再介绍几种:
- CAS原子类(Java比较常用)
- Thread Local(常用场景:存各种的连接池)
- Lock,互斥锁,可重入锁(递归锁),信号量,条件变量(上面都在说这些)
在多线程环境下,每个线程都有自己的数据,想要互不干扰又不想定义成局部变量传来传去,怎么办?
一开始是这么解决的:
from multiprocessing.dummy import threading
global_dict = {}
def task1():
# 根据当前线程查找:
global_dict[threading.current_thread()] = 10
global_dict[threading.current_thread()] += 10
def task2():
# 根据当前线程查找:
global_dict[threading.current_thread()] = 10
global_dict[threading.current_thread()] -= 10
def main():
t1 = threading.Thread(target=task1)
t2 = threading.Thread(target=task2)
t1.start()
t2.start()
t1.join()
t2.join()
print(global_dict)
if __name__ == '__main__':
main()
但这么搞也很麻烦,于是就有了ThreadLocal
:
from multiprocessing.dummy import threading
global_local = threading.local()
def show_name():
print(f"[{threading.current_thread().name}]{global_local.name}")
def task1():
global_local.name = "小明"
show_name()
def task2():
global_local.name = "小张"
show_name()
def main():
t1 = threading.Thread(target=task1)
t2 = threading.Thread(target=task2)
t1.start()
t2.start()
t1.join()
t2.join()
if __name__ == '__main__':
main()
输出:(同样存的是name属性,不同线程间互不影响)
[Thread-1]小明
[Thread-2]小张
导航¶
再来谈谈常用的两种死锁解决思路:(这次不仅仅局限在Python
了)
- "顺序锁"
tryLock
说说顺序锁的算法:hash Sort
(3种情况),先看看几种hash的对比吧:
%time
from multiprocessing.dummy import Lock
m_lock = Lock()
z_lock = Lock()
print(f"是否相等:{m_lock==z_lock}
{m_lock}
{z_lock}") # 地址不一样
%time
m_code = hash(m_lock)
z_code = hash(z_lock)
print(f"是否相等:{m_code==z_code}
{m_code}
{z_code}") # 值一样
%time
from hashlib import sha1
# Java可以使用:identityhashcode
m_code = sha1(str(m_lock).encode("utf-8")).hexdigest()
z_code = sha1(str(z_code).encode("utf-8")).hexdigest()
print(f"是否相等:{m_code==z_code}
{m_code}
{z_code}") # 不相等
%time
m_code = id(m_lock)
z_code = id(z_lock)
print(f"是否相等:{m_code==z_code}
{m_code}
{z_code}") # 不相等
动态死锁¶
如果是一般的顺序死锁,那么程序代码改改逻辑基本上就可以避免了。比如调试的时候就知晓,或者借助类似于jstack
or 开发工具查看:
怕就怕在动态上==>举个例子:(还是小明小张互刷的案例)
有人实践后很多疑问,说明明我就按照顺序加锁了啊,先加转出账号,再加锁转入账号?
其实...换位思考就懂了==>伪代码
def transfer(p_from, p_to, money):
with p_from.lock:
p_from.money -= money
......
with p_to.lock:
p_to += money
这个虽然按照了所谓的顺序,但是转帐人其实在变,也就变成了动态的,所以也会出现死锁:
from time import sleep
from multiprocessing.dummy import Pool as ThreadPool, Lock
class People(object):
def __init__(self, name, money=5000):
self.name = name
self.lock = Lock()
self.money = money # 设置一个初始金额
def transfer(p_from, p_to, money):
with p_from.lock:
p_from.money -= money
sleep(1) # 模拟网络延迟
with p_to.lock:
p_to += money
def main():
xiaoming = People("小明")
xiaozhang = People("小张")
print(f"[互刷前]小明:{xiaoming.money},小张:{xiaozhang.money}")
p = ThreadPool()
p.apply_async(transfer, args=(xiaoming, xiaozhang, 1000))
p.apply_async(transfer, args=(xiaozhang, xiaoming, 1000))
p.close()
p.join()
print(f"[互刷后]小明:{xiaoming.money},小张:{xiaozhang.money}")
if __name__ == '__main__':
main()
输出:(死锁了,联想哲学家吃面~每个人先拿自己的筷子再抢人的筷子)
[互刷前]小明:5000,小张:5000
解决方案~伪代码思路:
def transfer(cls, p_from, p_to, money):
"""p_from:谁转账,p_to:转给谁,money:转多少"""
from_hash = get_hash(p_from)
to_hash = get_hash(p_to)
# 规定:谁大先锁谁
if from_hash > to_hash:
with p_from.lock:
p_from.money -= money
sleep(1) # 模拟网络延迟
with p_to.lock:
p_to.money += money
elif from_hash < to_hash:
with p_to.lock:
p_to.money += money
sleep(1) # 模拟网络延迟
with p_from.lock:
p_from.money -= money
# hash出现碰撞时处理:(可能性很低)
else:
# 平局的时候,大家一起抢一个中间锁,谁抢到谁先转账
with cls.tie_lock:
with p_from.lock:
p_from.money -= money
sleep(1) # 模拟网络延迟
with p_to.lock:
p_to.money += money
完整Python代码示意:
from time import sleep
from multiprocessing.dummy import Pool as ThreadPool, Lock
class Account(object):
def __init__(self, name, money=5000):
self.name = name
self.lock = Lock()
self.money = money # 设置一个初始金额
class Bank(object):
tie_lock = Lock()
@classmethod
def __get_hash(cls, obj):
return id(obj) # hash_func(obj)
@classmethod
def transfer(cls, p_from, p_to, money):
"""p_from:谁转账,p_to:转给谁,money:转多少"""
from_hash = cls.__get_hash(p_from)
to_hash = cls.__get_hash(p_to)
print(f"from:{p_from.name}to{p_to.name}=>{money}")
# 规定:谁大先锁谁
if from_hash > to_hash:
print("from_hash > to_hash")
with p_from.lock:
p_from.money -= money
sleep(1) # 模拟网络延迟
with p_to.lock:
p_to.money += money
elif from_hash < to_hash:
print("from_hash < to_hash")
with p_to.lock:
p_to.money += money
sleep(1) # 模拟网络延迟
with p_from.lock:
p_from.money -= money
# hash出现碰撞时处理:(可能性很低)
else:
print("from_hash < to_hash")
# 平局的时候,大家一起抢一个中间锁,谁抢到谁先转账
with cls.tie_lock:
with p_from.lock:
p_from.money -= money
sleep(1) # 模拟网络延迟
with p_to.lock:
p_to.money += money
def main():
xiaoming = Account("小明")
xiaozhang = Account("小张")
xiaopan = Account("小潘")
print(f"[互刷前]小明:{xiaoming.money},小张:{xiaozhang.money},小潘{xiaopan.money}")
p = ThreadPool()
for i in range(3):
p.apply_async(Bank.transfer, args=(xiaoming, xiaozhang, 1000))
if i == 1: # 小潘突然间还了1000给小明
p.apply_async(Bank.transfer, args=(xiaopan, xiaoming, 1000))
p.apply_async(Bank.transfer, args=(xiaozhang, xiaoming, 1000))
p.close()
p.join()
print(f"[互刷后]小明:{xiaoming.money},小张:{xiaozhang.money},小潘{xiaopan.money}")
if __name__ == '__main__':
main()
输出:
[互刷前]小明:5000,小张:5000,小潘5000
from:小明to小张=>1000
from_hash < to_hash
from:小张to小明=>1000
from:小明to小张=>1000
from_hash > to_hash
from_hash < to_hash
from:小潘to小明=>1000
from_hash < to_hash
from:小张to小明=>1000
from:小明to小张=>1000
from_hash > to_hash
from_hash < to_hash
from:小张to小明=>1000
from_hash > to_hash
[互刷后]小明:6000,小张:5000,小潘4000
Python
上下文管理器我就不说了,上面说过了,思路和“顺序锁”基本一样:
from contextlib import contextmanager
from multiprocessing.dummy import threading # or import threading
_local = threading.local()
@contextmanager
def acquire(*args):
# 以id将锁进行排序
args = sorted(args, key=lambda x: id(x))
# 确保不违反以前获取的锁顺序
acquired = getattr(_local, 'acquired', [])
if acquired and max(id(lock) for lock in acquired) >= id(args[0]):
raise RuntimeError('锁顺序有问题')
# 获取所有锁
acquired.extend(args)
_local.acquired = acquired # ThreadLocal:每个线程独享acquired
# 固定格式
try:
for lock in args:
lock.acquire()
yield
finally:
# 逆向释放锁资源
for lock in reversed(args):
lock.release()
# 把释放掉的锁给删了
del acquired[-len(args):]
活锁¶
大家都听说过死锁deadlock,但是很少有人听说过活锁livelock。活锁主要由两个线程过度谦让造成,两个线程都想让对方先干话,结果反而都无法继续执行下去。因为两个线程都在活跃状态,故称活锁。
trylock¶
trylock
可以解决死锁问题,但是用不好也会出现少见的活锁问题:
from time import sleep
from random import random
from multiprocessing.dummy import Pool as ThreadPool, Lock
class People(object):
def __init__(self, name, money=5000):
self.name = name
self.lock = Lock() # 非阻塞等
self.money = money # 设置一个初始金额
def transfer(p_from, p_to, money):
flag = True
while flag:
# 尝试获取p_from.lock
if p_from.lock.acquire(False): # 非阻塞
try:
sleep(1) # 模拟网络延迟
# 尝试获取p_to.lock
if p_to.lock.acquire(False):
try:
p_from.money -= money
p_to.money += money
flag = False
finally:
print("p_to release")
p_to.lock.release() # 释放锁
finally:
p_from.lock.release() # 释放锁
sleep(random()) # 随机睡[0,1)s
def main():
xiaoming = People("小明")
xiaozhang = People("小张")
xiaopan = People("小潘")
print(f"[互刷前]小明:{xiaoming.money},小张:{xiaozhang.money},小潘:{xiaopan.money}")
p = ThreadPool()
for i in range(3):
p.apply_async(transfer, args=(xiaoming, xiaozhang, 1000))
if i == 1:
p.apply_async(transfer, args=(xiaopan, xiaoming, 1000))
p.apply_async(transfer, args=(xiaozhang, xiaoming, 1000))
p.close()
p.join()
print(f"[互刷后]小明:{xiaoming.money},小张:{xiaozhang.money},小潘:{xiaopan.money}")
if __name__ == '__main__':
main()
输出:(没有sleep(random()) # 随机睡[0,1)s
就是一个活锁了)
[互刷前]小明:5000,小张:5000,小潘:5000
p_to release
p_to release
p_to release
p_to release
p_to release
p_to release
[互刷后]小明:6000,小张:5000,小潘:4000
可以思考一下,为什么trylock
的时候p_from.money -= money
和p_to.money += money
都要放在code最里面
参考链接:
守护线程参考:https://www.cnblogs.com/brolanda/p/4709947.html
Posix Thread:https://www.cnblogs.com/randyniu/p/9189112.html
一句话实现并行:http://chriskiehl.com/article/parallelism-in-one-line
进程与线程的一个简单解释:http://www.ruanyifeng.com/blog/2013/04/processes_and_threads.html
线程分离方面的参考:
http://python.su/forum/topic/20403/
https://stackoverflow.com/questions/14175016/python-pthread-detach-analog
https://stackoverflow.com/questions/11904848/what-is-the-difference-between-a-detached-thread-and-a-daemon-thread
线程锁参考:
https://www.cnblogs.com/nuomin/p/7899675.html
https://blog.csdn.net/alina_catty/article/details/78792085
https://mozillazg.com/2016/09/python-threading-multiprocessing-logging-equal-deadlock.html
死锁调试参考:
https://blog.alswl.com/2013/11/python-gdb
https://wiki.python.org/moin/DebuggingWithGdb
http://www.blogjava.net/stone2083/archive/2013/08/19/403028.html
https://stackoverflow.com/questions/1289124/python-equivalent-of-jstack
https://mozillazg.com/2016/09/python-threading-multiprocessing-logging-equal-deadlock.html
https://stackoverflow.com/questions/132058/showing-the-stack-trace-from-a-running-python-application
使用ctypes强行杀掉线程:https://blog.csdn.net/vinsuan1993/article/details/78158589
老外对杀死子线程的探讨:
https://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread-in-python
2.2.6.线程同步~Queue-引入¶
线程同步这块,之前讲了锁系列,现在把剩下的也说说
Queue大家都很熟悉,应用场景很多很多,不仅仅局限在线(进)程同步,很多业务场景都在使用。
在开始之前先看一个秒杀场景:一般都使用乐观锁,也就是大家经常提的CAS机制来实现,数据所在的内存值,预期值,新值。当需要更新时,判断当前内存值与之前取到的值是否相等,若相等,则用新值更新,若失败则不断重试(sleep(random)
)
从数据库层面控制就是这样:(原子性操作)
update table set amout=amout-#{buys}, version=version+1 where id=#{id} and version=#{version}
or
update table set amout=amout-#{buys} where id=#{id} and amout-#{buys}>=0
我们用代码模拟一下:(Python里面没有CountDownLatch
,我们用之前学的条件变量实现一个)
# 模拟Java里的CountDownLatch(条件变量模拟)
# 可以理解为赛跑,当运动员全部准备好了,裁判一枪下去,开始比赛
class CountDownLatch(object):
def __init__(self):
self.con = Condition() # 条件变量
def wait(self):
with self.con:
self.con.wait()
def countDown(self):
with self.con:
self.con.notify_all() # 开枪(唤醒所有线程)
模拟:
count = 100 # 库存100件
class MyThread(threading.Thread):
def __init__(self, id, con):
self.id = id
self.con = con
super().__init__()
def run(self):
global count
self.con.wait()
if count > 0: # if count - 1 >= 0:
count -= 1
print(f"线程{self.id}~抢到一件商品")
def main():
con = CountDownLatch() # 条件变量
t_list = [MyThread(id=i, con=con) for i in range(1000)]
for t in t_list:
t.start()
print("准备开抢")
con.countDown() # 唤醒所有
for t in t_list:
t.join()
print(f"剩余库存{count}")
if __name__ == '__main__':
main()
输出:(没错,没用锁一样高并发~)
线程42~抢到一件商品
线程49~抢到一件商品
线程50~抢到一件商品
线程63~抢到一件商品
线程84~抢到一件商品
线程113~抢到一件商品
线程135~抢到一件商品
线程161~抢到一件商品
线程183~抢到一件商品
线程220~抢到一件商品
线程271~抢到一件商品
线程278~抢到一件商品
线程302~抢到一件商品
线程359~抢到一件商品
线程379~抢到一件商品
....
线程10~抢到一件商品
线程18~抢到一件商品
线程23~抢到一件商品
线程26~抢到一件商品
线程33~抢到一件商品
线程44~抢到一件商品
线程52~抢到一件商品
线程53~抢到一件商品
线程158~抢到一件商品
线程177~抢到一件商品
线程227~抢到一件商品
线程289~抢到一件商品
线程15~抢到一件商品
线程37~抢到一件商品
线程134~抢到一件商品
线程212~抢到一件商品
线程72~抢到一件商品
线程305~抢到一件商品
线程365~抢到一件商品
剩余库存0
real 0m0.189s
user 0m0.161s
sys 0m0.101s
如果你把if count > 0:
注释掉:(瞬间呵呵哒了)
剩余库存-900
real 0m0.215s
user 0m0.188s
sys 0m0.088s
如果你在修改的时候加个锁:
real 0m0.195s
user 0m0.157s
sys 0m0.100s
在这里说,其实没有多大意义,了解下即可(数据库最大连接数是有瓶颈的,后端项目里面一般都是使用缓存的CAS机制
,比如Redis
的watch
、memcached
的gets
和cas
,还有就是我们下面要介绍的Queue
了)
后面会说,引入部分不用深究,记住两个即可:
- 数据库层面的CAS机制(乐观锁)
- Java里面
CountDownLatch
的模拟
2.2.7.线程同步~Queue¶
1.基本使用¶
Queue在讲进程的时候就有说过(进程间通信),线程用法也差不多,看个经典案例:
import time
import random
from multiprocessing.dummy import Pool as ThreadPool, Queue
def consumer(q, i):
while True:
data = q.get()
print(f"[消费者{i}]商品{data}抢光了")
def producer(q):
while True:
num = random.random()
q.put(num)
print(f"[生产者]商品{num}出厂了
")
time.sleep(num)
def main():
q = Queue(10) # 为了演示,我这边限制一下
pool = ThreadPool()
# 一个生产者
pool.apply_async(producer, args=(q,))
# 两个消费者
pool.apply_async(consumer, args=(q, 1))
pool.apply_async(consumer, args=(q, 2))
pool.close()
pool.join()
if __name__ == '__main__':
main()
输出图示:(非阻塞可以使用put_nowait
和get_nowait
)
2.源码探讨¶
Queue是线程安全的放心使用,我们来看看Queue源码:(条件变量Condition
和Lock
的综合使用)
class Queue:
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._init(maxsize)
self.mutex = threading.Lock() # 三个Condition公用
# get的时候使用,如果队列空了就等待
self.not_empty = threading.Condition(self.mutex)
# put的时候使用,如果队列满了就等待
self.not_full = threading.Condition(self.mutex)
# 每当未完成任务的数量降至零时,通知所有线程
self.all_tasks_done = threading.Condition(self.mutex)
self.unfinished_tasks = 0 # 未完成任务
def put(self, item, block=True, timeout=None):
# 如果队列满了就等待
# self.not_full = threading.Condition(self.mutex)
with self.not_full:
if self.maxsize > 0:
if not block:
if self._qsize() >= self.maxsize:
raise Full
elif timeout is None:
while self._qsize() >= self.maxsize:
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = time() + timeout
while self._qsize() >= self.maxsize:
remaining = endtime - time()
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
def get(self, block=True, timeout=None):
# 如果队列空了就等待
# self.not_empty = threading.Condition(self.mutex)
with self.not_empty:
if not block:
if not self._qsize():
raise Empty
elif timeout is None:
while not self._qsize():
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a non-negative number")
else:
endtime = time() + timeout
while not self._qsize():
remaining = endtime - time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
item = self._get()
self.not_full.notify()
return item
3.多任务调度¶
来个场景,厂家倒闭(任务列表完成了)怎么通知消费者不用等待了?
回顾一下使用协程是怎么解决的:协程yield实现多任务调度
def consumer():
status = ""
while True:
tmp = yield status
if not tmp:
print("消费者已经睡觉了...")
return
print("消费者:获得商品%s号..." % tmp)
status = "ok"
def produce(c):
# 启动消费者
c.send(None)
for i in range(1, 3):
print("生产者:出产商品%s号..." % i)
# 生产商品,并提交给消费者
status = c.send(i)
print("生产者:生产者消费状态: %s" % status)
# c.send(None) 执行这个会引发StopIteration
c.close() # 使用close就可以避免了(手动关闭生成器函数,后面的调用会直接返回StopIteration异常)
if __name__ == '__main__':
# 创建消费者
c = consumer()
produce(c)
输出:
生产者:出产商品1号...
消费者:获得商品1号...
生产者:生产者消费状态: ok
生产者:出产商品2号...
消费者:获得商品2号...
生产者:生产者消费状态: ok
当使用Queue
时,协调生产者和消费者的关闭问题可以在队列中放置一个特殊的值,当消费者读到这个值的时候,终止执行:
import time, random, uuid
from multiprocessing.dummy import Pool as ThreadPool, Queue
stop_obj = uuid.uuid1() # 获取UUID(GUID)
def consumer(q, i):
while True:
data = q.get()
if data == stop_obj:
print(f"[消费者{i}]光荣退伍了")
q.put(data) # 如果不加这个,其他消费者就不知道了(Queue里面的数据取出来就没了)
break
print(f"[消费者{i}]商品{data}抢光了")
def producer(q):
for i in range(10):
num = random.random()
q.put(num)
print(f"[生产者]商品{num}出厂了")
time.sleep(num)
q.put(stop_obj) # 发送结束命令
def main():
q = Queue(10) # 为了演示,我这边限制一下
pool = ThreadPool()
# 一个生产者
pool.apply_async(producer, args=(q,))
# 两个消费者
pool.apply_async(consumer, args=(q, 1))
pool.apply_async(consumer, args=(q, 2))
pool.close()
pool.join()
if __name__ == '__main__':
main()
如果读到特殊值没有再放进队列就不能保证所有消费者都退出任务~Queue里面的数据取出来就没了 输出:(你可以把上面那句注释调看结果)
[生产者]商品0.33594145145041265出厂了
[消费者1]商品0.33594145145041265抢光了
[生产者]商品0.49907511942411487出厂了
[消费者1]商品0.49907511942411487抢光了
[生产者]商品0.6875075709064151出厂了
[消费者2]商品0.6875075709064151抢光了
[生产者]商品0.4039336126048405出厂了
[消费者1]商品0.4039336126048405抢光了
[生产者]商品0.4339014739644075出厂了
[消费者2]商品0.4339014739644075抢光了
[生产者]商品0.7101415304586235出厂了
[消费者1]商品0.7101415304586235抢光了
[生产者]商品0.39303515351899出厂了
[消费者2]商品0.39303515351899抢光了
[生产者]商品0.07572426360227902出厂了
[消费者1]商品0.07572426360227902抢光了
[生产者]商品0.8054064710812884出厂了
[消费者2]商品0.8054064710812884抢光了
[生产者]商品0.8085151230789658出厂了
[消费者1]商品0.8085151230789658抢光了
[消费者2]光荣退伍了
[消费者1]光荣退伍了
在上面案例里面,你把uuid.uuid1()
换成object()
,然后比较部分的==
换成is
也是可以的,但是分布式系统的话还是使用UUID
吧
4.自定义Queue¶
如果想在Queue
的基础上扩展,可以自定义数据结构并添加所需的锁和同步机制(eg:Condition
)来实现线程间通信(同步)
写demo前说说理论:
- 二叉树 ==> 每个节点最多有两个子树的树结构
- 满二叉树 ==> 除了最底层叶结点外,每一个结点都有左右子叶
- 二叉堆 ==> 本质上是一种完全二叉树,它分为两个类型:
- 最大堆:最大堆任何一个父节点的值,都大于等于它左右子节点的值,根节点是最大值
- 最小堆:最小堆任何一个父节点的值,都小于等于它左右子节点的值,根节点是最小值
以最小堆为例,画个图演示一下:
插入新节点
排序后的二叉树
准备删除节点2
把最后一个节点拿过来充数(维护二叉树稳定)
进行比较排序,把左右节点最小的拉上来
构建二叉堆:把一个无序的完全二叉树调整为二叉堆(让所有非叶子节点依次下沉
)
来个乱序的二叉树
从最后一个非叶子节点开始,和最小的子节点交换位置(8和1交换)
右边的也走一波(6和4交换)
节点5和1互换
现在根节点最小了(3和1互换)
从上往下再排个序,这时候就是最小堆了
看个完全二叉树
的规律:若从上至下、从左至右编号,则编号为i的结点:
- 左孩子编号为
2i+1
,其右孩子编号=2i+2
- 父节点编号=
i/2
(根节点没有父节点)
把上面二叉树转换成数组:
这时候再去理解优先队列就简单了:
- 最大优先队列,无论入队顺序,当前最大的元素优先出队
- 最小优先队列,无论入队顺序,当前最小的元素优先出队
Python提供了一个heapq
的模块:https://docs.python.org/3/library/heapq.html
来看个最小二叉堆的案例:
%time
import heapq
h_list = []
# 来个乱序的二叉树(和图示一样)
for i in [3, 5, 6, 8, 2, 4, 7, 1, 9]:
heapq.heappush(h_list, i) # 构建最小二叉堆
# 弹出最小值
heapq.heappop(h_list) # 查看堆中最小值,不弹出 heap[0]
%time
import heapq
h_list = []
# 堆元素可以是元组,可以拓展优先级的概念
heapq.heappush(h_list, (9,"小明"))
heapq.heappush(h_list, (5,"小张"))
heapq.heappush(h_list, (7,"小周"))
heapq.heappush(h_list, (3,"小潘"))
heapq.heappop(h_list) # 弹出优先级最低的
举个使用Condition
+二叉堆
实现一个优先级队列:
import heapq
from uuid import uuid1
from multiprocessing.dummy import Pool as ThreadPool, Condition
class MaxPriorityQueue(object):
"""自定义一个最大优先队列"""
def __init__(self):
self.__h_list = []
self.__con = Condition() # 条件变量
self.__index = 0 # 索引
def put(self, value, sort=0):
with self.__con:
# heapq是最小二叉堆,优先级取负就是最大二叉堆了
heapq.heappush(self.__h_list, (-sort, self.__index, value))
self.__index += 1
self.__con.notify() # 随机通知一个阻塞等的线程
def get(self):
with self.__con:
while 1:
# 0 => False
if not self.qsize():
self.__con.wait() # 列表为空则阻塞等
return heapq.heappop(self.__h_list)[-1] # 返回元组最后一个元素(value)
def qsize(self):
return len(self.__h_list)
stop_obj = uuid1() # 获取UUID(GUID)
def task_put(queue):
queue.put("小周", 5)
queue.put("小潘", 7)
queue.put("小明", 3)
queue.put("小张", 9)
global stop_obj
queue.put(stop_obj)
def task_get(queue):
global stop_obj
# 全部读出来
while 1:
data = queue.get()
if data == stop_obj:
print("光荣退伍了")
queue.put(stop_obj) # 保证其他消费者也能安全退出
break
print(data)
if __name__ == '__main__':
queue = MaxPriorityQueue()
pool = ThreadPool()
pool.apply_async(task_get, args=(queue,))
pool.apply_async(task_put, args=(queue,))
pool.close()
pool.join()
输出:
小张
小潘
小周
小明
光荣退伍了
浅谈multiprocessing
、multiprocessing.dummy
、threading
¶
multiprocessing.dummy
上面只列举了常用的模块,Queue这块就两个:Queue
和JoinableQueue
。既然提到了就顺便说几句,之前写进程篇的时候因为外出,急急忙忙就收尾了,像上面的Semaphore
和Condition
以及下面准备说的Event
和Barrier
等进程和线程都是通用的
如果要是非要找点不同,那么Queue这块还真有点不同,eg:Queue
里面没有task_done
和join
方法,而JoinableQueue
扩展了,而线程的Queue
是有task_done
和join
的,其他常用的进程api和线程基本上一样,用到的时候查下源码或者看看官方文档即可~
进程的Queue
与JoinableQueue
:
线程的Queue
:
threading
:
__all__ = [
'get_ident', 'active_count', 'Condition', 'current_thread', 'enumerate',
'main_thread', 'TIMEOUT_MAX', 'Event', 'Lock', 'RLock', 'Semaphore',
'BoundedSemaphore', 'Thread', 'Barrier', 'BrokenBarrierError', 'Timer',
'ThreadError', 'setprofile', 'settrace', 'local', 'stack_size'
]
multiprocessing.dummy
:
__all__ = [
'Process', 'current_process', 'active_children', 'freeze_support',
'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
'Event', 'Barrier', 'Queue', 'Manager', 'Pipe', 'Pool', 'JoinableQueue'
]
multiprocessing.dummy
可以理解为multiprocessing
的轻量级并发库:api基本上和multiprocessing
一致,很多都是在threading
的基础上修改下或者直接使用(multiprocessing
在Process
基础上修改)比如:
# 被轻量化了,本质还是线程
# Process模块:Process = DummyProcess(threading.Thread)
# 这就是为什么前面的代码的都是 as ThreadPool,这是怕和Process一起使用的时候把你们带坑里
# Pool:multiprocessing.pool.ThreadPool(processes, initializer, initargs)
# 为了和进程api使用起来一致
# current_process:current_process = threading.current_thread
# 再看看导入的模块就知道dummy的本质了:
from threading import Lock, RLock, Semaphore, BoundedSemaphore
from threading import Event, Condition, Barrier
from queue import Queue
5.其他Queue类型(看看就好,完全可以自己封装)¶
1.优先级队列:PriorityQueue¶
看看内部实现:(比我们实现的还精简,秒懂)
class PriorityQueue(Queue):
'''以优先级顺序检索打开条目的队列的变体(最低的第一个)
item通常是以下形式的元组:(优先级编号,数据)'''
def _init(self, maxsize):
self.queue = []
def _qsize(self):
return len(self.queue)
def _put(self, item):
heapq.heappush(self.queue, item)
def _get(self):
return heapq.heappop(self.queue)
看个上面MaxPriorityQueue
的案例:(想要大数字优先级高就变负数)
from uuid import uuid1
from queue import PriorityQueue
from multiprocessing.dummy import Pool as ThreadPool
stop_obj = uuid1() # 获取UUID(GUID)
def task_put(queue):
queue.put((-5, "小周"))
queue.put((-7, "小潘"))
queue.put((-3, "小明"))
queue.put((-9, "小张"))
global stop_obj
# 可以思考一下为什么用0,如果按照小到大的顺序又该如何设置呢?
queue.put((0, stop_obj))
def task_get(queue):
global stop_obj
# 全部读出来
while 1:
data = queue.get()
if data[-1] == stop_obj:
print("光荣退伍了")
queue.put((0, stop_obj)) # 保证其他消费者也能安全退出
break
print(data[-1])
def error_print(msg):
print(msg)
if __name__ == '__main__':
queue = PriorityQueue()
pool = ThreadPool()
pool.apply_async(task_get, args=(queue, ), error_callback=error_print)
pool.apply_async(task_put, args=(queue, ), error_callback=error_print)
pool.close()
pool.join()
输出:(如果功能不够用还是自己设计吧,设计的太简单调用的时候会比较麻烦)
小张
小潘
小周
小明
光荣退伍了
2.后进先出队列:LifoQueue¶
一看好像很高大上,翻翻源码:(其实就是基于List封装了个类,看来multiprocessing.dummy
重写这个是有原因的)
class LifoQueue(Queue):
def _init(self, maxsize):
self.queue = []
def _qsize(self):
return len(self.queue)
def _put(self, item):
self.queue.append(item)
def _get(self):
return self.queue.pop()
看个使用案例:(完全可以直接使用List...)
from queue import LifoQueue
def main():
queue = LifoQueue()
for i in range(10):
queue.put(i)
for i in range(queue.qsize()):
print(queue.get())
if __name__ == '__main__':
main()
输出:
9
8
7
6
5
4
3
2
1
0
other¶
SimpleQueue
就不说了,和Queue使用基本上一样。线程和进程有点不一样,注意下:(进程间通信手段毕竟比线程少
)
threading
中的SimpleQueue
是FIFO
简单队列multiprocessing
中的SimpleQueue
是在PIPE
管道的基础上封装版
JoinableQueue
在multiprocessing.dummy
就是Queue
:(等会直接使用Queue
即可)
# multiprocessing/dummy/__init__.py
from queue import Queue
JoinableQueue = Queue
相关源码:(下面会和Queue
对比举例)
class Queue:
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._init(maxsize)
self.mutex = threading.Lock()
self.not_empty = threading.Condition(self.mutex)
self.not_full = threading.Condition(self.mutex)
self.all_tasks_done = threading.Condition(self.mutex)
# 进程在这边使用的是Semaphore
self.unfinished_tasks = 0
def task_done(self):
with self.all_tasks_done:
unfinished = self.unfinished_tasks - 1
if unfinished <= 0:
if unfinished < 0:
raise ValueError('task_done() called too many times')
self.all_tasks_done.notify_all()
self.unfinished_tasks = unfinished
def join(self):
with self.all_tasks_done:
while self.unfinished_tasks:
self.all_tasks_done.wait()
在multiprocessing
中的Queue
没有task_done
和join
方法,所以有了JoinableQueue
:
# multiprocessing/queues.py
class JoinableQueue(Queue):
def __init__(self, maxsize=0, *, ctx):
Queue.__init__(self, maxsize, ctx=ctx)
self._unfinished_tasks = ctx.Semaphore(0)
self._cond = ctx.Condition()
def task_done(self):
with self._cond:
if not self._unfinished_tasks.acquire(False):
raise ValueError('task_done() called too many times')
if self._unfinished_tasks._semlock._is_zero():
self._cond.notify_all()
def join(self):
with self._cond:
if not self._unfinished_tasks._semlock._is_zero():
self._cond.wait()
6.Queue拓展¶
使用队列来进行线程间通信是一个单向、不确定的过程。通常情况下,没法知道接收数据的线程是什么时候接收到数据并开始工作的。这时候就可以使用Queue
提供的task_done()
和join()
了~
之前通知消费者退出是使用发一个消息的方式,这次换种思路~直接设置后台线(进)程,然后使用Queue
的join
方法:
from multiprocessing.dummy import threading, Queue
def consumer(queue):
while 1:
data = queue.get()
print(f"[消费者]消费商品{data}号")
# 通知Queue完成任务了
queue.task_done()
def producer(queue):
for i in range(10):
print(f"[生产者]生产商品{i}号")
queue.put(i)
def main():
queue = Queue()
# 开启生产消费者线程任务
t_list = [
threading.Thread(target=func, args=(queue, ))
for func in (producer, consumer)
]
# 启动两个线程
for t in t_list:
# 设置后台线程,就算是死循环当主线程退出的时候也会退出的
t.setDaemon(True) # 进程是daemon属性,t.daemon=True
t.start()
# 等待所有任务完成
queue.join() # 你可以把这句话注释掉看输出
print(f"当前队列未完成的数量:{queue.unfinished_tasks}")
if __name__ == '__main__':
main()
输出:
[生产者]生产商品0号
[生产者]生产商品1号
[消费者]消费商品0号
[生产者]生产商品2号
[消费者]消费商品1号
[生产者]生产商品3号
[消费者]消费商品2号
[生产者]生产商品4号
[消费者]消费商品3号
[生产者]生产商品5号
[消费者]消费商品4号
[生产者]生产商品6号
[消费者]消费商品5号
[生产者]生产商品7号
[消费者]消费商品6号
[生产者]生产商品8号
[消费者]消费商品7号
[生产者]生产商品9号
[消费者]消费商品8号
[消费者]消费商品9号
当前队列未完成的数量:0
进程案例见:/BaseCode/tree/master/python5.concurrent/Thread/2.lock_queue/3.queue/6.JoinableQueue.py
PS:其实Queue的完整写法应该是每次收到消息的时候调用一下q.task_done()
,便于记录未完成状态,大家进程的Queue
用多了,也就不太写了。现在task_done
讲过了,以后用线程的Queue
和进程的JoinableQueue
记得加上哦~
再扩展一下,看看queue.join
源码:(如果还不清楚,下面还有一个手写线程池的demo)
def join(self):
# Condition条件变量
with self.all_tasks_done:
# 如果还有没有完成的任务就调用Condition的wait()方法
while self.unfinished_tasks:
self.all_tasks_done.wait()
Queue对象的方法:
q.full()
:判断队列是否已满q.empty()
:判断队列是否为空q.qsize()
:返回当前队列中的元素个数q.get_nowait()
:非阻塞获取消息,等价于q.get(block=Flase)
q.put_nowait()
:非阻塞发送消息,等价于q.put(block=Flase)
q.join()
:等待所有任务完成q.task_done()
:在Queue中标记任务完成
PS:q.qsize()
、q.full()
、q.empty()
等方法可以获取一个队列的当前大小和状态。但要注意,这些方法都不是线程安全的。
可能你对一个队列使用empty()
判断出这个队列为空,但同时另外一个线程可能已经向这个队列中插入一个数据项。所以,你最好不要在你的代码中使用这些方法。
queue模块定义的异常类:
queue.Full
:非阻塞发送消息时,如果队列满了~抛异常queue.Empty
:非阻塞获取消息时,如果队列为空~抛异常
eg:
try:
data = q.get_nowait() # get(timeout=5)
except queue.Empty:
pass
基于简单队列编写多线程程序在线程安全队列的底层实现来看,你无需在你的代码中使用锁和其他底层的同步机制,使用队列这种基于消息的通信机制可以被扩展到更大的应用范畴,比如,你可以把你的程序放入多个进程甚至是分布式系统而无需改变底层的队列结构。
使用线程队列有一个要注意的问题:向队列中添加数据项时并不会复制此数据项,线程间通信实际上是在线程间传递对象引用。如果担心对象的共享状态,那最好只传递不可修改的数据结构(如:整型、字符串或者元组)或者一个对象的深拷贝copy.deepcopy(data)
7.使用Queue实现一个线程池¶
和网络整合版的线程池后面再说,ThreadPoolExecutor
深入篇后会说,先模仿官方Pool
来个精简版:
from multiprocessing.dummy import threading, Queue
class Task(threading.Thread):
def __init__(self, queue):
super().__init__()
self.queue = queue
self.setDaemon(True) # 设置后台线程,主线程结束就终止
self.start() # 开启线程,执行run方法
print(f"开启一个线程~{self.name}")
def run(self):
func, args, kws = self.queue.get()
try:
func(args, kws)
except Exception as ex:
print(ex)
finally:
self.queue.task_done()
class ThreadPool(object):
def __init__(self, count=0):
# 设置Pool运行状态
self.running = True
from os import cpu_count # 用到的时候导入对应模块即可
# 默认是CPU核数,且至少有一个线程
if count <= 0:
count = cpu_count() or 1
# 设置线程数
self.queue = Queue(count)
# 启动对应个数的线程
for _ in range(count):
Task(self.queue) # 不能在这直接启动,会阻塞Pool的
def apply_async(self, func, args=(), kws={}):
if self.running:
# 执行任务
self.queue.put((func, args, kws))
def close(self):
# 不再运行加入任务
self.running = False
def join(self):
# 等待任务执行完退出
self.queue.join()
调用和官方风格一致:
def call_dad(*args, **kws):
from time import sleep
from random import randint
n = randint(1, 2) # [1,2]
print(f"休息{n}s")
sleep(n)
print(f"{args}~{kws}")
def main():
pool = ThreadPool()
pool.apply_async(call_dad, args=(1, 2, 3), kws={"dad": "小明"})
pool.apply_async(call_dad, args=(1, 2, 3), kws={"dad": "小张"})
pool.close()
pool.join()
if __name__ == '__main__':
main()
输出:(有些偶尔用的模块可以用的时候再导入【别放循环里,虽然重复导入模块不怎么耗时,但是总归有损耗的】)
开启一个线程~Thread-1
开启一个线程~Thread-2
开启一个线程~Thread-3
开启一个线程~Thread-4
休息1s
休息2s
((1, 2, 3), {'dad': '小明'})~{}
((1, 2, 3), {'dad': '小张'})~{}
2.2.8.线程同步~Event¶
1.初识¶
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就比较麻烦。这时候我们就可以使用Event
了~eg:(类比JQ里面的事件~eg:单击事件)
from time import sleep
from multiprocessing.dummy import Pool as ThreadPool, Event
event = Event()
def click():
# event.clear() # 设置标准为假(默认是False)
print("用户在修改网页表单")
sleep(2)
print("点击了修改案例")
event.set() # 设置标准为真
def update():
print(f"事件状态:{event.is_set()}")
event.wait() # 等待到标志为真
print("修改成功")
print(f"事件状态:{event.is_set()}")
def main():
pool = ThreadPool()
pool.apply_async(click)
pool.apply_async(update)
pool.apply_async(click)
pool.close()
pool.join()
if __name__ == '__main__':
main()
输出:
用户在修改网页表单
事件状态:False
用户在修改网页表单
点击了修改案例
点击了修改案例
修改成功
事件状态:True
常用方法:
event.clear()
:恢复event的状态值为False(并发场景下有大用)event.wait()
:如果event.is_set()==False
将阻塞线程event.set()
: 设置event
的状态值为True
,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度event.is_set()
:返回event
的状态值(如果想非阻塞等可以使用这个先判断)线程有个重命名的方法叫isSet
。PS:进程线程中都有is_set
方法
2.案例¶
Event
对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生:
- 在初始情况下,
Event
对象中的信号标志被设置为假。等待Event
对象的线程将会被一直阻塞至标志为真。 - 当一个线程将一个
Event
对象的信号标志设置为真,它将唤醒所有等待这个Event
对象的线程。等待Event
的线程将忽略这个事件, 继续执行
再来个简单版的生产消费者的案例:
from time import sleep
from random import random
from multiprocessing.dummy import Pool as ThreadPool, Event
global_list = []
event = Event()
stop_event = Event()
n = 0
def consumer(i):
print(f"消费者{i}等待ing")
while 1:
event.wait()
count = len(global_list)
# 防止List空的时候pop出错
if count > 0:
print(f"消费了产品{global_list.pop()}")
# 重置状态(加这一句能减少很多次循环)
event.clear() # 可以思考一下为什么(提示:Lock)
# 防止生产者结束了,但是消费者还没处理完成
elif len(global_list) == 0 and stop_event.is_set():
break
global n
n += 1
print(f"消费者{i}完成任务~总共循环{n}次")
def producer():
print("生产者正在生产商品")
for i in range(10):
global_list.append(i)
sleep(random()) # 模拟网络延迟
event.set() # 通知消费者生产结束
stop_event.set() # 通知消费者已经可以结束线程了
def main():
pool = ThreadPool()
pool.map_async(consumer, range(2)) # 两个消费者
pool.apply_async(producer) #
pool.close()
pool.join()
if __name__ == '__main__':
main()
输出:(是不是又感觉多了种消费者安全退出的方式?)
消费者0等待ing
消费者1等待ing
生产者正在生产商品
消费了产品1
消费了产品0
消费了产品2
消费了产品3
消费了产品4
消费了产品5
消费了产品6
消费了产品7
消费了产品8
消费了产品9
消费者0完成任务
消费者1完成任务
PS:while条件换成:while not (len(global_list) == 0 and stop_event.is_set()):
也行
如果一个线程需要在一个“消费者”线程处理完特定的数据项时立即得到通知,你可以把要发送的数据和一个Event
一起使用,这样“生产者”就可以通过这个Event
对象来监测处理的过程了
from multiprocessing.dummy import Pool as ThreadPool, Queue, Event
def producer(queue):
for i in range(10):
event = Event()
queue.put((event, i))
print(f"[生产者]生产了产品{i}")
event.wait() # 等待消费者通知
print(f"生产者已经收到消费情况的反馈{i}")
def consumer(queue):
while True:
evt, data = queue.get()
print(f"[消费者]消费了产品{data}")
evt.set() # 通知生产者
def main():
queue = Queue()
pool = ThreadPool()
pool.apply_async(consumer, args=(queue, ))
pool.apply_async(producer, args=(queue, ))
pool.close()
pool.join()
if __name__ == '__main__':
main()
输出:(进程只需微微改动即可使用)
[生产者]生产了产品0
[消费者]消费了产品0
生产者已经收到消费情况的反馈0
[生产者]生产了产品1
[消费者]消费了产品1
生产者已经收到消费情况的反馈1
[生产者]生产了产品2
[消费者]消费了产品2
生产者已经收到消费情况的反馈2
[生产者]生产了产品3
[消费者]消费了产品3
生产者已经收到消费情况的反馈3
[生产者]生产了产品4
[消费者]消费了产品4
生产者已经收到消费情况的反馈4
[生产者]生产了产品5
[消费者]消费了产品5
生产者已经收到消费情况的反馈5
[生产者]生产了产品6
[消费者]消费了产品6
生产者已经收到消费情况的反馈6
[生产者]生产了产品7
[消费者]消费了产品7
生产者已经收到消费情况的反馈7
[生产者]生产了产品8
[消费者]消费了产品8
生产者已经收到消费情况的反馈8
[生产者]生产了产品9
[消费者]消费了产品9
生产者已经收到消费情况的反馈9
3.本质¶
来看看Event
到底是何方神圣:(本质就是基于Condition
封装了一个标识位,来标记事件是否完成)
class Event:
def __init__(self):
self._cond = Condition(Lock()) # 条件变量
self._flag = False
def is_set(self):
return self._flag
isSet = is_set # 建议用is_set,这样进程和线程方法就一致了
def set(self):
with self._cond:
self._flag = True
self._cond.notify_all()
def clear(self):
with self._cond:
self._flag = False
def wait(self, timeout=None):
with self._cond:
signaled = self._flag
if not signaled:
signaled = self._cond.wait(timeout)
return signaled
其实应用场景很多,用起来比Condition
方便,比如在连接远程数据库或者访问api的时候设置一个重试机制,成功后再执行SQL或者数据处理:
from time import sleep
from multiprocessing.dummy import Pool as ThreadPool, Event
event = Event()
def conn_redis():
n = 1
time_out = 0.5
# 重试机制
while not event.is_set():
if n == 4: # 自定义重试次数
raise TimeoutError("