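1. Multiple processes: multiprocessing
The multiprocessing package is Python's package for managing multiple processes. Much like threading.Thread, it lets you create a process from a multiprocessing.Process object. Process is used in much the same way as Thread and also provides start(), run(), and join() methods.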
    import multiprocessing
    import threading
    import time

    def thread_run():
        # Print the identifier of the thread running inside the child process
        print(threading.get_ident())

    def run(name):
        time.sleep(2)
        print('Hello ', name)
        # Each child process also starts one thread of its own
        t = threading.Thread(target=thread_run)
        t.start()

    if __name__ == '__main__':
        for i in range(10):
            p = multiprocessing.Process(target=run, args=('bob_%s' % i,))
            p.start()
Output (the order varies from run to run, because the ten processes are scheduled independently):
    Hello bob_0
    1144
    Hello bob_8
    1268
    Hello bob_4
    4360
    Hello bob_2
    768
    Hello bob_6
    5308
    Hello bob_5
    Hello bob_1
    6076
    5088
    Hello bob_9
    6104
    Hello bob_3
    5196
    Hello bob_7
    748
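The text above mentions join(), but the example never calls it. The following is a minimal sketch of waiting for every child to finish and then inspecting its exit code. The names greet and start_and_wait are hypothetical helpers introduced only for this illustration.

```python
import multiprocessing
import os
import time


def greet(name, delay=0.1):
    """Worker (hypothetical helper): sleep briefly, then print a greeting."""
    time.sleep(delay)
    message = 'Hello %s from PID %d' % (name, os.getpid())
    print(message)
    # Process discards the return value; it is only useful when
    # greet() is called directly in the current process.
    return message


def start_and_wait(names):
    """Start one process per name, wait for all of them, return exit codes."""
    procs = [multiprocessing.Process(target=greet, args=(n,)) for n in names]
    for p in procs:
        p.start()
    for p in procs:
        p.join()  # block until this child has exited
    return [p.exitcode for p in procs]


if __name__ == '__main__':
    codes = start_and_wait(['bob_%s' % i for i in range(3)])
    print('exit codes:', codes)
```

A child that exits normally has exitcode 0, so this is a simple way to confirm that every worker completed before the main process continues.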
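2. Process pools
To run many tasks across several processes, use a process pool. Pool is a class in the multiprocessing package (not a separate library). Calling pool = Pool() with no arguments creates one worker process per available CPU.
pool.map() works like a loop: it feeds each element of its second argument (an iterable) to the function given as its first argument, spreads the calls across the worker processes, and returns the results as a list in the same order as the input.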
    from multiprocessing import Pool

    def square(num):
        return num ** 2

    if __name__ == '__main__':
        numbers = [0, 1, 2, 3, 4, 5]
        pool = Pool(processes=5)  # the pool holds at most 5 worker processes
        print(pool.map(square, numbers))
Output:
[0, 1, 4, 9, 16, 25]
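As a variation on the example above, here is a small sketch that creates the pool without a size argument and uses it as a context manager, so the workers are cleaned up when the block exits. The function cube is a hypothetical stand-in for any picklable, module-level function.

```python
from multiprocessing import Pool


def cube(num):
    """Hypothetical worker function; must be defined at module level."""
    return num ** 3


if __name__ == '__main__':
    # No argument: the pool size defaults to the number of available CPUs.
    with Pool() as pool:
        # map() blocks until every result is ready and keeps input order.
        results = pool.map(cube, range(6))
    print(results)  # [0, 1, 8, 27, 64, 125]
```

Because map() only returns once all results are in, it is safe for the with block to shut the pool down immediately afterwards.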
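Pool also provides these commonly used methods:
- apply_async(func, args) hands func to one of the pool's worker processes, with args as its arguments. It returns an AsyncResult object immediately; call its get() method to obtain the result.
- close() closes the pool so that no further tasks can be submitted; the workers exit once the tasks already queued are finished.
- join() waits for all worker processes in the pool to exit. You must call close() on the Pool before calling join().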
    from multiprocessing import Pool, freeze_support
    import time
    import os

    def Foo(i):
        time.sleep(2)
        print("in process", os.getpid())
        return i + 100

    def Bar(arg):
        # Callback: receives the return value of Foo
        print('-->exec done:', arg, os.getpid())

    if __name__ == '__main__':
        # freeze_support()
        pool = Pool(processes=3)
        print("main process", os.getpid())
        for i in range(10):
            pool.apply_async(func=Foo, args=(i,), callback=Bar)  # callback is invoked with the result

        print('end')
        pool.close()
        pool.join()
Output:
    main process 5660
    end
    in process 7048
    -->exec done: 100 5660
    in process 3396
    -->exec done: 101 5660
    in process 6728
    -->exec done: 102 5660
    in process 7048
    -->exec done: 103 5660
    in process 3396
    -->exec done: 104 5660
    in process 6728
    -->exec done: 105 5660
    in process 7048
    -->exec done: 106 5660
    in process 3396
    -->exec done: 107 5660
    in process 6728
    -->exec done: 108 5660
    in process 7048
    -->exec done: 109 5660
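Apart from the main-process line, the results arrive in groups of three, because the pool runs at most three worker processes at a time. The callback always reports PID 5660: callbacks are executed in the main process, not in the workers.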
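The method list above says that apply_async() returns an AsyncResult whose get() yields the result, but the example uses a callback instead. A minimal sketch of the get() approach might look like this; add_100 is a hypothetical worker that mirrors Foo without the sleep, and the 10-second timeout is an arbitrary choice for the illustration.

```python
from multiprocessing import Pool


def add_100(i):
    """Hypothetical worker, equivalent to Foo without the delay."""
    return i + 100


if __name__ == '__main__':
    pool = Pool(processes=3)
    # Submit everything first; each call returns an AsyncResult at once.
    pending = [pool.apply_async(add_100, args=(i,)) for i in range(10)]
    pool.close()  # no more tasks will be submitted
    # get() blocks until that task's result is ready (timeout is optional).
    results = [r.get(timeout=10) for r in pending]
    pool.join()   # wait for the workers to exit
    print(results)  # [100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
```

Collecting the AsyncResult objects before calling get() keeps the tasks running in parallel; calling get() right after each apply_async() would serialize them.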
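3. Inter-process communication
Two mechanisms are commonly used for communication between processes: Queue and Pipe. A Queue can be shared by many processes, while a Pipe connects exactly two.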
    import os
    import multiprocessing
    import time

    # input worker
    def inputQ(queue):
        info = str(os.getpid()) + '(put):' + str(time.time())
        queue.put(info)

    # output worker
    def outputQ(queue, lock):
        info = queue.get()
        lock.acquire()
        print(str(os.getpid()) + '(get):' + info)
        lock.release()

    if __name__ == '__main__':
        record1 = []  # store input processes
        record2 = []  # store output processes
        lock = multiprocessing.Lock()  # To prevent messy print
        queue = multiprocessing.Queue(3)

        for i in range(10):
            process = multiprocessing.Process(target=inputQ, args=(queue,))
            process.start()
            record1.append(process)

        for i in range(10):
            process = multiprocessing.Process(target=outputQ, args=(queue, lock))
            process.start()
            record2.append(process)

        for p in record1:
            p.join()

        queue.close()

        for p in record2:
            p.join()
Output:
    4004(get):4556(put):1476337412.4875286
    512(get):5088(put):1476337412.6345284
    8828(get):7828(put):1476337412.7965286
    8372(get):1032(put):1476337412.8185284
    7740(get):1496(put):1476337412.9205284
    4176(get):632(put):1476337412.9855285
    5828(get):8508(put):1476337412.9595284
    4236(get):9204(put):1476337412.9925284
    7632(get):8956(put):1476337413.2055285
    6376(get):4160(put):1476337413.0705285
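The example above starts one process per message. A common alternative, sketched here under the assumption that a single long-lived producer and consumer are enough, is to send a sentinel value (None) to signal that no more items are coming. The names producer, consumer, tasks, and results are hypothetical.

```python
import multiprocessing


def producer(queue, items):
    """Put every item on the queue, then a None sentinel to signal the end."""
    for item in items:
        queue.put(item)
    queue.put(None)


def consumer(queue, results):
    """Double each item until the sentinel arrives, then forward the sentinel."""
    while True:
        item = queue.get()
        if item is None:
            break
        results.put(item * 2)
    results.put(None)


if __name__ == '__main__':
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(tasks, [1, 2, 3]))
    c = multiprocessing.Process(target=consumer, args=(tasks, results))
    p.start()
    c.start()
    # Drain the results before join(): a process that has put items on a
    # queue does not exit until its buffered items have been flushed.
    doubled = list(iter(results.get, None))
    p.join()
    c.join()
    print(doubled)  # [2, 4, 6]
```

With one producer and one consumer the queue preserves order, so the output is deterministic; with several producers the interleaving would vary between runs, as in the output shown earlier.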
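A Pipe can be one-way (unidirectional) or two-way (duplex). multiprocessing.Pipe(duplex=False) creates a one-way pipe; the default is two-way. Pipe() returns a pair of connection objects, one for each end: an object sent into one end is received by the process holding the other end. A one-way pipe accepts input at only one end (the first connection can only receive and the second can only send), whereas a two-way pipe lets both ends send and receive.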
    from multiprocessing import Process, Pipe

    def f(conn):
        conn.send([42, None, 'hello from child'])
        conn.send([42, None, 'hello from child2'])
        print("from parent:", conn.recv())
        conn.close()

    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=f, args=(child_conn,))
        p.start()
        print(parent_conn.recv())  # prints [42, None, 'hello from child']
        print(parent_conn.recv())  # prints [42, None, 'hello from child2']
        parent_conn.send("chupi可好")  # received and printed by the child
        p.join()
Output:
    [42, None, 'hello from child']
    [42, None, 'hello from child2']
    from parent: chupi可好
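The example above uses the default two-way pipe. For comparison, here is a minimal sketch of the one-way form, Pipe(duplex=False), in which the child only sends and the parent only receives. The helper names sender and receive_all are hypothetical.

```python
from multiprocessing import Process, Pipe


def sender(conn, messages):
    """Send every message through the write end, then close it."""
    for msg in messages:
        conn.send(msg)
    conn.close()


def receive_all(conn):
    """Read from the receive end until the other end is closed."""
    received = []
    while True:
        try:
            received.append(conn.recv())
        except EOFError:  # raised once nothing is left and the sender closed
            break
    return received


if __name__ == '__main__':
    # duplex=False: the first connection can only receive,
    # the second can only send.
    recv_end, send_end = Pipe(duplex=False)
    p = Process(target=sender, args=(send_end, ['a', 'b', 'c']))
    p.start()
    # The parent closes its own copy of the send end; otherwise recv()
    # would never see EOF after the child finishes.
    send_end.close()
    print(receive_all(recv_end))  # ['a', 'b', 'c']
    p.join()
```

Relying on EOFError avoids the need for a sentinel value, at the cost of having to close every unused copy of the send end.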