在python程序中的进程操作
之前我们已经了解了很多进程相关的理论知识,了解进程是什么应该不再困难了,刚刚我们已经了解了,运行中的程序就是一个进程。所有的进程都是通过它的父进程来创建的。因此,运行起来的python程序也是一个进程,那么我们也可以在程序中再创建进程。多个进程可以实现并发效果,也就是说,当我们的程序中存在多个进程的时候,在某些时候,就会让程序的执行速度变快。以我们之前所学的知识,并不能实现创建进程这个功能,所以我们就需要借助python中强大的模块。
multiprocess模块
仔细说来,multiprocess不是一个模块而是python中一个操作、管理进程的包。 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。由于提供的子模块非常多,为了方便大家归类记忆,我将这部分大致分为四个部分:创建进程部分,进程同步部分,进程池部分,进程之间数据共享。
multiprocess.process模块
process模块介绍
process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。
Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动) 强调: 1. 需要使用关键字的方式来指定参数 2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号 参数介绍: 1 group参数未使用,值始终为None 2 target表示调用对象,即子进程要执行的任务 3 args表示调用对象的位置参数元组,args=(1,2,'egon',) 4 kwargs表示调用对象的字典,kwargs={'name':'egon','age':18} 5 name为子进程的名称
方法介绍:
p.start():启动进程,并调用该子进程中的p.run() p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁 p.is_alive():如果p仍然运行,返回True p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
1 p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置 2 p.name:进程的名称 3 p.pid:进程的pid 4 p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可) 5 p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
在Windows操作系统中由于没有fork(linux操作系统中创建进程的机制),在创建子进程的时候会自动 import 启动它的这个文件,而在 import 的时候又执行了整个文件。因此如果将process()直接写在文件中就会无限递归创建子进程报错。所以必须把创建子进程的部分使用if __name__ ==‘__main__’ 判断保护起来,import 的时候 ,就不会递归运行了。
使用process模块创建进程
在一个python进程中开启子进程,start方法和并发效果。
import time from multiprocessing import Process def f(name): print('hello', name) print('我是子进程') if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() time.sleep(1) print('执行主进程的内容了')5
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) print('我是子进程') if __name__ == '__main__': p = Process(target=f, args=('bob',)) p.start() #p.join() print('我是父进程')
import os from multiprocessing import Process def f(x): print('子进程id :',os.getpid(),'父进程id :',os.getppid()) return x*x if __name__ == '__main__': print('主进程id :', os.getpid()) p_lst = [] for i in range(5): p = Process(target=f, args=(i,)) p.start()
进程的创建
import os import time from multiprocessing import Process # 进程模块 def func(): time.sleep(2) print('in func',os.getpid(),os.getppid()) #getpid 子进程的pid getppid 父进程的pid if __name__ == '__main__': print('in main',os.getpid(),os.getppid()) p1 = Process(target=func) # 进程对象 p1.start() # 向操作系统提交了一个开启子进程的申请 p2 = Process(target=func) # 进程对象 p2.start() # 向操作系统提交了一个开启子进程的申请 print('主进程 的 代码执行结束了') print结果 : 三个程序独立 谁先执行完毕就先执行谁 # in main 6560 5364 # 主进程 的代码执行结束了 # in func 6560 6560 子程序1 # in func 6560 6560 子程序2 执行子程序的时候相当于又创建了个空间,并且引入了模块里的内容,当执行子程序的时候又创建了个空间,加if __name__ == '__main__':就是为了终止循环.当在本模块里时,__name__ == '__main__',当被引用到另一个模块时,__name__ ==被引用模块的名字,条件改变了,所以终止循环 原理 if __name__ == '__main__': 使用python都是调用操作系统的命令来启动进程 同样使用python 不同的操作系统的操作是不同的 对于windows来说 必要加if __name__ == '__main__': 对于linux ios来说 不必要加if __name__ == '__main__':
给子进程传参数
import os import time from multiprocessing import Process # 进程模块 def func(num): time.sleep(2) print('in func',num,os.getpid(),os.getppid()) if __name__ == '__main__': print('in main',os.getpid(),os.getppid()) p1 = Process(target=func,args=(1,)) # 进程对象 p1.start() # 向操作系统提交了一个开启子进程的申请 p2 = Process(target=func,args=(2,)) # 进程对象 p2.start() # 向操作系统提交了一个开启子进程的申请 print('主进程 的 代码执行结束了')
开启多个子进程
import os import time from multiprocessing import Process # 进程模块 def func(num): print('in func',num,os.getpid(),os.getppid()) if __name__ == '__main__': print('in main',os.getpid(),os.getppid()) for i in range(10): p = Process(target=func,args=(i,)) # args表示调用对象的位置参数元组 p.start() # start不是运行一个程序,而是调用操作系统的命令,要创建子进程 print('主进程 的 代码执行结束了')
join方法: 阻塞,直到p这个子进程执行完毕之后再继续执行
import os import time from multiprocessing import Process # 进程模块 def func(num): time.sleep(1) print('in func',num,os.getpid(),os.getppid()) if __name__ == '__main__': print('in main',os.getpid(),os.getppid()) p = Process(target=func,args=(1,)) p.start() # start不是运行一个程序,而是调用操作系统的命令,要创建子进程 p.join() # 阻塞,直到p这个子进程执行完毕之后再继续执行 print('主进程 的 代码执行结束了')
一批任务使用join
import os import time from multiprocessing import Process # 进程模块 def func(num): print('in func',num,os.getpid(),os.getppid()) if __name__ == '__main__': print('in main',os.getpid(),os.getppid()) p_l = [] for i in range(10): p = Process(target=func,args=(i,)) p.start() # start不是运行一个程序,而是调用操作系统的命令,要创建子进程,非阻塞 p_l.append(p) print(p_l) for p in p_l : p.join() # 阻塞,直到p这个子进程执行完毕之后再继续执行 print('主进程 的 代码执行结束了')
is_alive(查看子进程是否活着)
terminate(强制结束一个正在运行的进程)--非阻塞
import os import time from multiprocessing import Process # 进程模块 def func(num): time.sleep(2) print('in func',num,os.getpid(),os.getppid()) if __name__ == '__main__': print('in main',os.getpid(),os.getppid()) p1 = Process(target=func,args=(1,)) # 进程对象 p1.start() # 向操作系统提交了一个开启子进程的申请 print(p1.is_alive()) # 检测进程是否在执行任务 p1.terminate() # 强制结束子进程 - 非阻塞 print(p1.is_alive()) # 检测进程是否在执行任务 print('主进程 的 代码执行结束了')
面向对象的方式开启子进程
import os import time from multiprocessing import Process # 进程模块 class MyProcess(Process): def __init__(self,num): #如果传参需要:自定义__init__,需要执行父类的__init__方法 super().__init__() self.num = num def run(self): #重写 run方法 print('in run ',self.num,os.getpid(),os.getppid()) if __name__ == '__main__': print('in main ', os.getpid(), os.getppid()) p = MyProcess(1) p.start()
进阶,多个进程同时运行(注意,子进程的执行顺序不是根据启动顺序决定的)
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=f, args=(i,)) p.start() p_lst.append(p)
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=f, args=(i,)) p.start() p_lst.append(p) p.join() # [p.join() for p in p_lst] print('父进程在执行')
import time from multiprocessing import Process def f(name): print('hello', name) time.sleep(1) if __name__ == '__main__': p_lst = [] for i in range(5): p = Process(target=f, args=(i,)) p.start() p_lst.append(p) # [p.join() for p in p_lst] print('父进程在执行')
除了上面这些开启进程的方法,还有一种以继承Process类的形式开启进程的方式
import os from multiprocessing import Process class MyProcess(Process): def __init__(self,name): super().__init__() self.name=name def run(self): print(os.getpid()) print('%s 正在和女主播聊天' %self.name) p1=MyProcess('zhangsan') p2=MyProcess('lisi') p3=MyProcess('wangwu') p1.start() #start会自动调用run p2.start() # p2.run() p3.start() p1.join() p2.join() p3.join() print('主线程')
进程之间的数据隔离问题
from multiprocessing import Process def work(): global n n=0 print('子进程内: ',n) if __name__ == '__main__': n = 100 p=Process(target=work) p.start() print('主进程内: ',n)
注意:
进程与进程之间的内存中的数据是隔离的,内存空间是不能共享的
所以要想进行通信,必须借助其他手段,且这两个进程都是自愿的
子进程的执行结果父进程获取不到
父进程依赖子进程的执行结果呢
父进程如何获取子进程的执行结果???
父子进程之间通过socket通信
守护进程
会随着主进程的结束而结束。
主进程创建守护进程
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
import os import time from multiprocessing import Process class Myprocess(Process): def __init__(self,person): super().__init__() self.person = person def run(self): print(os.getpid(),self.name) print('%s正在和女主播聊天' %self.person) p=Myprocess('哪吒') p.daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行 p.start() time.sleep(10) # 在sleep时查看进程id对应的进程ps -ef|grep id print('主')
from multiprocessing import Process def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() time.sleep(0.1) print("main-------")#打印该行则主进程代码结束,则守护进程p1应该被终止.#可能会有p1任务执行的打印信息123,因为主进程打印main----时,p1也执行了,但是随即被终止.
import time from multiprocessing import Process def func1(): print('begin') time.sleep(3) print('wahaha') if __name__ == '__main__': p = Process(target=func1) p.daemon = True # 守护进程的属性,默认是False,如果设置成True,就表示设置这个子进程为一个守护进程,设置守护进程的操作应该在开启子进程之前 p.start() time.sleep(1) print('主进程') print 结果: # begin # 主进程 原因:因为 p = Process(target=func1),而p.daemon = True 创建了守护进程,主进程执行完,就不执行wahaha,守护进程设置func1,主进程代码执行完毕后,func1代码就会结束执行
import time from multiprocessing import Process def func1(): print('begin') time.sleep(3) print('wahaha') def func2(): while True: print('in func2') time.sleep(0.5) if __name__ == '__main__': Process(target=func1).start() p = Process(target=func2) p.daemon = True # 守护进程的属性,默认是False,如果设置成True,就表示设置这个子进程为一个守护进程 # 设置守护进程的操作应该在开启子进程之前 p.start() time.sleep(1) print('主进程') print结果: begin in func2 in func2 主进程 wahaha 原因: p = Process(target=func2),而p.daemon = True守护进程func2跟func1没有关系,所以会执行wahaha,守护进程设置谁,主进程代码执行完毕后,谁就会结束执行
总结:
python - multiprocessing Process 类 - 创建子进程 操作系统的差别 windows 开启子进程的代码必须写在if __name__ == '__main__'下面 start: 只是向操作系统发出指令,创建进程 创建进程有一定的时间开销 子进程和主进程的执行互不干扰,子进程和主进程是异步的 主进程会等待子进程的结束再结束 如果 主进程需要在子进程都运行结束之后再做某件事情 : join 复杂的计算 主进程如何拿到子进程计算的结果 ????? isalive 查看子进程是否活着 terminate强制结束一个正在运行的进程 - 非阻塞 使用面向对象的方式开启一个子进程 继承Process类 重写run方法 如果要传递参数 : 自定义__init__,需要执行父类的__init__方法
多进程启动tcp协议的socket来完成并发
import socket from multiprocessing import Process def talk(conn): try: while True: conn.send(b'hello') print(conn.recv(1024)) finally: conn.close() if __name__ == '__main__': sk = socket.socket() sk.bind(('127.0.0.1',9091)) sk.listen() try: while True: conn,addr = sk.accept() Process(target=talk,args=(conn,)).start() finally: sk.close()
import socket import os sk = socket.socket() sk.connect(('127.0.0.1',9091)) while True: print(sk.recv(1024)) sk.send(str(os.getpid()).encode('utf-8'))
socket聊天并发实例
from socket import * from multiprocessing import Process server=socket(AF_INET,SOCK_STREAM) server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) server.bind(('127.0.0.1',8080)) server.listen(5) def talk(conn,client_addr): while True: try: msg=conn.recv(1024) if not msg:break conn.send(msg.upper()) except Exception: break if __name__ == '__main__': #windows下start进程一定要写到这下面 while True: conn,client_addr=server.accept() p=Process(target=talk,args=(conn,client_addr)) p.start()
from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8080)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) msg=client.recv(1024) print(msg.decode('utf-8'))
多进程中的其他方法
from multiprocessing import Process import time import random class Myprocess(Process): def __init__(self,person): self.name=person super().__init__() def run(self): print('%s正在和网红脸聊天' %self.name) time.sleep(random.randrange(1,5)) print('%s还在和网红脸聊天' %self.name) p1=Myprocess('哪吒') p1.start() p1.terminate()#关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活 print(p1.is_alive()) #结果为True print('开始') print(p1.is_alive()) #结果为False
1 class Myprocess(Process): 2 def __init__(self,person): 3 self.name=person # name属性是Process中的属性,标示进程的名字 4 super().__init__() # 执行父类的初始化方法会覆盖name属性 5 #self.name = person # 在这里设置就可以修改进程名字了 6 #self.person = person #如果不想覆盖进程名,就修改属性名称就可以了 7 def run(self): 8 print('%s正在和网红脸聊天' %self.name) 9 # print('%s正在和网红脸聊天' %self.person) 10 time.sleep(random.randrange(1,5)) 11 print('%s正在和网红脸聊天' %self.name) 12 # print('%s正在和网红脸聊天' %self.person) 13 14 15 p1=Myprocess('哪吒') 16 p1.start() 17 print(p1.pid) #可以查看子进程的进程id
进程同步(multiprocess.Lock)
锁 —— multiprocess.Lock (互斥锁)
通过刚刚的学习,我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理,他们之间的运行没有顺序,一旦开启也不受我们控制。尽管并发编程让我们能更加充分的利用IO资源,但是也给我们带来了新的问题。
当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。
import os import time import random from multiprocessing import Process def work(n): print('%s: %s is running' %(n,os.getpid())) time.sleep(random.random()) print('%s:%s is done' %(n,os.getpid())) if __name__ == '__main__': for i in range(3): p=Process(target=work,args=(i,)) p.start()
# 由并发变成了串行,牺牲了运行效率,但避免了竞争 import os import time import random from multiprocessing import Process,Lock def work(lock,n): lock.acquire() print('%s: %s is running' % (n, os.getpid())) time.sleep(random.random()) print('%s: %s is done' % (n, os.getpid())) lock.release() if __name__ == '__main__': lock=Lock() for i in range(3): p=Process(target=work,args=(lock,i)) p.start()
上面这种情况虽然使用加锁的形式实现了顺序的执行,但是程序又重新变成串行了,这样确实会浪费了时间,却保证了数据的安全。
接下来,我们以模拟抢票为例,来看看数据安全的重要性。
#文件db的内容为:{"count":1} #注意一定要用双引号,不然json无法识别 #并发运行,效率高,但竞争写同一文件,数据写入错乱 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db')) print('