进程
-
multiprocessing模块介绍
- python中的多线程无法利用CPU资源,在python中大部分情况使用多进程。python中提供了非常好的多进程包multiprocessing。
- multiprocessing模块用来开启子进程,并在子进程中执行功能(函数),该模块与多线程模块threading的编程接口类似。
- multiprocessing的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
-
Process类的介绍
-
创建进程的类:
Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动) 强调: 1. 需要使用关键字的方式来指定参数 2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
-
参数介绍:
group参数未使用,值始终为None target表示调用对象,即子进程要执行的任务 args表示调用对象的位置参数元组,args=(1,2,'egon',) kwargs表示调用对象的字典,kwargs={'name':'egon','age':18} name为子进程的名称
-
方法介绍:
p.start():启动进程,并调用该子进程中的p.run() p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。 如果p还保存了一个锁那么也将不会被释放,进而导致死锁 p.is_alive():如果p仍然运行,返回True p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间, 需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
-
属性介绍:
p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置 p.name:进程的名称 p.pid:进程的pid p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可) p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
-
-
Process类的使用
-
一定要把开进程的代码写在if name=='main':下面
开一个进程和主进程是并发的关系,我start一下就是先告诉操作系统我要开一个进程
,然而它不会等待,他会去执行下面的代码,完了他吧进程开始后,就开始执行了strat():方法的功能
1.开启进程
2.执行功能
-
-
开启进程的两种方式:
-
第一种:
from multiprocessing import Process import time import random def piao(name): print('%s is piaoing'%name) time.sleep(random.randint(1,3)) print('%s is piao end'%name) if __name__ =='__main__': p1 = Process(target=piao,kwargs={'name':'alex'})# 创建一个进程对象 p2 = Process(target=piao,kwargs={'name':'alex'}) p3 = Process(target=piao,kwargs={'name':'alex'}) p1.start() # 只是向操作系统发出一个开辟子进程的信号,然后就执行下一行了. # 这个信号操作系统接收到之后,会从内存中开辟一个子进程空间,然后在将主进程所有数据copy加载到子进程,然后在调用cpu去执行. # 开辟子进程开销是很大的 p2.start() p3.start() print('主进程') #第一种方式
-
第二种:
from multiprocessing import Process import time import random import os class Piao(Process): def __init__(self,name): super().__init__() #必须继承父类的一些属性 self.name = name def run(self): #必须得实现一个run方法 print(os.getppid(),os.getpid()) print('%s is piaoing'%self.name) time.sleep(random.randint(1,3)) print('%s is piao end'%self.name) if __name__ =='__main__': p1 = Piao('alex') p2 = Piao('wupeiqi') p3 = Piao('yuanhao') p1.start() p2.start() p3.start() print('主进程',os.getpid()) #第二种方式
-
-
验证进程之间的空间隔离:
# 接下来我们验证一下进程之间的互相隔离。 # 在一个进程中 x = 1000 def task(): global x x = 2 task() print(x) # 在不同的进程中: from multiprocessing import Process import time x = 1000 def task(): global x x = 2 if __name__ == '__main__': p = Process(target=task) p.start() time.sleep(3) print(x) #代码验证
-
进程对象的join方法:
from multiprocessing import Process import time # 父进程等待子进程结束之后在执行 # 方法一 加sleep 不可取! def task(n): time.sleep(3) print('子进程结束....') if __name__ == '__main__': p = Process(target=task,args=('太白金星',)) p.start() time.sleep(5) print('主进程开始运行....') # 这样虽然达到了目的, # 1,但是你在程序中故意加sleep极大影响程序的效率。 # 2,sleep(3)只是虚拟子进程运行的时间,子进程运行完毕的时间是不固定的。 # 方法二: join from multiprocessing import Process import time def task(n): time.sleep(3) print('子进程结束....') if __name__ == '__main__': p = Process(target=task,args=('太白金星',)) p.start() p.join() # 等待p这个子进程运行结束之后,在执行下面的代码(主进程). print('主进程开始运行....') # 接下来我要开启十个子进程,先看看效果 from multiprocessing import Process import time def task(n): print('%s is running' %n) if __name__ == '__main__': for i in range(1, 11): p = Process(target=task,args=(i,)) p.start() # ''' # 我这里是不是运行十个子进程之后,才会运行主进程?当然不会!!! # 1,p.start()只是向操作系统发送一个请求而已,剩下的操作系统在内存开启进程空间,运行进程程序不一定是马上执行。 # 2,开启进程的开销是比较大的。 # ''' print('主进程开始运行....') # 那么有人说,老师我对这个不理解,我给你拆解开来。 from multiprocessing import Process import time def task(n): print('%s is running' %n) if __name__ == '__main__': p1 = Process(target=task,args=(1,)) p2 = Process(target=task,args=(2,)) p3 = Process(target=task,args=(3,)) p4 = Process(target=task,args=(4,)) p5 = Process(target=task,args=(5,)) p1.start() p2.start() p3.start() p4.start() p5.start() print('主进程开始运行....') # 接下来 实现起多子个进程,然后等待这些子进程都结束之后,在开启主进程。 from multiprocessing import Process import time def task(n): time.sleep(3) print('%s is running' %n) if __name__ == '__main__': start_time = time.time() p1 = Process(target=task,args=(1,)) p2 = Process(target=task,args=(2,)) p3 = Process(target=task,args=(3,)) # # 几乎同一个时刻发送三个请求 p1.start() p2.start() p3.start() # # 对着三个自己成使用三个join p1.join() p2.join() p3.join() print(time.time() - start_time,'主进程开始运行....') # 3s 多一点点这是来回切换的所用时间。 # 那么在进行举例: from multiprocessing import Process import time def task(n): time.sleep(n) print('%s is running' %n) if __name__ == '__main__': start_time = time.time() p1 = Process(target=task,args=(1,)) p2 = Process(target=task,args=(2,)) p3 = Process(target=task,args=(3,)) # 几乎同一个时刻发送三个请求 p1.start() p2.start() p3.start() # 对着三个自己成使用三个join p1.join() # 1s p2.join() # 2s p3.join() # 3s print(time.time() - start_time,'主进程开始运行....') # 3s 多一点点这是来回切换的所用时间。 # 利用for循环精简上面的示例: from multiprocessing import Process import time def task(n): time.sleep(1) print('%s is running' %n) if __name__ == '__main__': start_time = time.time() for i in range(1,4): p = Process(target=task,args=(i,)) p.start() p.join() p1 = Process(target=task,args=(1,)) p2 = Process(target=task,args=(2,)) p3 = Process(target=task,args=(3,)) # # 几乎同一个时刻发送三个请求 p1.start() p1.join() p2.start() p2.join() p3.start() p3.join() # 上面的代码,p1.join()他的作用:你的主进程代码必须等我的p1子进程执行完毕之后,在执行 # # p2.start()这个命令是主进程的代码。 # # 而 如果你这样写: # ''' # p1.join() # p2.join() # p3.join() # ''' print(time.time() - start_time,'主进程开始运行....') # 所以你上面的代码应该怎么写? from multiprocessing import Process import time def task(n): time.sleep(3) print('%s is running' %n) if __name__ == '__main__': p_l = [] start_time = time.time() for i in range(1,4): p = Process(target=task,args=(i,)) p.start() p_l.append(p) # 对着三个自己成使用三个join for i in p_l: i.join() print(time.time() - start_time,'主进程开始运行....')
-
进程对象的其他属性(了解):
# from multiprocessing import Process # import time # import os # # def task(n): # time.sleep(3) # print('%s is running' %n,os.getpid(),os.getppid()) # # if __name__ == '__main__': # p1 = Process(target=task,args=(1,),name = '任务1') # # print(p1.name) # 给子进程起名字 # # for i in range(3): # # p = Process(target=task, args=(1,)) # # print(p.name) # 给子进程起名字 # p1.start() # # p1.terminate() # # time.sleep(2) # 睡一会,他就将我的子进程杀死了。 # # print(p1.is_alive()) # False # print(p1.pid) # # print('主') # print(os.getpid())
-
守护进程
-
守护进程的概念:
# 守护进程: # 古时候 太监守护这个皇帝,如果皇帝驾崩了,太监直接也就死了. # 子进程守护着主进程,只要主进程结束,子进程跟着就结束,
-
代码示例:
from multiprocessing import Process import time def task(name): print(f'{name} is running') time.sleep(2) print(f'{name} is gone') if __name__ == '__main__': p = Process(target=task, args=('立业',)) p.daemon = True #将子进程p设置成守护进程,只要主进程结束,守护进程也马上结束 p.start() time.sleep(1) print('==主进程执行') #打印结果 # 立业 is running #主进程结束,子进程也随之结束, # ==主进程执行
-
-
僵尸进程与孤儿进程:
-
僵尸进程和孤儿进程只会出现在Unix,linux,macos上面,windows上不会出现
-
进程的概念:
#1:主进程需要等待子进程结束之后,主进程才结束 主进程时刻监视子进程的运行状态,当子进程结束之后,一段时间之内,将子进程进行回收
-
为什么主进程不在子进程结束后马上对其进行回收
#1:主进程和子进程时异步关系,主进程无法马上捕获子进程什么时候结束 #2:如果子进程结束后马上在内存中释放资源,主进程就没有办法监视子进程的状态了
-
Unix针对上面的问题,提供了一个机制
#所有的子进程结束之后,立马会释放文件的操作链接,内存中的大部分数据,但会保留一些内容:进程号,结束时间,运行状态,等待主进程监测,回收
-
僵尸进程:
#所有的子进程在结束之后,在被主进程回收之前都会进入僵尸状态
-
僵尸进程是有害的:
#如果父进程不对僵尸进程进行回收,产生大量的僵尸进程,这样就会占用内存,占用进程pid号
-
僵尸进程如何解决:
#父进程产生了大量子进程,但是不回收,这样就会形成大量的僵尸进程,解决方式就是直接杀死父进程,将所有的僵尸进程变成孤儿进程进程,由init进行回收
-
孤儿进程:
#父进程由于某种原因结束了,但是你的子进程还在运行,这样你的这些子进程就变成了孤儿进程,你的父进程如果结束了,你的所有的孤儿进程,就会被init进程回收,init就变成了父进程,对孤儿进程进行回收
-
-
互斥锁:
-
什么是互斥锁:
#互斥锁就是在保证子进程串行的同时,也保证了子进程执行顺序的随机性,以及数据的安全性 使用互斥锁的注意点: 使用互斥锁不能连续锁,不然会造成阻塞和死锁
-
代码示例:
from multiprocessing import Process from multiprocessing import Lock import time import os import random import sys def task1(lock): lock.acquire() print(f'{os.getpid()}开始打印了') time.sleep(random.randint(1,3)) print(f'{os.getpid()}打印结束了') lock.release() def task2(lock): lock.acquire() print(f'{os.getpid()}开始打印了') time.sleep(random.randint(1, 3)) print(f'{os.getpid()}打印结束了') lock.release() def task3(lock): lock.acquire() print(f'{os.getpid()}开始打印了') time.sleep(random.randint(1, 3)) print(f'{os.getpid()}打印结束了') lock.release() if __name__ == '__main__': lock=Lock() for i in ['task1','task2','task3']: p=Process(target=getattr(sys.modules[__name__],i),args=(lock,)) p.start()
-
示意图:
-
join和Lock的区别:
#共同点: 都可以把并发改成串行,保证了执行的顺序 #不同点: join是人为的设置顺序,Lock锁时让其争抢顺序,保证了公平性
-
基于文件的进程之间的通信:
#进程在内存级别是隔离的,但是文件在磁盘上 #使用lock锁实现文件上的通讯 from multiprocessing import Process from multiprocessing import Lock import json import time import os import random # 当多个进程共抢一个数据时,如果要保证数据的安全,必须要串行. # 要想让购买环节进行串行,我们必须要加锁处理. def search(): time.sleep(random.randint(1,3)) # 模拟网络延迟(查询环节) with open('ticket.json',encoding='utf-8') as f1: dic = json.load(f1) print(f'{os.getpid()} 查看了票数,剩余{dic["count"]}') def paid(): with open('ticket.json', encoding='utf-8') as f1: dic = json.load(f1) if dic['count'] > 0: dic['count'] -= 1 time.sleep(random.randint(1,3)) # 模拟网络延迟(购买环节) with open('ticket.json', encoding='utf-8',mode='w') as f1: json.dump(dic,f1) print(f'{os.getpid()} 购买成功') def task(lock): search() lock.acquire() paid() lock.release() if __name__ == '__main__': mutex = Lock() for i in range(6): p = Process(target=task,args=(mutex,)) p.start() #当很多进程共强一个资源(数据)时, 你要保证顺序(数据的安全),一定要串行. #互斥锁: 可以公平性的保证顺序以及数据的安全.
-
基于文件的进程通信的特点
#1:效率低 #2:自己加锁麻烦而且容易出现死锁
-
-
队列
-
队列的特点:
#1:把队列当成一个容器,这个容器可以承载一些数据 #2:队列的特性是数据保持先进先出原则,FIFO
-
队列示例代码:
from multiprocessing import Queue q = Queue(3) q.put(1) q.put('alex') q.put([1,2,3]) q.put(5555) # 当队列满了时,在进程put数据就会阻塞. print(q.get()) print(q.get()) print(q.get()) print(q.get()) # 当数据取完时,在进程get数据也会出现阻塞,直到某一个进程put数据. from multiprocessing import Queue q = Queue(3) # maxsize q.put(1) q.put('alex') q.put([1,2,3]) # q.put(5555,block=False) # print(q.get()) print(q.get()) print(q.get()) print(q.get(timeout=3)) # 阻塞3秒,3秒之后还阻塞直接报错. # print(q.get(block=False)) # block=False 只要遇到阻塞就会报错.
-
使用基于文件和队列的进程通信方式的优缺点:
#基于文件的进程通讯方式,在写入文件的时候耗费性能,效率低,在枷锁的时候,又可能出现锁死或递归锁的情况,相比之下队列的效率更高,也不会出现锁死或者递归锁的情况
-