互斥锁

from multiprocessing import Process,Lock import os,time def work(lock): lock.acquire() 上锁 print('%s is running' %os.getpid()) time.sleep(2) print('%s is done' %os.getpid()) lock.release() 解锁 if __name__ == '__main__': lock=Lock() 造锁(实例化一个锁) for i in range(3): p=Process(target=work,args=(lock,)) p.start()
示例背景
三台电脑同一时刻共同调用打印机完成打印任务,即三个进程同一时刻共抢同一个资源(输出平台)
多个进程共抢一个资源,应结果第一位,效率第二位,所以应该牺牲效率,保求结果(串行)

以上的代码虽然完成串行结果,但没有实现公平,执行的顺序都是人为写好的,应该做到公平的抢占资源,谁先抢到就执行谁

from multiprocessing import Process from multiprocessing import Lock import time import random def task1(lock): print('task1') # 验证cpu遇到io切换了 lock.acquire() print('task1: 开始打印') time.sleep(random.randint(1, 3)) print('task1: 打印完成') lock.release() def task2(lock): print('task2') # 验证cpu遇到io切换了 lock.acquire() print('task2: 开始打印') time.sleep(random.randint(1, 3)) print('task2: 打印完成') lock.release() def task3(lock): print('task3') # 验证cpu遇到io切换了 lock.acquire() print('task3: 开始打印') time.sleep(random.randint(1, 3)) print('task3: 打印完成') lock.release() if __name__ == '__main__': lock = Lock() p1 = Process(target=task1, args=(lock,)) p2 = Process(target=task2, args=(lock,)) p3 = Process(target=task3, args=(lock,)) p1.start() p2.start() p3.start()
上锁时一定要给进程上同一把锁,而且上锁一次,就一定要解锁一次
互斥锁和join的区别
共同点
都是完成了进程之间的串行
区别
join是通过人为控制使进程串行
互斥锁是随机抢占资源,保证了公平性

业务需求分析: 买票之前先要查票,必须经历的流程: 你在查票的同时,100个人也在查本此列票. 买票时,你要先从服务端获取到票数,票数>0 ,买票,然后服务端票数减一. 中间肯定有网络延迟. from multiprocessing import Process from multiprocessing import Lock import time import json import os import random 多进程原则上是不能互相通信的,它们在内存级别数据隔离的.不代表磁盘上数据隔离. 它们可以共同操作一个文件. def search(): time.sleep(random.random()) with open('db.json',encoding='utf-8') as f1: dic = json.load(f1) print(f'剩余票数{dic["count"]}') def get(): with open('db.json',encoding='utf-8') as f1: dic = json.load(f1) time.sleep(random.randint(1,3)) if dic['count'] > 0: dic['count'] -= 1 with open('db.json', encoding='utf-8', mode='w') as f1: json.dump(dic,f1) print(f'{os.getpid()}用户购买成功') else: print('没票了.....') def task(lock): search() lock.acquire() get() lock.release() if __name__ == '__main__': lock = Lock() for i in range(5): p = Process(target=task,args=(lock,)) p.start()
队列
进程之间的通信最好的方式是基于队列

from multiprocessing import Process from multiprocessing import Queue import time import os import random # 多进程原则上是不能互相通信的,它们在内存级别数据隔离的.不代表磁盘上数据隔离. # 它们可以共同操作一个文件. def search(ticket): time.sleep(random.random()) print(f'剩余票数{ticket}') def _get(q): dic = q.get() time.sleep(random.randint(1,3)) if dic['count'] > 0: dic['count'] -= 1 print(f'{os.getpid()}用户购买成功') else: print('没票了.....') q.put(dic) def task(ticket,q): search(ticket) _get(q) if __name__ == '__main__': q = Queue(1) q.put({'count': 1}) ticket = 1 for i in range(5): p = Process(target=task,args=(ticket,q)) p.start()
什么是队列
队列是存在与内存中的一个容器,最大的特点:先进先出(FIFO)
利用队列进行进程之间的通信,简单,方便,不用自己手动加锁,队列自带优质阻塞,可持续化取数据

from multiprocessing import Queue q = Queue(3) # 可以设置元素个数 def func(): print('in func') q.put('alex') q.put({'count': 1}) q.put(func) q.put(666) # 当队列数据已经达到上限,在插入数据的时候,程序就会夯住. 当你将数据全部取完继续再取值时,程序也会夯住 put中有block参数 默认为True 当你插入的数据超过最大限度,默认阻塞.改成False 数据超过最大限度,不阻塞了直接报错. put中有timeout参数 延时报错,比如q.put(3,timeout = 3) 超过三秒再put不进,程序就会报错 get也有这两个参数

# 小米:抢手环4.预期发售10个. # 有100个人去抢. import os from multiprocessing import Queue from multiprocessing import Process def task(q): try: q.put(f'{os.getpid()}',block=False) except Exception: return if __name__ == '__main__': q = Queue(10) for i in range(100): p = Process(target=task,args=(q,)) p.start() for i in range(1,11): print(f'排名第{i}的用户: {q.get()}',)
栈
栈与队列性质类似,特点与队列相反:先进后出(FILO)

import queue q = queue.LifoQueue() q.put(1) q.put(3) q.put('barry') print(q.get()) print(q.get()) print(q.get()) print(q.get())
优先级队列
需要通过元组的形式放入,(int,数据) int 代表优先级,数字越低,优先级越高

import queue q = queue.PriorityQueue(3) q.put((10, '垃圾消息')) q.put((-9, '紧急消息')) q.put((3, '一般消息')) print(q.get()) print(q.get()) print(q.get())
生产者消费者模型(多应用于并发)
生产者:生产数据进程
消费者:对生产者生产出来的数据做进一步处理进程
from multiprocessing import Process from multiprocessing import Queue import time import random def producer(name,q): for i in range(1,6): time.sleep(random.randint(1,3)) res = f'{i}号包子' q.put(res) print(f'