通过继承的方式,实现Process多进程
1 from multiprocessing import Process 2 import time 3 4 class MyNewProcess(Process): 5 def run(self): 6 for i in range(10): 7 print("----run----") 8 time.sleep(1) 9 10 11 if __name__ == "__main__": 12 p = MyNewProcess() 13 14 p.start() # Process 中的 start 方法会主动调用 run 方法 15 for i in range(10): print("---main---") time.sleep(1)
Pool 进程池实现多进程
1 import time 2 from multiprocessing import Pool 3 4 def worker(): 5 for i in range(10): 6 print("From worker %s"%i) 7 time.sleep(0.5) 8 9 def foo(): 10 for i in range(10): 11 print("From foo %s"%i) 12 time.sleep(0.5) 13 14 def bar(): 15 for i in range(10): 16 print("From bar %s"%i) 17 time.sleep(0.5) 18 19 if __name__ == "__main__": 20 pool = Pool(3) # 创建三个 进程 21 pool.apply_async(worker) 22 pool.apply_async(foo) 23 pool.apply_async(bar) 24 25 pool.close() # 关闭进程池,禁止添加任务 26 pool.join() # 等待子进程结束后,主进程才往下走 27 print("Is done...") 28 29 30 31 # process and Pool 最后都是调用 fork 32 # 通常情况下,主进程一半用来等低啊,,,,真正的任务子进程中执行
Queue队列的简单使用
1 from multiprocessing import Queue 2 3 q = Queue(3) # 初始化一个Queue对象,最多可以put三条信息,如果不写3,那么久无限制 4 5 q.put("Message01") # 添加信息的方法 6 q.put("Message02") 7 print(q.full()) # 查看 队列 是否满了的方法 8 9 q.put("Message03") 10 print(q.full()) 11 12 # 因为队列已经满了,所以下面的消息会出现异常,第一个 try 会等待2秒后再抛出异常, 13 # 第二个 try 会立刻抛出异常 14 try: 15 q.put("Message04", True, 2) 16 except: 17 print("消息队列已满,现有消息数量:%s"%q.qsize()) 18 19 try: 20 q.put_nowait("Message04") 21 except: 22 print("消息队列已满,现有消息数量:%s"%q.qsize()) 23 24 # 推荐使用的方式,先判断队列是否已满,再写入 25 if not q.full(): 26 q.put_nowait("Message04") 27 28 # 读取消息的时候,先判断消息队列是否为空,再读取 29 if not q.empty(): 30 for i in range(q.qsize()): 31 print(q.get_nowait())
Process配合Queue实现进程间通信
1 from multiprocessing import Process, Queue 2 import time, random 3 4 # 写数据进程执行的代码 5 def write(q): 6 for value in ['a', 'b', 'c']: 7 print("Put %s to queue..."%value) 8 q.put(value) 9 time.sleep(random.random()) 10 11 # 读数据进程的代码 12 def read(q): 13 while True: 14 if not q.empty(): 15 value = q.get(True) 16 print("Get %s from queue..."%value) 17 time.sleep(random.random()) 18 else: 19 break 20 21 if __name__ == "__main__": 22 # 父进程创建 Queue, 并传给各个子进程 23 q = Queue() 24 pw = Process(target=write, args=(q, )) 25 pr = Process(target=read, args=(q, )) 26 27 # 启动写入子进程,并等待结束 28 pw.start() 29 pw.join() 30 31 # 启动读取子进程,并等待结束 32 pr.start() 33 pr.join()
进程池与队列合作实现进程间通信
1 # 修改 import 中的 Queue 为 Manager 2 from multiprocessing import Manager, Pool 3 import os 4 5 6 def reader(q): 7 print("reader启动(%s),父进程为(%s)"%(os.getpid(), os.getppid())) 8 for i in range(q.qsize()): 9 print("reader从Queue获取到消息:%s"%q.get(True)) 10 11 def writer(q): 12 print("writer启动(%s),父进程为(%s)"%(os.getpid(), os.getppid())) 13 for i in "Always": 14 q.put(i) 15 16 17 if __name__ == "__main__": 18 print("(%s) start"%os.getpid()) 19 q = Manager().Queue() # 使用Manager中的Queue来初始化 20 po = Pool() 21 # 使用阻塞模式创建进程,这样就不需要咋reader中使用死循环了,可以让writer完全执行后,再reader 22 po.apply(writer, (q,)) 23 po.apply(reader, (q,)) 24 25 po.close() 26 po.join() 27 print("(%s) End" % os.getpid())
利用Pool进程池实现简单的文件复制
1 import os 2 import time 3 from multiprocessing import Pool 4 5 def copyFile(oldPath, newPath, fileName): 6 print("%s 准备复制中。。。"%fileName) 7 with open("%s\%s"%(oldPath, fileName), 'r') as fr, open("%s\%s"%(newPath, fileName), 'w') as fw: 8 for line in fr: 9 fw.write(line) 10 time.sleep(1) 11 12 13 if __name__ == "__main__": 14 oldPath = r"file" 15 newPath = r"file-副本" 16 17 os.mkdir(newPath) 18 19 pool = Pool(5) 20 21 fileList = os.listdir(oldPath) 22 23 for fileName in fileList: 24 pool.apply_async(copyFile, (oldPath, newPath, fileName)) 25 26 pool.close() 27 28 pool.join() 29 30 print("文件复制完成....")