1.进程介绍
1.1什么是进程
进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。程序是指令、数据及其组织形式的描述,进程是程序的实体。
狭义定义:进程是正在运行的程序的实例(an instance of a computer program that is being executed)。
广义定义:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。
人话 : 进程指的是正在进行的一个过程或者说一个任务。而负责执行任务则是cpu
程序和进程的区别
'''
程序就是一坨躺在硬盘上的代码(那个py文件) , 是 "死" 的
进程则表示程序正执行的过程 , 是 "活" 的
'''
如果没有进程你怎么描述程序的运行?我的电脑内存里有wx qq两个程序正在运行,显然没有直接说内存中有wx进程 和 qq进程更加方便
注意 : 同一个程序执行两次,就会在操作系统中出现两个进程,所以我们可以同时运行一个软件,分别做不同的事
情也不会混乱
1.2进程调度
多个程序在内存中 , 每个程序都需要等待cpu来执行 , 那么cpu怎么知道要先执行谁 , 然后下一个再切换到谁呢 , 这就是怎么调度 , 其中必须是要按照一个标准 , 这就是即将要说的进程调度算法
先来先服务(FCFS)调度算法
先来先服务(FCFS)调度算法是一种最简单的调度算法,该算法既可用于作业调度,也可用于进程调度。FCFS算法比较有利于长作业(进程),而不利于短作业(进程)。由此可知,本算法适合于CPU繁忙型作业,而不利于I/O繁忙型的作业(进程)。
短作业(进程)优先调度算法(SJ/PF)
短作业(进程)优先调度算法(SJ/PF)是指对短作业或短进程优先调度的算法,该算法既可用于作业调度,也可用于进程调度。但其对长作业不利;不能保证紧迫性作业(进程)被及时处理;作业的长短只是被估算出来的。
时间片轮转法
时间片轮转(Round Robin,RR)法的基本思路是让每个进程在就绪队列中的等待时间与享受服务的时间成比例。在时间片轮转法中,需要将CPU的处理时间分成固定大小的时间片,例如,几十毫秒至几百毫秒。如果一个进程在被调度选中之后用完了系统规定的时间片,但又未完成要求的任务,则它自行释放自己所占有的CPU而排到就绪队列的末尾,等待下一次调度。同时,进程调度程序又去调度当前就绪队列中的第一个进程。
显然,轮转法只能用来调度分配一些可以抢占的资源。这些可以抢占的资源可以随时被剥夺,而且可以将它们再分配给别的进程。CPU是可抢占资源的一种。但打印机等资源是不可抢占的。由于作业调度是对除了CPU之外的所有系统硬件资源的分配,其中包含有不可抢占资源,所以作业调度不使用轮转法。
在轮转法中,时间片长度的选取非常重要。首先,时间片长度的选择会直接影响到系统的开销和响应时间。如果时间片长度过短,则调度程序抢占处理机的次数增多。这将使进程上下文切换次数也大大增加,从而加重系统开销。反过来,如果时间片长度选择过长,例如,一个时间片能保证就绪队列中所需执行时间最长的进程能执行完毕,则轮转法变成了先来先服务法。时间片长度的选择是根据系统对响应时间的要求和就绪队列中所允许最大的进程数来确定的。
在轮转法中,加入到就绪队列的进程有3种情况:
一种是分给它的时间片用完,但进程还未完成,回到就绪队列的末尾等待下次调度去继续执行。
另一种情况是分给该进程的时间片并未用完,只是因为请求I/O或由于进程的互斥与同步关系而被阻塞。当阻塞解除之后再回到就绪队列。
第三种情况就是新创建进程进入就绪队列。
如果对这些进程区别对待,给予不同的优先级和时间片从直观上看,可以进一步改善系统服务质量和效率。例如,我们可把就绪队列按照进程到达就绪队列的类型和进程被阻塞时的阻塞原因分成不同的就绪队列,每个队列按FCFS原则排列,各队列之间的进程享有不同的优先级,但同一队列内优先级相同。这样,当一个进程在执行完它的时间片之后,或从睡眠中被唤醒以及被创建之后,将进入不同的就绪队列。
多级反馈队列
前面介绍的各种用作进程调度的算法都有一定的局限性。如短进程优先的调度算法,仅照顾了短进程而忽略了长进程,而且如果并未指明进程的长度,则短进程优先和基于进程长度的抢占式调度算法都将无法使用。
而多级反馈队列调度算法则不必事先知道各种进程所需的执行时间,而且还可以满足各种类型进程的需要,因而它是目前被公认的一种较好的进程调度算法。在采用多级反馈队列调度算法的系统中,调度算法的实施过程如下所述。
(1) 应设置多个就绪队列,并为各个队列赋予不同的优先级。第一个队列的优先级最高,第二个队列次之,其余各队列的优先权逐个降低。该算法赋予各个队列中进程执行时间片的大小也各不相同,在优先权愈高的队列中,为每个进程所规定的执行时间片就愈小。例如,第二个队列的时间片要比第一个队列的时间片长一倍,……,第i+1个队列的时间片要比第i个队列的时间片长一倍。
(2) 当一个新进程进入内存后,首先将它放入第一队列的末尾,按FCFS原则排队等待调度。当轮到该进程执行时,如它能在该时间片内完成,便可准备撤离系统;如果它在一个时间片结束时尚未完成,调度程序便将该进程转入第二队列的末尾,再同样地按FCFS原则等待调度执行;如果它在第二队列中运行一个时间片后仍未完成,再依次将它放入第三队列,……,如此下去,当一个长作业(进程)从第一队列依次降到第n队列后,在第n 队列便采取按时间片轮转的方式运行。
(3) 仅当第一队列空闲时,调度程序才调度第二队列中的进程运行;仅当第1~(i-1)队列均空时,才会调度第i队列中的进程运行。如果处理机正在第i队列中为某进程服务时,又有新进程进入优先权较高的队列(第1~(i-1)中的任何一个队列),则此时新进程将抢占正在运行进程的处理机,即由调度程序把正在运行的进程放回到第i队列的末尾,把处理机分配给新到的高优先权进程。
1.3进程三状态图
在程序运行的过程中,由于被操作系统的调度算法控制,程序会进入几个状态:就绪,运行和阻塞
就绪(Ready)状态
当进程已分配到除CPU以外的所有必要的资源,只要获得处理机便可立即执行,这时的进程状态称为就绪状态。
执行/运行(Running)状态当进程已获得处理机,其程序正在处理机上执行,此时的进程状态称为执行状态。
阻塞(Blocked)状态正在执行的进程,由于等待某个事件发生而无法执行时,便放弃处理机而处于阻塞状态。引起进程阻塞的事件可有多种,例如,等待I/O完成、申请缓冲区不能满足、等待信件(信号)等。
2.并行和并发
无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务
并行:同时运行,只有具备多个cpu才能实现并行
并发:是伪并行,即看起来是同时运行 , 实际上是交替运行 , 只不过切换的时间超快 , 用户感知上就行同时在运行似的。单个cpu+多道技术就可以实现并发
3.同步异步阻塞非阻塞
-
同步异步
""" 描述的是任务(代码层面就是函数)的提交方式 """ 同步:任务提交之后,原地等待任务的返回结果,等待的过程中不做任何事(干等) 程序层面上表现出来的感觉就是卡住了 异步:任务提交之后,不原地等待任务的返回结果,直接去做其他事情 我提交的任务结果如何获取?任务的返回结果会有一个异步回调机制自动处理
-
阻塞和非阻塞
""" 描述的是程序的运行状态 """阻塞 : 阻塞态非阻塞 : 就绪态 , 运行态
上述概念的组合最高效的一种组合就是异步非阻塞
4.multiprocessing模块
multiprocess不是一个模块而是python中一个操作、管理进程的包。 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。由于提供的子模块非常多,为了方便大家归类记忆,我将这部分大致分为四个部分:创建进程部分,进程同步部分,进程池部分,进程之间数据共享。
了解知识
"""进程的创建"""1. 系统初始化(查看进程linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的进程与用户无关,运行在后台并且只在需要时才唤醒的进程,称为守护进程,如电子邮件、web页面、新闻、打印)2. 一个进程在运行过程中开启了子进程(如nginx开启多进程,os.fork,subprocess.Popen等)3. 用户的交互式请求,而创建一个新进程(如用户双击暴风影音)4. 一个批处理作业的初始化(只在大型机的批处理系统中应用)无论哪一种,新进程的创建都是由一个已经存在的进程执行了一个用于创建进程的系统调用而创建的。"""进程的结束"""1. 正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)2. 出错退出(自愿,python a.py中a.py不存在)3. 严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,可以捕捉异常,try...except...)4. 被其他进程杀死(非自愿,如kill -9)
4.1创建进程的两种方式
在一个python进程中开启子进程
import timefrom multiprocessing import Processdef f(name): print('hello', name) print('我是子进程') # py代码创建的都是子进程,下面我们就直接称之为进程if __name__ == '__main__': # 实例化一个对象 p = Process(target=f, args=('bob',)) # target是任务函数, args是传进函数的参数, 必须是元祖类型 # 开启一个进程 , p.start() # 应用程序告诉操作系统创建一个进程 异步 time.sleep(1) print('执行主进程的内容了')
关于multiprocessing模块的其他类
multiprocessing.Process(target, args, kwargs) 创建并返回一个进程对象 processobject multiprocessing.active_children() 查看当前进程下的所有子进程对象,以列表形式返回 [processObject, ...] multiprocessing.current_process() 获取当前的进程对象 processObject
关于p(进程对象)的其他属性和方法
p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置p.name: 进程的名称p.pid: 进程的pidp.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
p.start():启动进程,并调用该子进程中的p.run() p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法 p.close():关闭进程p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁p.kill():终止进程p.is_alive():如果p仍然运行,返回True p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
小细节 : 以方法返回值或变量值为bool值的方法命名和变量的命名都以is_开头
os模块关于进程的方法os.getpid() 返回当前进程pid intos.getppid() 返回当前进程的父进程的pid int
补充 :
进程的创建是异步的 , 两个进程之间不会相互等待 , 而是在各自的内存空间中有着各自的代码在执行 , 存在的联系是整个py文件是主进程 , p.start()开启的是一个子进程在Windows操作系统中由于没有fork(linux操作系统中创建进程的机制),在创建子进程的时候会自动 import 启动它的这个文件,而在 import 的时候又执行了整个文件。因此如果将process()直接写在文件中就会无限递归创建子进程报错。所以必须把创建子进程的部分使用if __name__ ==‘__main__’ 判断保护起来,import 的时候 ,就不会递归运行了。即在windows创建进程类似导入模块的形式 , 在linux中是copy一份源代码 , 所以在windows下必须写在if __name__ ==‘__main__’下面
另外一种开启进程的方式 通过继承Process类开启进程(用的比较少,了解即可)
import osfrom multiprocessing import Processclass MyProcess(Process): def __init__(self,name): super().__init__() self.name=name def run(self): print(os.getpid()) print('%s 正在和女主播聊天' %self.name)p1=MyProcess('wupeiqi')p2=MyProcess('yuanhao')p3=MyProcess('nezha')p1.start() #start会自动调用runp2.start()# p2.run()p3.start()p1.join()p2.join()p3.join()print('主线程')
小总结
""" 创建进程就是在内存中申请一块内存空间将需要运行的代码丢进去一个进程对应在内存中就是一块独立的内存空间多个进程对应在内存中就是多块独立的内存空间进程与进程之间数据默认情况下是无法直接交互,如果想交互可以借助于第三方工具、模块进程的执行顺序不是根据启动顺序决定的 , 每个都是相互独立同时运行的"""
简单小需求 , 主进程等待多个子进程结束才结束----> join()
方法
import timefrom multiprocessing import Processdef f(x): print('hello', x) time.sleep(1)if __name__ == '__main__': p_lst = [] # 先5个进程全起来了 for i in range(5): p = Process(target=f, args=(i,)) p.start() p_lst.append(p) # 然后主进程等待所有子进程结束 for obj in p_lst: obj.join() print("主进程结束")
4.2进程之间数据隔离
from multiprocessing import Processn = 100def work(): global n n=10 print('子进程内: ',n)if __name__ == '__main__': p=Process(target=work) p.start() p.join() print('主进程内: ',n) 子进程内: 10主进程内: 100
4.3守护进程
子进程守护着主进程结束 , 他才自己跟着主进程的结束而结束
后代默认在跑的进程都是守护进程 , 守护着操作系统 , 系统起来他就起来了, 系统关闭他们也就才结束
主进程创建守护进程其一:守护进程会在主进程代码执行结束后就终止其二:守护进程内无法再开启子进程,否则抛出异常
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
怎么写代码验证呢?
from multiprocessing import Processdef foo(): print(123) time.sleep(1) print("end123")def bar(): print(456) time.sleep(3) print("end456")p1=Process(target=foo)p2=Process(target=bar)p1.daemon=True # 这句话一定要放在start()上面否在会报错p1.start()p2.start()time.sleep(0.1)print("main-------")#打印该行则主进程代码结束,则守护进程p1应该被终止"""会打印123 p1进程没有来的急打印end123,是因为他是守护进程,主进程结束了,及时他还有代码没有执行,也要结束456main-------end456 p2进程不会因为主进程的结束就结束,所以会打印end456"""
4.4僵尸进程和孤儿进程(了解)
僵尸进程
""" 僵尸进程:一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。这种进程称之为僵死进程。详解如下我们知道在unix/linux中,正常情况下子进程是通过父进程创建的,子进程在创建新的进程。子进程的结束和父进程的运行是一个异步过程,即父进程永远无法预测子进程到底什么时候结束,如果子进程一结束就立刻回收其全部资源,那么在父进程内将无法获取子进程的状态信息。因此,UNⅨ提供了一种机制可以保证父进程可以在任意时刻获取子进程结束时的状态信息:1、在每个进程退出的时候,内核释放该进程所有的资源,包括打开的文件,占用的内存等。但是仍然为其保留一定的信息(包括进程号the process ID,退出状态the termination status of the process,运行时间the amount of CPU time taken by the process等)2、直到父进程通过wait / waitpid来取时才释放. 但这样就导致了问题,如果进程不调用wait / waitpid的话,那么保留的那段信息就不会释放,其进程号就会一直被占用,但是系统所能使用的进程号是有限的,如果大量的产生僵死进程,将因为没有可用的进程号而导致系统不能产生新的进程. 此即为僵尸进程的危害,应当避免。 任何一个子进程(init除外)在exit()之后,并非马上就消失掉,而是留下一个称为僵尸进程(Zombie)的数据结构,等待父进程处理。这是每个子进程在结束时都要经过的阶段。如果子进程在exit()之后,父进程没有来得及处理,这时用ps命令就能看到子进程的状态是“Z”。如果父进程能及时 处理,可能用ps命令就来不及看到子进程的僵尸状态,但这并不等于子进程不经过僵尸状态。 如果父进程在子进程结束之前退出,则子进程将由init接管。init将会以父进程的身份对僵尸状态的子进程进行处理。"""
import timefrom multiprocessing import Processdef f(): print('hello world') time.sleep(3) print('get out')if __name__ == '__main__': p = Process(target=f) p.start() print('主进程结束') # 这里我们的主进程会等待子进程结束才结束 , 不是说进程是相互独立的吗 , 我结束了干嘛还要等你???# 因为主进程进入了一段时间的僵尸进程
孤儿进程
""" 孤儿进程:一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作。 孤儿进程是没有父进程的进程,孤儿进程这个重任就落到了init进程身上,init进程就好像是一个民政局,专门负责处理孤儿进程的善后工作。每当出现一个孤儿进程的时候,内核就把孤 儿进程的父进程设置为init,而init进程会循环地wait()它的已经退出的子进程。这样,当一个孤儿进程凄凉地结束了其生命周期的时候,init进程就会代表党和政府出面处理它的一切善后工作。因此孤儿进程并不会有什么危害。子进程存活,父进程意外死亡 操作系统会开设一个"儿童福利院"专门管理孤儿进程回收相关资源"""
5.锁(互斥锁)
multiprocessing模块中锁的接口和使用与threading中锁的接口和使用一致
通过模拟买票的过程 , 引出来锁的好处 , 首先分析买票的过程 , 第一步你会发一个请求查看当前剩余票数 , 然后看到有剩余车票 , 第二步你会再发一个请求购买车票 , ok , 简单代码模拟
# 没有加锁# 文件db的内容为:{"count":1}# 注意一定要用双引号,不然json无法识别# 并发运行,效率高,但竞争写同一文件,数据写入错乱from multiprocessing import Process, Lockimport time, json, randomdef search(i): """查票函数""" dic = json.load(open('db')) print('用户%s 剩余票数%s' % (i, dic.get('count')))def get(i): """买票函数,注意:买票是有两步的,一定是先查看,再买票""" # 先查票 dic = json.load(open('db')) # 模拟读数据的网络延迟 time.sleep(random.randint(1, 3)) # 查出来如果有票 if dic.get('count') > 0: # 修改数据库 买票 dic['count'] -= 1 time.sleep(0.2) # 模拟写数据的网络延迟 json.dump(dic, open('db', 'w')) print('用户%s购票成功' % (i))def task(i): search(i) get(i)if __name__ == '__main__': for i in range(10): # 模拟并发100个客户端抢票 p = Process(target=task, args=(i,)) p.start() # 10个用户,但是只有1张票,最后却10个用户都买到票了,这就很离谱,原因是在别人# 买票的时候,其他也可以查票, 能不能买到票的标准是当前查票的时候有没有票,有就可以买到,没有就没不到
多个进程操作同一份数据的时候, 会出现数据错乱的问题 , 针对上述问题 , 解决方式就是加锁处理 , 将并发变成串行, 牺牲效率但是保证数据的安全
# 加锁版本# 没有加锁# 文件db的内容为:{"count":1}# 注意一定要用双引号,不然json无法识别# 并发运行,效率高,但竞争写同一文件,数据写入错乱from multiprocessing import Process, Lockimport time, json, randomdef search(i): """查票函数""" dic = json.load(open('db')) print('用户%s 剩余票数%s' % (i, dic.get('count')))def buy(i): """买票函数,注意:买票是有两步的,一定是先查看,再买票""" # 先查票 dic = json.load(open('db')) # 模拟读数据的网络延迟 time.sleep(random.randint(1, 3)) # 查出来如果有票 if dic.get('count') > 0: # 修改数据库 买票 dic['count'] -= 1 time.sleep(0.2) # 模拟写数据的网络延迟 json.dump(dic, open('db', 'w')) print('用户%s购票成功' % (i)) else: print('用户%s购票失败' % (i))def task(i,mutex): search(i) # 只需要在买票前后加锁释放放 # mutex.acquire() # buy(i) # mutex.release() # 简写 with mutex: buy(i)if __name__ == '__main__': mutex = Lock() for i in range(10): # 模拟并发100个客户端抢票 p = Process(target=task, args=(i,mutex)) p.start()
"""注意1.锁不要轻易的使用,容易造成死锁现象(我们写代码一般不会用到,都是内部封装好的)2.锁只在处理数据的部分加来保证数据安全(只在争抢数据的环节加锁处理即可"""
6.进程间通信
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
6.1队列Queue模块
队列底层是管道+锁
实现的
"""队列 : 先进先出 堆栈 : 先进后厨"""
在python中提供了一个内置模块 , 叫做Queue
, 他能够在内存中维护一个队列 , 支持往这个队列里存取数据
from multiprocessing import Queue # 另一种导入方式from queue import Queue# 创建一个队列q = Queue(5) # 括号内可以传数字 标示着生成的队列最大可以同时存放的数据量(默认不写,则无大小限制)# 往队列里中存数据q.put(111)q.put(222)q.put(333)q.put(444)q.put(555)# q.put(666) # 当队列数据放满了之后 如果还有数据要放程序会阻塞 直到有位置让出 不会报错"""存取数据存 是为了更好的取千方百计的存、简单快捷的取"""# 取数据v1 = q.get()v2 = q.get()v3 = q.get()# print(q.full()) # 判断当前队列是否满了# print(q.empty()) # 判断当前队列是否空了v4 = q.get()v5 = q.get()v6 = q.get() # 队列中如果已经没有数据的话 get方法会原地阻塞# v6 = q.get_nowait() # 没有数据直接报错 queue. Empty# V6 = q.get(timeout=3) # 没有数据之后原地等待三秒之后再报错 queue. Emptyprint(v1, v2, v3, v4, v5, v6)"""q.full()q.empty()q.get_nowait在多进程的情况下是不精确 因为当前进程过来是空的,可能下一秒就有一个进程往里面put了"""
主要方法 :
q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常. q.get_nowait():同q.get(False)q.put_nowait():同q.put(False)q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
应用 :
主进程和子进程之间通信
from multiprocessing import Process, Queuedef task(q): q.put('给主进程发个消息: 你好啊')if __name__ == '__main__': q = Queue() p = Process(target=task, args=(q,)) p.start() print(q.get()) # get自带等待,所以不需要写p.join()
子进程与子进程之间通信
from multiprocessing import Process, Queuedef task1(q): q.put('给子进程p1发个消息: 你好啊')def task2(q): print(q.get())if __name__ == '__main__': q = Queue() p1 = Process(target=task1, args=(q,)) p2 = Process(target=task2, args=(q,)) p1.start() p2.start()
6.2管道(了解即可)
#创建管道的类:Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道#参数介绍:dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。#主要方法: conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。 conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象 #其他方法:conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法conn1.fileno():返回连接使用的整数文件描述符conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收 conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常
基于管道实现进程间通信
from multiprocessing import Process,Pipeimport time,osdef consumer(p,name): left,right=p left.close() while True: try: baozi=right.recv() print('%s 收到包子:%s' %(name,baozi)) except EOFError: right.close() breakdef producer(seq,p): left,right=p right.close() for i in seq: left.send(i) # time.sleep(1) else: left.close()if __name__ == '__main__': left,right=Pipe() c1=Process(target=consumer,args=((left,right),'c1')) c1.start() seq=(i for i in range(10)) producer(seq,(left,right)) right.close() left.close() c1.join() print('主进程')
管道可以用于双向通信,利用通常在客户端/服务器中使用的请求/响应模型或远程过程调用,就可以使用管道编写与进程交互的程序
7.生产者消费者模型
"""生产者 : 生产/制造东西的消费者 : 消费/处理东西的该模型除了上述两个东西 , 还需要一个媒介 ,生活中的例子做包子的将包子做好后放在蒸笼(媒介)里面,买包子的取蒸笼里面拿 ,厨师做菜做完之后用盘子装着给你消费者端过去 生产者和消费者之间不是直接做交互的,而是借助于媒介做交互"""生产者(做包子的)+消息队列(蒸笼)+消费者(吃包子的)
使用代码模拟实现
from multiprocessing import Process, Queueimport random, time# 生产者def producer(name, q): for i in range(1, 10): # 生产包子 data = f"{name}生产的包子{i}" # 模拟延迟 time.sleep(random.randint(1,3)) print(data) # 把包子放进蒸笼 q.put(data)# 消费者def consumer(name, q): # 顾客从蒸笼里取包子 while 1: food = q.get() # 模拟吃的时间 time.sleep(random.randint(1,3)) print(f"{name}吃了 {food}")if __name__ == '__main__': # 模拟蒸笼 q = Queue() p1 = Process(target=producer, args=('厨师A', q,)) p2 = Process(target=consumer, args=('顾客A', q,)) p1.start() p2.start() print("主进程")"""上述代码消费者会阻塞住, 因为q为空, q.get()会阻塞解决方法:1.生产者生产完了,放入结束标识符,None,当消费者拿到None,就结束消费2.使用 JoinableQueue() 队列"""
解决代码1:
from multiprocessing import Process, Queueimport random, time# 生产者def producer(name, q): for i in range(1, 10): # 生产包子 data = f"{name}生产的包子{i}" # 模拟延迟 time.sleep(random.randint(1,3)) print(data) # 把包子放进蒸笼 q.put(data) # 也可以在生产者里面q.put(None)都一样# 消费者def consumer(name, q): # 顾客从蒸笼里取包子 while 1: food = q.get() if not food:break # 模拟吃的时间 time.sleep(random.randint(1,3)) print(f"{name}吃了 {food}")if __name__ == '__main__': # 模拟蒸笼 q = Queue() p1 = Process(target=producer, args=('厨师A', q,)) p2 = Process(target=consumer, args=('顾客A', q,)) p1.start() p2.start() # 主进程等待p1结束,然后放个None p1.join() q.put(None) print("主进程")# 遇见多个生产者就主进程统一等待子进程结束,队列中追加相同个数的None
解决代码2:
from multiprocessing import Process, Queue, JoinableQueueimport random, timefrom multiprocessing import Process, Queue, JoinableQueueimport random, time# 生产者def producer(name, q): for i in range(1, 10): # 生产包子 data = f"{name}生产的包子{i}" # 模拟延迟 time.sleep(random.randint(1, 3)) print(data) # 把包子放进蒸笼 q.put(data)# 消费者def consumer(name, q): # 顾客从蒸笼里取包子 while 1: food = q.get() # 模拟吃的时间 time.sleep(random.randint(1, 3)) print(f"{name}吃了 {food}") q.task_done() # 告诉队列你已经从里面取出数据并且处理完毕了if __name__ == '__main__': # 模拟蒸笼 q = JoinableQueue() p1 = Process(target=producer, args=('厨师A', q,)) c1 = Process(target=consumer, args=('顾客A', q,)) # 设置c1为守护进程 c1.daemon = True # 开启进程 p1.start() c1.start() # 主进程等待子进程 p1.join() q.join() # 等待队列中所有的数据被取完再执行往下执行代码"""joinablequeue每当你往该队列中存入数据的时候内部会有一个计数器+1没当你调用task_done的时候计数器-1join()当计数器为0的时候才往后运行"""# 只要q.join()执行完毕 就说明消费者已经处理完数据了 消费者(子进程)就没有存在的必要了
生产者消费者模型可应用于网络爬虫 , 爬虫是生产者,制造数据 , 数据处理是消费者对数据进行消费