multiprocessing模块下Process、Lock
进程:正在进行的一个过程或者说一个任务。而负责执行任务则是cpu。
无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务
并发:是伪并行,即看起来是同时运行。单个cpu+多道技术就可以实现并发
并行:同时运行,只有具备多个cpu才能实现并行
开启子进程
方式一:实例化Process对象,指定参数target,args
方式二:重构Process类的run方法,直接实例化
from multiprocessing import Process import time def task(name): print('%s is running' %name) time.sleep(3) print('%s is done' %name) if __name__ == '__main__': # p = Process(target=task,kwargs={'name':'子进程1'}) p = Process(target=task,args=('子进程1',)) p.start() #仅仅只是给操作系统发送了一个信号 print('主')
from multiprocessing import Process import time class MyProcess(Process): def __init__(self, name): super().__init__() self.name = name def run(self): # 方法名一定要叫run print('%s is running' % self.name) time.sleep(3) print('%s is done' % self.name) # windows系统里开进程一定要放在if __name__ == '__main__': if __name__ == '__main__': p = MyProcess('子进程1') p.start() # 向操作系统发信号,告诉操作系统开启子进程 print('主')
查看进程号pid
os.getpid():当前文件的进程id
os.getppid():当前文件的父进程id,即pycharm
windows 系统cmd查看进程id:tasklist | findstr pycharm(进程名)
杀死进程:taskkill /F /PID 进程号xxx
from multiprocessing import Process import time, os def task(): print('%s is running,parent id is <%s>' % (os.getpid(), os.getppid())) time.sleep(3) print('%s is done,parent id is <%s>' % (os.getpid(), os.getppid())) if __name__ == '__main__': p = Process(target=task, ) p.start() print('主', os.getpid(), os.getppid()) # os.getpid():当前文件的进程id # os.getppid():当前文件的父进程id,即pycharm ''' windows 系统cmd查看进程id命令: tasklist | findstr pycharm '''
join方法
# join方法 # from multiprocessing import Process # import time,os # # def task(): # print('%s is running,parent id is <%s>' %(os.getpid(),os.getppid())) # time.sleep(3) # print('%s is done,parent id is <%s>' %(os.getpid(),os.getppid())) # # if __name__ == '__main__': # p=Process(target=task,) # p.start() """""" # p.join() # 进程p执行结束以后程序才会往下走 # print('主',os.getpid(),os.getppid()) # print(p.pid) # 验证僵尸进程,子进程已结束但pid未回收, # # 如果主进程pid被回收,子进程pid也被回收 # from multiprocessing import Process # import time,os # # def task(name,n): # print('%s is running' %name) # time.sleep(n) # # # if __name__ == '__main__': # start=time.time() # p1=Process(target=task,args=('子进程1',5)) # p2=Process(target=task,args=('子进程2',3)) # p3=Process(target=task,args=('子进程3',2)) # p_l=[p1,p2,p3] # # # p1.start() # 向操作系统发信号,告诉操作系统开启子进程 # # p2.start() # 但是操作系统什么时候开,先开谁 # # p3.start() # 不知道 # for p in p_l: # p.start() # # # p1.join() # # p2.join() # # p3.join() # for p in p_l: # p.join() # # print('主',(time.time()-start)) # from multiprocessing import Process # import time,os # # def task(name,n): # print('%s is running' %name) # time.sleep(n) # # # if __name__ == '__main__': # start=time.time() # p1=Process(target=task,args=('子进程1',5)) # p2=Process(target=task,args=('子进程2',3)) # p3=Process(target=task,args=('子进程3',2)) # # p1.start() # p1.join() # p2.start() # p2.join() # p3.start() # p3.join() # # print('主',(time.time()-start)) # 了解:is_alive from multiprocessing import Process import time, os def task(): print('%s is running,parent id is <%s>' % (os.getpid(), os.getppid())) time.sleep(3) print('%s is done,parent id is <%s>' % (os.getpid(), os.getppid())) if __name__ == '__main__': # p=Process(target=task,) # p.start() # # print(p.is_alive()) # p进程是否结束 # p.join() # print('主',os.getpid(),os.getppid()) # print(p.pid) # # print(p.is_alive()) p = Process(target=task, name='sub——Precsss') p.start() p.terminate() # 向系统发信号:"干掉p进程"。系统什么时候执行?不知道 time.sleep(3) print(p.is_alive()) # 进程p是否还活着 print('主') print(p.name) # 进程名,在产生进程的时候可以设置,否则使用默认值 # # 进程p已被干掉但进程名,进程id还在
守护进程:p.daemon=True 在p.start()之前设置
主进程结束则整个程序结束,不会再等子进程执行完
from multiprocessing import Process import time def task(name): print('%s is running' %name) time.sleep(2) p=Process(target=time.sleep,args=(3,)) p.start() if __name__ == '__main__': p=Process(target=task,args=('子进程1',)) p.daemon=True # 设置p为守护进程,一定要在start前设置 p.start() # 守护进程内不能再开子进程 p.join() print('主')
互斥锁:多个进程时,牺牲效率,保证数据安全
在开启进程之前先生成锁,之后抢锁,谁抢到了先执行谁,全部执行完后释放锁,剩下task的再抢
# 互斥锁:牺牲效率,保证数据安全 from multiprocessing import Process, Lock import time def task(name, mutex): mutex.acquire() # 获取锁 print('%s 1' % name) time.sleep(1) print('%s 2' % name) time.sleep(1) print('%s 3' % name) mutex.release() # 释放锁 if __name__ == '__main__': mutex = Lock() # 生成锁 for i in range(3): # 3个task抢锁,谁抢到了先执行谁,全部执行完后释放锁,剩下task的再抢 p = Process(target=task, args=('进程%s' % i, mutex)) p.start()
if __name__ == '__main__': # mutex = Lock() # 生成锁 for i in range(3): # 3个task抢锁,谁抢到了先执行谁,全部执行完后释放锁,剩下task的再抢 p = Process(target=task, args=('进程%s' % i, Lock() )) p.start() 可不可以这样呢?如果Lock() 是单例模式,那么没问题,否则不可。三个进程拿到三把不同的锁,就没有意义了。
join():把整个程序变成串行
Lock():把并行的程序,需要串行的地方加锁改成串行
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行地修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
虽然可以用文件共享数据实现进程间通信,但问题是:
1、效率低(共享数据基于文件,而文件是硬盘上的数据)
2、需要自己加锁处理
因此我们最好找寻一种解决方案能够兼顾:
1、效率高(多个进程共享一块内存的数据)
2、帮我们处理好锁问题。
这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
队列和管道都是将数据存放于内存中,而队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,因而队列才是进程间通信的最佳选择。
我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
队列:Queue
先进先出
from multiprocessing import Queue q = Queue(3) q.put('hello') q.put({'a': 1}) q.put([3, 3, 3, ]) print(q.full()) # q.put(4) print(q.get()) print(q.get()) print(q.get()) print(q.empty())
from multiprocessing import Process, Queue import time def producer(q): for i in range(3): res = '包子%s' % i time.sleep(0.5) print('生产者生产了%s' % res) q.put(res) def consumer(q): while True: res = q.get() # if res is None:break if not res: break time.sleep(1) print('消费者吃了%s' % res) if __name__ == '__main__': # 容器 q = Queue() # 生产者们 p1 = Process(target=producer, args=(q,)) p2 = Process(target=producer, args=(q,)) p3 = Process(target=producer, args=(q,)) # 消费者们 c1 = Process(target=consumer, args=(q,)) c2 = Process(target=consumer, args=(q,)) p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() # 等所有的生产者生产完毕以后,向队列里放入结束信号 q.put(None) # 有几个消费者放几个结束信号 q.put('') print('主')
JoinableQueue
from multiprocessing import Process, JoinableQueue import time def producer(q): for i in range(2): res = '包子%s' % i time.sleep(0.5) print('生产者生产了%s' % res) q.put(res) q.join() def consumer(q): while True: res = q.get() if res is None: break time.sleep(1) print('消费者吃了%s' % res) q.task_done() if __name__ == '__main__': # 容器 q = JoinableQueue() # 生产者们 p1 = Process(target=producer, args=(q,)) p2 = Process(target=producer, args=(q,)) p3 = Process(target=producer, args=(q,)) # 消费者们 c1 = Process(target=consumer, args=(q,)) c2 = Process(target=consumer, args=(q,)) c1.daemon = True c2.daemon = True p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join() print('主')
这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止