生产者和消费者模型
"""
1.进程:生产者消费者模型
编程思想:模型,设计模式,理论等等
都是交给你一种编程的方法,以后遇到类似的情况,套用即可
生产者消费者模型三要素
生产者:产生数据的
消费者:接收数据做进一步处理的
容器:队列
队列容器起到的作用:
缓冲作用,平衡了生产力和消费力,解耦
"""
from multiprocessing import Process
from multiprocessing import Queue
import time
import random
def producer(q,name):
"""
生产者:生产包子
:param q:
:param name:
:return:
"""
for i in range(5):
time.sleep(random.randint(1,2))
res = f"{i}号"
q.put(res)
print(f'生产者{name}生产了{res}')
def consumer(q,name):
"""
消费者:吃包子
:param q:
:param name:
:return:
"""
while 1:
try:
food = q.get()
time.sleep(random.randint(1,3))
print(f' 33[31;0m消费者{name}吃了{food} 33[0m')
except Exception:
return
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer,args=(q,'zs'))
p2 = Process(target=consumer,args=(q,'ls'))
p1.start()
p2.start()
# 输出:
# 生产者zs生产了0号
# 生产者zs生产了1号
# 生产者zs生产了2号
# 消费者ls吃了0号
# 消费者ls吃了1号
# 生产者zs生产了3号
# 消费者ls吃了2号
# 生产者zs生产了4号
# 消费者ls吃了3号
# 消费者ls吃了4号
递归锁
# 递归锁可以解决死锁的现象,业务需要多个锁时,优先考虑递归锁
from threading import Thread
from threading import Lock
import time
lock_A = Lock()
lock_B = Lock()
class MyThread(Thread):
def run(self): # 必须是run方法
self.f1()
self.f2()
def f1(self):
lock_A.acquire()
print(f'{self.name}拿到A锁') # 拿到A锁
lock_B.acquire()
print(f'{self.name}拿到B锁') # 拿到B锁
lock_B.release()
lock_A.release()
def f2(self):
lock_B.acquire()
print(f'{self.name}拿到B锁') # 拿到A锁
time.sleep(0.1)
lock_A.acquire()
print(f'{self.name}拿到A锁') # 拿到B锁
lock_A.release()
lock_B.release()
if __name__ == '__main__':
for i in range(3):
t=MyThread()
t.start()
# 输出
# Thread-1拿到A锁
# Thread-1拿到B锁
# Thread-1拿到B锁
# Thread-2拿到A锁
"""
thread 先抢到了A锁,此时t2和t3也想抢A锁,但是只能等待,等待t1释放A锁
t1又抢到了B锁,此时t1有AB两把锁,也没有释放,t2和t3 继续等待
当t1依次释放BA锁,线程t2和t3争抢A锁,t1争抢B锁
结果分析,t2抢到了A锁,t1拿到B锁
但是接下来,t1睡了0.01秒,t1拥有者B锁,但想要A锁,t2拥有A锁,想要B锁,形成了死锁现象
"""
# 递归锁可以解决死锁现象
from threading import Thread
from threading import RLock # 递归锁
import time
lock_A = lock_B = RLock()
# 递归锁有一个计数的功能,原数字为0,上一次锁计数加一,释放一次数,计数-1
# 只要递归锁上面的数字不为零,其他线程不能抢锁
class MyThread(Thread):
def run(self): # 必须是run方法
self.f1()
self.f2()
def f1(self):
lock_A.acquire()
print(f'{self.name}拿到A锁') # 拿到A锁
lock_B.acquire()
print(f'{self.name}拿到B锁') # 拿到B锁
lock_B.release()
lock_A.release()
def f2(self):
lock_B.acquire()
print(f'{self.name}拿到B锁') # 拿到A锁
time.sleep(0.1)
lock_A.acquire()
print(f'{self.name}拿到A锁') # 拿到B锁
lock_A.release()
lock_B.release()
if __name__ == '__main__':
for i in range(3):
t=MyThread()
t.start()
# 输出
# Thread-1拿到A锁
# Thread-1拿到B锁
# Thread-2拿到A锁
# Thread-2拿到B锁
# Thread-2拿到B锁
# Thread-2拿到A锁
# Thread-1拿到B锁
# Thread-1拿到A锁
# Thread-3拿到A锁
# Thread-3拿到B锁
# Thread-3拿到B锁
# Thread-3拿到A锁
信号量
"""
信号量:也是锁的一种
"""
from threading import Thread
from threading import Semaphore
from threading import current_thread
# 同一个模块可以引用多个功能加.
import time
import random
sem = Semaphore(5) # 并发数量是5
# 控制并发任务
def task():
sem.acquire() # 上锁
print(f'{current_thread().name}正在排队')
time.sleep(random.randint(1,3))
sem.release() # 解锁 鸭子类型
if __name__ == '__main__':
for i in range(20): # 同时开启20个进程
t = Thread(target=task)
t.start()
GIL全局
"""
GIL全局解释器锁
进程空间:解释器 文件
Python解释器:虚拟机 编译器
编译器:c语言能识别的字节码
虚拟机:字节码转化成机器码
把机器码交给cpu去执行
文件
理论上说:单个进程的多线程可以利用多核
实际上说:同一时刻,只能允许一个线程进入解释器
给解释器加了一个锁
为什么要加锁:
1.是单核时代,CPU加个贵
2.不加全局解释器锁,开发CPython解释器的程序员就会在源码内部各种主动加锁,解锁
非常的麻烦,为了节省方便,直接在进入解释器时给线程加锁
优点:保证了CPython解释器的数据资源的安全
缺点:单个进程的多线程不能利用多核
Jpython 没有GIL锁
pypy 没有GIL锁
现在多核时代,
因为CPython解释器所有的业务逻辑都是围绕着单个线程实现的
去掉GIL锁几乎不可能
遇到IO阻塞,CPU就会无情的被操作系统切走
GIL锁被释放,线程被挂起,另一个线程进入(可以进行并发)
单个进程的多线程可以并发,但是不能利用多核不能并行
多个线程可以并发,并行
"""
"""
GIL与lock锁的区别
相同点:都是同种锁,互斥锁
不同点:
1.保护的对象不一样
GIL 锁全局解释器锁,保护解释器内部的资源数据的安全
GIL锁上锁,释放无需手动操作
自己代码中定义的互斥锁保护进程中的资源数据的安全
lock
t1先进入解释器要抢到GIL锁,然后抢lock锁
遇到IO阻塞,操作系统强行将CPU切走,立马释放了GIL锁
t3抢到GIL锁,要抢lock锁,但是lock锁被t1占用,挂起
直到t1阻塞完毕,加上GIL锁,继续执行执行完毕
释放GIL锁,释放lock锁,下一个线程才能进入,周而复始
"""
"""
验证
IO密集型:单个进程的多线程合适,并发执行
计算密集型:多进程的并行
"""
IO密集型
# IO密集型:单个进程的多线程并发vs多个进程的并发并行
# 多进程并发并行
def task():
count = 0
# time.sleep(random.randint(1,3))
count += 1
if __name__ == '__main__':
start_time = time.time()
l1 = []
for i in range(4):
p = Process(target=task,)
l1.append(p)
p.start()
for p in l1:
p.join()
print(f'执行效率{time.time()-start_time}')
# 输出 执行效率:0.08116006851196289
# 多个线程的并发并行
if __name__ == '__main__':
start_time = time.time()
l1 = []
for i in range(50):
p = Thread(target=task,)
l1.append(p)
p.start()
for p in l1:
p.join()
print(f'执行效率{time.time() - start_time}')
# 输出 执行效率执行效率:0.0043070316314697266
"""
总结:
对于IO密集型:单个进程的多线程的并发效率高
"""
计算密集型
from threading import Thread
from multiprocessing import Process
import time
import random
# 计算密集型:单个进程的多线程并发vs多个进程的并发并行
# 多个进程的并发并行
def task():
count = 0
for i in range(10000000):
count += 1
if __name__ == '__main__':
start_time = time.time()
l1 = []
for i in range(4):
p = Process(target=task,)
l1.append(p)
p.start()
for p in l1:
p.join()
print(f'效率{time.time()-start_time}')
# 输出 效率1.0616059303283691
from threading import Thread
from multiprocessing import Process
import time
# 多线程并发
def task():
count = 0
for i in range(10000000):
count += 1
if __name__ == '__main__':
start_time = time.time()
l1 = []
for i in range(4):
p = Thread(target=task,)
l1.append(p)
p.start()
for p in l1:
p.join()
print(f'效率{time.time()-start_time}')
# 输出 效率2.1974008083343506
"""
总结:
计算密集型:多进程的并发并行效率高
"""
线程池和进程池
"""
线程池 进程池
进程池:一个容器,限制住开启进程的数量
线程池:一个容器,限制住开启线程的数量,比如4个
第一次肯定只能并发的处理4个任务,只要有任务完成,线程马上会执行下一个任务
以时间换空间
"""
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os
import time
import random
def task(n):
print(f'{os.getpid()}开始')
time.sleep(random.randint(1,3)) # 开启4个进程
if __name__ == '__main__':
# 开启进程池(并行(并发+并行))ProcessPoolExecutor()
p = ProcessPoolExecutor() # 默认不写,进程池里面的进程数和CPU个数相等
for i in range(20): # 处理20个任务
p.submit(task,i) # 发布任务和参数 submit(任务,参数)
# 输出
# 20395开始
# 20396开始
# 20397开始
# 20398开始
# 20395开始
# 20396开始
# 20397开始
# 20396开始
# 20397开始
# 20398开始
# 20395开始
# 20396开始
# 20397开始
# 20395开始
# 20398开始
# 20397开始
# 20396开始
# 20395开始
# 20398开始
# 20397开始
# 开启线程池(并发)
t = ThreadPoolExecutor() # 默认不写,CPU个数*5 线程数 写多少就是多少
for i in range(20): # 开启线程100个
t.submit(task,i) # 发布任务和参数submit(任务,参数)
# 输出
# 20403开始
# 20403开始
# 20403开始
# 20403开始
# 20403开始
# 20403开始
# 20403开始
# 20403开始
# 20403开始
# 20403开始
# 20403开始
# 20403开始
# 20403开始
# 20403开始
# 20403开始
# 20403开始
# 20403开始
# 20403开始
# 20403开始
# 20403开始