进程的创建和结束:
multiprocess模块:
multiprocess不是一个模块而是python中一个操作、管理进程的包
分为四个部分:创建进程部分,进程同步部分,进程池部分,进程之间数据共享。
Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动) 强调: 1. 需要使用关键字的方式来指定参数 2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号 参数介绍: 1 group参数未使用,值始终为None 2 target表示调用对象,即子进程要执行的任务 3 args表示调用对象的位置参数元组,args=(1,2,'egon',) 4 kwargs表示调用对象的字典,kwargs={'name':'egon','age':18} 5 name为子进程的名称
1 p.start():启动进程,并调用该子进程中的p.run() 2 p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法 3 p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁 4 p.is_alive():如果p仍然运行,返回True 5 p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
1 p.daemon:守护进程 默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随 之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置 2 p.name:进程的名称 3 p.pid:进程的pid 4 p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可) 5 p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
在Windows操作系统中由于没有fork(linux操作系统中创建进程的机制),在创建子进程的时候会自动 import 启动它的这个文件,而在 import 的时候又执行了整个文件。因此如果将process()直接写在文件中就会无限递归创建子进程报错。所以必须把创建子进程的部分使用if __name__ ==‘__main__’ 判断保护起来,import 的时候 ,就不会递归运行了。
使用process模块创建进程
启动两个进程(可以用for循环启动多个进程)
from multiprocessing import Process def func(name): print(666) print('我是子进程', name) if __name__ == '__main__': p1 = Process(target=func, args=('alex',)) # 传的必须是元组,所以alex后面要逗号 p2 = Process(target=func, args=('bob',)) # 传的必须是元组,所以alex后面要逗号 p1.start() # 第一个进程 p2.start() # 第二个进程 print('我是主进程'))
join用法:
from multiprocessing import Process import time def func(a): time.sleep(2) print('子进程', a) if __name__ == '__main__': p = Process(target=func, args=(1,)) p.start() # 把自己程交给CPU,就开始执行下面的代码。非阻塞态不用等 p.join() # 等待子进程运行完后,父进程才运行,父进程要回收子进程内存空间 print('主进程')
查看子进程和父进程 id: 子进程:os.getpid 父进程:os.getppid
from multiprocessing import Process import time import os def func(a): time.sleep(2) print('子进程', os.getpid()) # os.getpid获得子进程ID print('父进程', os.getppid()) # os.getppid 获得主进程ID if __name__ == '__main__': print('主进程ID', os.getpid()) # os.getpid获得主进程ID p = Process(target=func, args=(1,)) p.start()
同时启动多个子进程:
from multiprocessing import Process import time import os def func(a): time.sleep(1) print('子进程ID', os.getpid()) if __name__ == '__main__': for i in range(5): # 启动五个子进程 p = Process(target=func, args=(1,)) p.start() print('主进程ID', os.getppid())
多个子进程中join的用法:
from multiprocessing import Process import os def func(a): print('子进程ID%s'% a, os.getpid()) if __name__ == '__main__': for i in range(5): # 启动五个子进程 p = Process(target=func, args=(i,)) p.start() # p.join() # 放这里会等待第一个子进程结束后,在执行下一个子进程。不能实现同时启动运行 p.join() #放这里,只会等待进程4结束后,就执行下面的程序。此时可能进程1,2,3并没有执行完。无法达到阻隔子进程的效果 print('主进程ID', os.getppid())
from multiprocessing import Process def func(a): time.sleep(0.3) print('子进程%s正在发发邮件' % a) if __name__ == '__main__': p_lis = [] for i in range(5): # 启动五个子进程 p = Process(target=func, args=(i,)) p.start() p_lis.append(p) for i in p_lis: p.join() #遍历列表,列表里面是每一个进程。都会被join阻隔 print('所有邮件都已经发出') # 等所有进程都给join阻隔完后,才运行
进程间的数据隔离:
进程间数据是相互隔离的
from multiprocessing import Process n = 100 def func(): global n for i in range(5): n = n - 1 print(n) # 每一个进程执行的结果都是 95,说明子进程间彼此隔离 if __name__ == '__main__': p_lst = [] for i in range(10): p = Process(target=func) p.start() p_lst.append(p) for p in p_lst: p.join() print(n) # 100
开启进程的另一种方式: 继承Process类
class Myprocess(Process): def run(self): print('子进程',os.getpid()) if __name__ == '__main__': p = Myprocess() p.start() print('主进程', os.getppid())
class Myprocess(Process): def __init__(self,arg): # 用于接收参数 super().__init__() # 必须执行父类的init方法 self.arg = arg def run(self): print('子进程',os.getpid(), self.arg) # 使用参数 if __name__ == '__main__': p = Myprocess('我是参数') # 实例化是传输参数 p.start() print('主进程', os.getppid())
多进程中的其他方法:
p1.terminate()#关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
print(p1.is_alive()) #判断进程是否存活,结果为True or False
from multiprocessing import Process import time class Myprocess(Process): def __init__(self,arg): super().__init__() self.arg = arg def run(self): print('子进程',os.getpid(), self.arg) if __name__ == '__main__': p = Myprocess('我是参数') p.start() print(p.is_alive()) # 判断进程是否存活,返回True,False print('主进程', os.getppid()) p.terminate() # 关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活 time.sleep(1) print(p.is_alive())
守护进程:
# 1.守护进程 会随着主进程代码的结束而结束
# 2.守护进程不会守护除了主进程代码之外的其他子进程
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
p.daemon = True # 定义p为守护进程
import time from multiprocessing import Process def guard(): while True: print('我是守护进程,正在工作') # 只守护主进程 time.sleep(1) def main_main(): print('我不知道自己是不是主进程,正在运行....') time.sleep(5) print('结束') def func(): print('我不知道自己是不是主进程') time.sleep(3) print('func结束了') if __name__ == '__main__': p = Process(target=guard) p.daemon = True # 定义一个守护进程 p.start() print('大结局') # 这也是主进程 # p2 = Process(target=func()) # p2.start() # 如果用Process启动,则是子进程 p1 = Process(target=main_main) # 非主进程 p1.start() func() # 这是主进程
多进程 实现socket tcp协议 server端的并发
import socket from multiprocessing import Process def func(conn): while True: conn.send(b'hello') if __name__ == '__main__': sk = socket.socket() sk.bind(('127.0.0.1',9000)) sk.listen() while True: conn,addr = sk.accept() p = Process(target=func,args = (conn,)) p.start()
import socket sk = socket.socket() sk.connect(('127.0.0.1',9000)) while True: msg = sk.recv(1024) print(msg)
进程同步: (multiprocess.Lock), 抢票例子
保证数据安全,会让程序重新变成串行,浪费时间
# 多个进程 抢占同一个数据资源 会造成 数据不安全
# 我们必须要牺牲效率来保证数据的安全性
import json import time from multiprocessing import Lock from multiprocessing import Process def search(name): with open('ticket') as f: ticket_count = json.load(f) if ticket_count['count'] >=1: print('%s : 有余票%s张'%(name,ticket_count['count'])) else: print('%s : 没票了'%name) def buy(name): with open('ticket') as f: ticket_count = json.load(f) time.sleep(0.2) if ticket_count['count'] >=1: print('有余票%s张'%ticket_count['count']) ticket_count['count'] -= 1 print('%s买到票了'%name) else: print('%s没买到票' % name) time.sleep(0.2) with open('ticket','w') as f: json.dump(ticket_count,f) def opt(lock,name): search(name) lock.acquire() # 拿走钥匙 buy(name) lock.release() # 归还钥匙 if __name__ == '__main__': lock = Lock() # 锁 互斥锁 for i in range(10): p = Process(target=opt,args = (lock,'alex'+str(i),)) p.start()
#加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。 虽然可以用文件共享数据实现进程间通信,但问题是: 1.效率低(共享数据基于文件,而文件是硬盘上的数据) 2.需要自己加锁处理 #因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。 队列和管道都是将数据存放于内存中 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来, 我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
队列(multiprocess.Queue)
IPC: 进程间通信
q.get( [ block [ ,timeout ] ] )
返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。
q.put(item [, block [,timeout ] ] )
将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。
q.qsize()
返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。
q.close()
关闭队列,防止队列中加入更多数据。调用此方法时,后台线程将继续写入那些已入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将自动调用此方法。关闭队列不会在队列使用者中生成任何类型的数据结束信号或异常。例如,如果某个使用者正被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
生产者消费者模型:
import time from multiprocessing import Process, Queue def consumer(name,q): while True: food = q.get() if not food: break time.sleep(1) print('%s吃了一个%s' % (name, food)) def producer(q, food_name): for i in range(20): time.sleep(0.1) food='%s%s' % (food_name, i ) print('制造了%s' % food) q.put(food) if __name__ == '__main__': q = Queue(5) p1 = Process(target=consumer, args=('alex',q)) p2 = Process(target=consumer, args=('wusir',q)) p3 = Process(target=producer, args=(q, '泔水')) p1.start() p2.start() p3.start() p2.join() q.put(None) q.put(None)
数据共享 Manager
进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的
虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此
from multiprocessing import Manager,Process,Lock def work(d,lock): with lock: #不加锁而操作共享的数据,肯定会出现数据错乱 d['count']-=1 if __name__ == '__main__': lock=Lock() with Manager() as m: dic=m.dict({'count':100}) p_l=[] for i in range(100): p=Process(target=work,args=(dic,lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic) Manager例子
进程池: multiprocessing.Pool
定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。
import os import time from multiprocessing import Pool def func(i): time.sleep(0.1) print(i,os.getpid()) if __name__ == '__main__': p = Pool(4) # cpu个数 + 1/cpu的个数 for i in range(10): p.apply_async(func,args=(i,)) # async异步的提交任务 p.close() # 关闭池子,不是要回收池子中的进程,而是阻止继续向池子中提交任务 p.join() # 阻塞,直到池子中的任务都执行完毕
起多进程的意义 # 1.为了更好的利用CPU,所以如果我们的程序中都是网络IO,文件IO就不适合起多进程 # 2.为了数据的隔离,如果我们的程序中总是要用到数据共享,那么就不适合使用多进程 # 3.超过了cpu个数的任务数,都应该使用进程池来解决问题,而不能无限的开启子进程
Pool([numprocess [,initializer [, initargs]]]):创建进程池
numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值 2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None 3 initargs:是要传给initializer的参数组
p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。 p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执func(*args,**kwargs),然后返回结果。 p.close():关闭进程池,防止进一步操作 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
多进程与进程池性能测试:
import os import time from multiprocessing import Pool,Process def func(i): print(i,os.getpid()) if __name__ == '__main__': start = time.time() p_lst = [] for i in range(100): p = Process(target=func,args = (i,)) p.start() p_lst.append(p) for p in p_lst: p.join() end = time.time() pro_time = end-start start = time.time() p = Pool(4) for i in range(100): p.apply_async(func,args=(i,)) # async异步的提交任务 p.close() # 关闭池子,不是要回收池子中的进程,而是阻止继续向池子中提交任务 p.join() # 阻塞,直到池子中的任务都执行完毕 end = time.time() pool_time = end - start print(pro_time,pool_time)
同步和异步:
import os,time from multiprocessing import Pool def work(n): print('%s run' %os.getpid()) time.sleep(3) return n**2 if __name__ == '__main__': p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务 res_l=[] for i in range(10): res=p.apply(work,args=(i,)) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞 # 但不管该任务是否存在阻塞,同步调用都会在原地等着 print(res_l) 进程池的同步调用
import os import time import random from multiprocessing import Pool def work(n): print('%s run' %os.getpid()) time.sleep(random.random()) return n**2 if __name__ == '__main__': p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务 res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行 # 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务 # 需要注意的是,进程池中的三个进程不会同时开启或者同时结束 # 而是执行完一个就释放一个进程,这个进程就去接收新的任务。 res_l.append(res) # 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果 # 否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了 p.close() p.join() for res in res_l: print(res.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get 进程池的异步调用
回调函数:
需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数
我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。
import time import random from multiprocessing import Process,Pool def get(i): # 进程池的子进程执行的 time.sleep(random.random()) print('从网页获取一个网页的内容', i) return i,'网页的内容'*i def call_back(content): # 主进程执行的 print(content) if __name__ == '__main__': # p = Process(target=get) # p.start() p = Pool(5) ret_l = [] # for i in range(10): # ret = p.apply_async(get,args=(i,)) # ret_l.append(ret) # for ret in ret_l: # content = ret.get() # print(len(content)) for i in range(10): p.apply_async(get,args=(i,),callback=call_back) p.close() p.join() # 将n个任务交给n个进程去执行 # 每一个进程在执行完毕之后会有一个返回值,这个返回值会直接交给callback参数指定的那个函数去进行处理 # 这样的话 所有的进程 哪一个执行的最快,哪一个就可以先进性统计工作 # 能在最短的时间内得到结果