进程通信
"""
进程在内存级别是隔离的,但是文件在磁盘上,
1.基于文件通信
利用抢票系统讲解
2.基于队列通信
3.基于管道的通信
"""
文件通信
# 抢票系统
# 1.先可以查票,查询余票数 并发
# 2.进行购买,向服务端发送请求,服务端接收请求,在后端将票数-1,返回到前端 串行
from multiprocessing import Process
from multiprocessing import Lock
import json
import time
import os
import random
# task一个总进程
def search():
"""
查看余票
:return:
"""
time.sleep(random.randint(1,3))
with open('ticket.json',encoding='utf-8') as f1:
dic = json.load(f1)
print(f'{os.getpid()}查看了票数,剩余{dic["count"]}')
def paid():
"""
支付
打开文件写入文件转成json模式
:return:
"""
with open('ticket.json', encoding='utf-8') as f1:
dic = json.load(f1)
if dic['count'] > 0:
dic['count'] -=1
time.sleep(random.randint(1,3))
with open('ticket.json',encoding='utf-8',mode='w') as f1:
json.dump(dic,f1)
print(f'{os.getpid()}购买成功')
def task(lock): # task进程
search()
lock.acquire()
paid()
lock.release()
if __name__ == '__main__':
mutex = Lock()
for i in range(6):
p = Process(target=task,args=(mutex,))
p.start() # 启动
# 当很多进程共抢一个资源数据时,要保证顺序数据安全,一定要串行
# 互斥锁:可以公平性的保证顺序以及数据的安全
# 基于文件的进程之间的通信
# 缺点:效率低,自己加锁会麻烦,容易出现死锁
# 优点:能通信了
"""
几乎同时6个进程开启,都进入了一个查票环节,
task1进入search,睡了3s,其他的进程都进入查票,查票此时是并发
当task2进程进入买票环节,读取文件票数,将文件票数-1,
进入阻塞状态,其他的进程陆续执行同样的操作,这样就造成了数据不安全
"""
队列通信
"""
"""
"""
队列:一个容器,这个容器可以承载一些数据 Queue
队列的特性:先进先出永远保持这个顺序 FIFO first in first out
"""
from multiprocessing import Queue
q = Queue()
def func():
print('in func')
return '函数'
q.put(1)
q.put('alex')
q.put([1,2,3])
q.put(func())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
# 设置大小
from multiprocessing import Queue
q = Queue(3) # maxsize的最大值
def func():
print('in func')
q.put(1)
q.put('alex')
q.put([1,2,3])
q.put(func) # 若put超出最大值则队列满了 进程put会阻塞 无法执行下面的代码
print(q.get())
print(q.get())
print(q.get())
print(q.get()) # 若get满了,当数据取完时,在进程get数据也会出现阻塞,直到再put一个数据
# 一个进程,几个put就几个get
#
# block = False 只要遇到阻塞就会报错
from multiprocessing import Queue
q = Queue(3) # maxsize
def func():
print('in func')
q.put(1)
q.put('alex')
q.put([1,2,3])
q.put(555,block=False)
print(q.get())
print(q.get())
print(q.get())
print(q.get())
# timeout = 3 延时报错 超过3秒还阻塞,抛出异常
from multiprocessing import Queue
q = Queue(3) # maxsize
q.put(1)
q.put('alex')
q.put([1,2,3])
print(q.get())
print(q.get())
print(q.get())
print(q.get(timeout=3))
join
# join让主进程等待子进程的结束,再执行主进程
from multiprocessing import Process
import time
def task(name):
print(f'{name} is running')
time.sleep(2)
print(f'{name} is gone') # 主进程
if __name__ == '__main__':
p = Process(target=task,args=('zs',))# 创建一个进程对象
p.start() # 启动p子进程同时启动主进程
p.join() # 必须等待p执行完再执行主
print('主进程')
# 输出
# zs is running
# zs is gone
# 主进程
# join开启多个子进程
# 未开启时:先执行主进程,再一个一个执行子进程(先执行时间短的)
from multiprocessing import Process
import time
def task(name,sec):
print(f'{name} is running')
time.sleep(sec)
print(f'{name} is gone') # 主进程
if __name__ == '__main__':
start_time = time.time()
# 同一个时刻开启4个进程,并发或者并行,按照最大的时间走
p = Process(target=task,args=('zs',2))
p2 = Process(target=task,args=('zs2',7))
p3 = Process(target=task,args=('zs3',5))
p.start()
p2.start()
p3.start()
print(f'主进程消耗的时间{time.time() - start_time}') # 主进程消耗的时间与其他进程无关 0.000123秒
# 输出:
# 主进程消耗的时间0.0050699710845947266
# zs is running
# zs2 is running
# zs3 is running
# zs is gone
# zs3 is gone
# zs2 is gone
# 验证1:
# join只针对主进程,如果join下面多次join是不阻塞的
# 不会按照一行一行输出,同时开始执行,最后执行主进程
# 必须等待最长的1个p执行完再执行主进程
# p,p1,p2 同时进行,p执行速度快,先执行完,执行时阻塞不执行'2秒'主进程2秒后执行主进程'2秒'
# 执行p的时候同时执行p1和p2 p2执行速度比p3慢 p3先执行完,执行完后join无法阻塞主进程
# p3执行完,p2再过2秒也执行完,p2执行完后,直接执行'7'秒和'5'秒的主进程
p.join()
print('2秒')
p2.join()
print('7秒')
p3.join()
print('5秒')
print(f'主进程消耗的时间{time.time() - start_time}') # 并发执行主进程消耗的时间 3.0387587秒
# p1和p2和p3 同时运行 按照最长的时长打印
# 输出
# 主进程消耗的时间0.005324602127075195
# zs is running
# zs2 is running
# zs3 is running
# zs is gone
# 2秒
# zs3 is gone
# zs2 is gone
# 7秒
# 5秒
# 主进程消耗的时间7.008730888366699
# 验证2
# 对验证1进行优化代码 循环打印
# 正确示范
from multiprocessing import Process
import time
def task(sec):
print(f'is running')
time.sleep(sec)
print(f'is gone')
if __name__ == '__main__':
start_time = time.time()
l1 = []
for i in range(1,4):
p = Process(target=task,args=(i,))
l1.append(p)
p.start()
for i in l1:
i.join()
print(f'主进程{time.time()-start_time}')
# join就是阻塞,主进程有join主进程下面的代码一律不执行
互斥锁
# 互斥锁:
# 多个任务共抢占一个资源时,想要顺序优先保障数据安全,一定要让其串行
# 现在的进程都并发的抢占输出
# 并发是以效率优先的,但是目前的需求是:顺序优先
# 多个进程共枪一个资源时,要保证顺畅优先:串行,一个一个来
# 并行或者并发
from multiprocessing import Process
import time
import random
import os
def task1():
print(f'{os.getpid()}开始')
time.sleep(random.randint(1,3))
print(f'{os.getpid()}打印结束了')
def task2():
print(f'{os.getpid()}开始')
time.sleep(random.randint(1,3))
print(f'{os.getpid()}打印结束了')
def task3():
print(f'{os.getpid()}开始')
time.sleep(random.randint(1,3))
print(f'{os.getpid()}打印结束了')
if __name__ == '__main__':
# 开启3个任务
p1 = Process(target=task1)
p2 = Process(target=task2)
p3 = Process(target=task3)
p1.start()
p2.start()
p3.start()
# 串行
# 利用join 解决了并行的问题,保证了顺序优先,但是这个谁先谁后是固定的,
# 这样不合理,争抢同一个资源的时候,应该是先到先得,保证公平
from multiprocessing import Process
import time
import random
import os
def task1():
print(f'{os.getpid()}开始')
time.sleep(random.randint(1,3))
print(f'{os.getpid()}打印结束了')
def task2():
print(f'{os.getpid()}开始')
time.sleep(random.randint(1,3))
print(f'{os.getpid()}打印结束了')
def task3():
print(f'{os.getpid()}开始')
time.sleep(random.randint(1,3))
print(f'{os.getpid()}打印结束了')
if __name__ == '__main__':
# 开启3个任务
p1 = Process(target=task1)
p2 = Process(target=task2)
p3 = Process(target=task3)
p1.start()
p1.join()
p2.start()
p2.join()
p3.start()
p3.join()
from multiprocessing import Process
import time
import random
import os
def task1(p):
print(f'{p}开始')
time.sleep(random.randint(1,3))
print(f'{p}打印结束了')
def task2(p):
print(f'{p}开始')
time.sleep(random.randint(1,3))
print(f'{p}打印结束了')
def task3(p):
print(f'{p}开始')
time.sleep(random.randint(1,3))
print(f'{p}打印结束了')
if __name__ == '__main__':
# 开启3个任务
p1 = Process(target=task1,args=('p1',))
p2 = Process(target=task2,args=('p2',))
p3 = Process(target=task3,args=('p3',))
p1.start()
p1.join()
p2.start()
p2.join()
p3.start()
p3.join()
from multiprocessing import Process
from multiprocessing import Lock #(锁)
import time
import random
import os
def task1(p,lock):
# 上锁
lock.acquire()
# lock.acquire() 互斥锁上多个无法解开
print(f'{p}开始')
time.sleep(random.randint(1,3))
print(f'{p}打印结束了')
# 解锁 释放掉
lock.release()
def task2(p,lock):
# 上锁
lock.acquire()
print(f'{p}开始')
time.sleep(random.randint(1,3))
print(f'{p}打印结束了')
# 解锁 释放掉
lock.release()
def task3(p,lock):
# 上锁
lock.acquire()
print(f'{p}开始')
time.sleep(random.randint(1,3))
print(f'{p}打印结束了')
# 解锁 释放掉,再往下执行,开启强锁
lock.release()
if __name__ == '__main__':
# 开启3个任务
# 开始上锁,添加参数lock
mutex = Lock()
p1 = Process(target=task1,args=('p1',mutex))
p2 = Process(target=task2,args=('p2',mutex))
p3 = Process(target=task3,args=('p3',mutex))
p1.start()
p1.join()
p2.start()
p2.join()
p3.start()
p3.join()
"""
三个任务同时抢占一把锁,先到先得,假如task2 先得到
然后执行下面的程序,此时的task1和task3也会抢锁,但是抢到后,发现已经上锁了,只能阻塞等待
遇到阻塞后,操作系统会强行将CPU切换到其他任务,其他任务发现锁还没有被打开,继续阻塞
直到task2 将锁释放掉,task1和task3继续争抢这把锁
如果一个子进程里面有多个锁,会变成死锁,无法继续执行
"""
"""
join和互斥锁的区别
共同点:都能将并行变成串行,保证了顺序
不同点:join人为的设定顺序,lock让其争抢顺序,保证了公平性
"""
from multiprocessing import Process
from multiprocessing import Lock
import time
import random
def task1(p,lock):
lock.acquire()
print(f'{p}开始打印了')
time.sleep(random.randint(1,3))
print(f'{p}打印结束了')
lock.release()
def task2(p,lock):
lock.acquire()
print(f'{p}开始打印了')
time.sleep(random.randint(1,3))
print(f'{p}打印结束了')
lock.release()
def task3(p,lock):
lock.acquire()
print(f'{p}开始打印了')
time.sleep(random.randint(1,3))
print(f'{p}打印结束了')
lock.release()
if __name__ == "__main__":
mutex = Lock()
p1 = Process(target=task1,args=('p1',mutex))
p2 = Process(target=task2,args=('p2',mutex))
p3 = Process(target=task3,args=('p3',mutex))
p2.start()
p1.start()
p3.start()