Multiprocessing
multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.
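To see that GIL side-stepping in action, here is a minimal sketch (not from the original; the count_down workload and the timing comparison are illustrative) that runs the same CPU-bound loop serially and then split across one process per core:

```python
import multiprocessing
import time

def count_down(n):
    # Pure-Python CPU-bound loop; threads could not run this in parallel
    # because of the GIL, but separate processes can.
    while n > 0:
        n -= 1

if __name__ == "__main__":
    n = 10_000_000
    cores = multiprocessing.cpu_count()

    start = time.time()
    count_down(n * cores)                 # all the work in one process
    print("serial:", time.time() - start)

    start = time.time()
    procs = [multiprocessing.Process(target=count_down, args=(n,))
             for _ in range(cores)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print("parallel:", time.time() - start)
```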
Using multiple processes looks much like using multiple threads:
```python
import multiprocessing

def run():
    print(__name__)
    print("process is running")

if __name__ == "__main__":
    # On Windows the subprocesses will import (i.e. execute) the main module
    # at start. You need to protect the main code like this to avoid creating
    # subprocesses recursively.
    p = multiprocessing.Process(target=run)
    p.start()
```

Result:

```
__mp_main__
process is running
```
To show the individual process IDs involved, here is an expanded example:
```python
import multiprocessing, threading
import os

def info(title):
    print(title)
    print("module name:", __name__)
    print("parent process:", os.getppid())   # parent process id
    print("process id:", os.getpid())        # current process id
    print(threading.current_thread())        # show the current thread

def f(name):
    info("\033[31;1mfunction f\033[0m")
    print("hello", name)

if __name__ == "__main__":
    info("\033[32;1mmain process line\033[0m")
    p = multiprocessing.Process(target=f, args=("lxj",))
    p.start()
```

Result:

```
main process line
module name: __main__
parent process: 7388
process id: 13352
<_MainThread(MainThread, started 5828)>
function f
module name: __mp_main__
parent process: 13352
process id: 608
<_MainThread(MainThread, started 2948)>
hello lxj
```
Even when we call info() directly in the main process, it reports a parent process id. Where does that pid come from? Checking the Windows Task Manager shows that pid 7388 is PyCharm, the process that launched our script.
This confirms what we noted earlier: every process is started by a thread, which we call the main thread.
Inter-process communication
First, let's look at how threads communicate:
```python
import threading
import queue

# Child threads share the memory space of the process they belong to.
def f():
    q.put(1)

if __name__ == "__main__":
    q = queue.Queue()   # a thread-safe queue
    p = threading.Thread(target=f)
    p.start()
    print(q.get())
    p.join()
```

Result:

```
1
```
Processes, however, do not share memory space. Let's see how processes communicate.
Queue
```python
from multiprocessing import Queue, Process
import threading
import queue

# Same code as before, but with the thread swapped for a process.
def f():
    q.put(1)

if __name__ == "__main__":
    q = queue.Queue()   # still the thread queue
    p = Process(target=f)
    p.start()
    print(q.get())
    p.join()
```

Result: an error, `NameError: name 'q' is not defined`, because memory is not shared between processes.

Modify the program as follows:

```python
from multiprocessing import Queue, Process

def f(q):
    q.put(1)

if __name__ == "__main__":
    q = Queue()                        # the inter-process Queue
    p = Process(target=f, args=(q,))   # pass q into the child process
    p.start()
    print(q.get())
    p.join()
```

Result:

```
1
```
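Building on this, here is a hedged producer/consumer sketch (the producer/consumer names and the None sentinel are illustrative assumptions, not from the original): a multiprocessing Queue can carry many items, and a sentinel value tells the reader when to stop.

```python
from multiprocessing import Process, Queue

def producer(q):
    for i in range(5):
        q.put(i)
    q.put(None)   # sentinel: no more data (illustrative convention)

def consumer(q):
    while True:
        item = q.get()   # blocks until an item is available
        if item is None:
            break
        print("got", item)

if __name__ == "__main__":
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    p1.start(); p2.start()
    p1.join(); p2.join()
```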
Inter-process communication with Pipe
The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:
```python
from multiprocessing import Process, Pipe
import os

def f(conn):
    conn.send([1, 2, 3])
    data = conn.recv()
    print(os.getpid(), os.getppid(), data)

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()            # the two ends of the pipe
    p = Process(target=f, args=(parent_conn,))  # pass in one end; either works
    p.start()
    # Receive on the other end -- much like a socket client/server pair,
    # each end can send and receive multiple messages.
    data = child_conn.recv()
    print(os.getpid(), os.getppid(), data)
    child_conn.send(["asd"])
    p.join()
```

Result:

```
5176 11420 [1, 2, 3]
7304 5176 ['asd']
```
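Since the pipe is duplex, either end can both send and receive repeatedly. A small illustrative sketch (the worker name and message contents are my assumptions, not from the original):

```python
from multiprocessing import Process, Pipe

def worker(conn):
    for i in range(3):
        conn.send(i * i)              # the child sends several messages
    print("child got:", conn.recv())  # and the same end can also receive
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()   # duplex (two-way) by default
    p = Process(target=worker, args=(child_conn,))
    p.start()
    for _ in range(3):
        print("parent got:", parent_conn.recv())
    parent_conn.send("ack")            # reply back down the same pipe
    p.join()
```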
Queue and Pipe pass data between processes, but how can data actually be shared? The answer is to use a Manager.
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies. A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example:
```python
from multiprocessing import Process, Manager
import os

def func(d, l):
    d[os.getpid()] = os.getppid()
    l.append(os.getpid())

if __name__ == "__main__":
    with Manager() as manager:
        d = manager.dict()           # create a shared dict
        l = manager.list(range(3))   # create a shared list
        p_list = []
        for i in range(10):
            p = Process(target=func, args=(d, l))
            p.start()
            p_list.append(p)
        for p in p_list:             # wait for the processes to finish
            p.join()
        print(d)
        print(l)
```

Result:

```
{1232: 4656, 13656: 4656, 11668: 4656, 9368: 4656, 2516: 4656, 13772: 4656, 12316: 4656, 6764: 4656, 11788: 4656, 8756: 4656}
[0, 1, 2, 2516, 11668, 12316, 9368, 13656, 13772, 11788, 6764, 8756, 1232]
```

Every child process was able to read and update the same dict and list, so data really is shared between processes.
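One caveat worth sketching (this example is my addition, not from the original): a read-modify-write such as `d["count"] += 1` on a Manager proxy is a separate read and write, so concurrent updates to the same key should be serialized with a Manager Lock.

```python
from multiprocessing import Process, Manager

def worker(d, lock):
    for _ in range(100):
        with lock:            # without this, some increments can be lost
            d["count"] += 1   # proxy read-modify-write is not atomic

if __name__ == "__main__":
    with Manager() as manager:
        d = manager.dict(count=0)
        lock = manager.Lock()   # Manager proxies Lock objects too
        procs = [Process(target=worker, args=(d, lock)) for _ in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(d["count"])   # 400 with the lock; often less without it
```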
Process synchronization
When several processes write to the same screen (standard output), a lock is needed.
Without using the lock output from the different processes is liable to get all mixed up.
```python
from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()
```
Process pools
A process pool maintains a set of worker processes. When a task is submitted, it takes a worker from the pool; if no worker is available, the task waits until one frees up.
A process pool offers two submission methods:
- apply
- apply_async
Usage of apply (synchronous)
```python
from multiprocessing import Process, Pool
import os, time

def foo(i):
    time.sleep(1)
    print("in process", os.getpid())
    return i + 100

if __name__ == "__main__":
    pool = Pool(5)
    for i in range(10):
        pool.apply(func=foo, args=(i,))
    print("end")
    pool.close()
    pool.join()
```

Result: the tasks execute serially, one after another.
Usage of apply_async (asynchronous)
```python
from multiprocessing import Process, Pool
import os, time

def foo(i):
    time.sleep(1)
    print("in process", os.getpid())
    return i + 100

if __name__ == "__main__":
    pool = Pool(5)
    for i in range(10):
        # pool.apply(func=foo, args=(i,))
        pool.apply_async(func=foo, args=(i,))
    pool.close()
    # close() must be called before join() (see the source). With close()
    # alone and no join(), the main process will not wait for the pool to
    # finish, and "end" is printed immediately.
    pool.join()
    print("end")
```

Result: the tasks run five at a time, one per pool process.
```python
from multiprocessing import Process, Pool
import os, time

def foo(i):
    time.sleep(1)
    print("in process", i + 100, os.getpid())
    return i + 100

def bar(arg):   # arg is the value returned by foo
    print("-->exec done:", arg, os.getpid())

if __name__ == "__main__":
    pool = Pool(5)
    print(os.getpid())
    for i in range(10):
        # Callback: when foo finishes, the callback bar runs with its result.
        pool.apply_async(func=foo, args=(i,), callback=bar)
    pool.close()
    pool.join()   # again, close() must come before join()
    print("end")
```

Result:

```
12328
in process 100 4160
-->exec done: 100 12328
in process 101 1304
-->exec done: 101 12328
in process 102 1988
-->exec done: 102 12328
in process 103 4744
-->exec done: 103 12328
in process 104 9460
-->exec done: 104 12328
in process 105 4160
-->exec done: 105 12328
in process 106 1304
-->exec done: 106 12328
in process 107 1988
-->exec done: 107 12328
in process 108 4744
-->exec done: 108 12328
in process 109 9460
-->exec done: 109 12328
end
```

The output differs from run to run, but the callback always prints pid 12328, the main process's pid, so we can conclude that the callback runs in the main process.
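As an alternative to callbacks, apply_async also returns an AsyncResult whose get() delivers the task's return value in the main process. A short illustrative sketch (using Pool as a context manager and Pool.map are my additions, not from the original):

```python
from multiprocessing import Pool
import time

def foo(i):
    time.sleep(1)
    return i + 100

if __name__ == "__main__":
    with Pool(5) as pool:
        # Keep each AsyncResult and collect the values explicitly.
        results = [pool.apply_async(foo, args=(i,)) for i in range(10)]
        print([r.get() for r in results])   # blocks until each task finishes
        # Pool.map does the same thing in one call, with ordered results.
        print(pool.map(foo, range(10)))
```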