导航:
1、创建进程的两种方式
2、Process的方法
3、进程间的通讯1,进程队列Queue--先进先出
4、进程间的通讯2,管道通讯 Pipe
5、进程间的数据共享,Manager
6、多进程同步问题
7、进程池Pool
python中多进程可以解决cpython解释器多线程中GIL存在的问题,可以利用CPU的多核资源,实现真的并发效果。操作系统中每个线程有自己的内存空间,数据并不共享。
python中使用multiprocessing包提供的接口给我们创建多进程,multiprocessing与threading的使用方法相似。
1、创建进程的两种方式
1)通过multiprocessing.Process创建
Process(group=None, target=None, name=None, args=(), kwargs={})
- group 线程组,没什么用,默认为空就好
- target 要执行的方法
- name 进程的名字
- args/kwargs 执行target方法要传入的参数
1 def fn(word): 2 # ------子进程的逻辑----- 3 print("父进程", os.getppid()) 4 print("子进程", os.getpid()) 5 print("传入的参数:", word) 6 7 8 if __name__ == "__main__": 9 p = Process(target=fn, args=("haha",)) # 创建子进程 10 p.start() # 开启子进程 11 print("主进程", os.getpid()) 12 13 14 # 输出结果 15 主进程 5916 16 父进程 5916 17 子进程 2936 18 传入的参数: haha
很简单这样就可以创建一个子进程了,可以看第3 4 11行打印的进程id可以知道这是不同的进程。
这里需要说明的是:在所有的进程当中子进程都是由父进程创建出来的,在这个例子中,子进程的的父进程就是主进程,可以看到第3 11行打印的进程id。
在windows下,创建进程一定要放在__main__中,不然会报错
2)通过继承Process类创建子进程
1 from multiprocessing import Process 2 import os 3 4 class MyProcess(Process): 5 def __init__(self, name): 6 super().__init__() # 如果重写了初始化方法,在初始化方法中一定要调用父类的初始化方法 7 self.name = name 8 9 def run(self): 10 # ------子进程的逻辑----- 11 print("父进程", os.getppid()) 12 print("子进程", os.getpid()) 13 print("子进程的名字", self.name) 14 15 16 if __name__ == "__main__": 17 p = MyProcess("haha") # 创建子进程 18 p.start() # 开启子进程 19 print("主进程", os.getpid()) 20 21 22 # 输出结果 23 主进程 740 24 父进程 740 25 子进程 7052 26 子进程的名字 haha
通过继承Process创建子进程需要重写 run 方法,这是子进程逻辑的入口,当开启子进程时会自动调用这个方法
上边这两种方式中,主进程执行完所有的逻辑后会等待子进程结束在一起结束,与fork函数创建的方式不一样
2、Process的方法
1)is_alive()
is_alive方法判断指定对象进程的存活状态。进程 start 后一直到该进程结束都返回True。进程 start 前 或进程已经结束返回False
1 from multiprocessing import Process 2 import time 3 4 def fn(): 5 # ------子进程的逻辑----- 6 time.sleep(1) # 模拟子进程执行需要消耗的时间 7 8 9 if __name__ == "__main__": 10 p = Process(target=fn) # 创建子进程 11 print("p.start前-->", p.is_alive()) 12 p.start() # 开启子进程 13 print("p.start后-->", p.is_alive()) 14 time.sleep(2) # 模拟主进程执行需要消耗的时间,为了确保子进程先结束 15 print("子进程结束后-->", p.is_alive()) 16 17 18 # 输出结果 19 p.start前--> False 20 p.start后--> True 21 子进程结束后--> False
2)join()
join(timeout=None)方法,堵塞当前环境的进程,直到调此方法的进程结束后再继续往下执行。可设置timeout值,最多堵塞timeout时间(秒)。注意:join方法只能在start()后才可以使用
1 from multiprocessing import Process 2 import time 3 4 def fn(): 5 # ------子进程的逻辑----- 6 time.sleep(3) # 模拟子进程执行需要消耗的时间 7 print("子进程中-->", time.ctime()) 8 9 10 if __name__ == "__main__": 11 p = Process(target=fn) # 创建子进程 12 p.start() # 开启子进程 13 print("p.join前-->", time.ctime()) 14 p.join() # 堵塞当前环境的进程,直到调此方法的进程结束后再继续往下执行 15 print("p.join后-->", time.ctime()) 16 17 18 # 输出内容 19 p.join前--> Sat Sep 29 16:03:40 2018 20 子进程中--> Sat Sep 29 16:03:43 2018 21 p.join后--> Sat Sep 29 16:03:43 2018
这里可以看到第15行语句一直等到第7行执行完才输出
1 from multiprocessing import Process 2 import time 3 4 def fn(): 5 # ------子进程的逻辑----- 6 time.sleep(2) # 模拟子进程执行需要消耗的时间 7 print("子进程中-->", time.ctime()) 8 9 10 if __name__ == "__main__": 11 p = Process(target=fn) # 创建子进程 12 p.start() # 开启子进程 13 print("p.join前-->", time.ctime()) 14 p.join(1) # 设置超时1秒 15 print("p.join后-->", time.ctime()) 16 17 18 # 输出内容 19 p.join前--> Sat Sep 29 16:04:02 2018 20 p.join后--> Sat Sep 29 16:04:03 2018 21 子进程中--> Sat Sep 29 16:04:04 2018
这里可以看到第15行语句只堵塞了1秒的时间
3)start(),进程准备就绪,等待cpu的执行(调度)
4)run(),继承Process类的子类,需要重写的方法,当进程对象调用 start 方法时自动执行 run 方法,也是进程的入口
5)terminate(),不管进程是否执行完,直接终止进程
6)daemon属性True/False
与线程的setDeamon()一样。将该进程对象设置为守护进程,效果:父进程将不再等待子进程,父进程结束时,子进程一起结束。注意:daemon属性只能在 start() 前设置
1 from multiprocessing import Process 2 import time 3 4 def fn(): 5 # ------子进程的逻辑----- 6 time.sleep(2) # 模拟子进程执行需要消耗的时间 7 print("----------") 8 9 10 if __name__ == "__main__": 11 p = Process(target=fn) # 创建子进程 12 p.daemon = True 13 p.start() # 开启子进程 14 print("++++++++++") 15 16 17 # 输出内容 18 ++++++++++
子进程中的第7行语句并没有执行,即:子进程在父进程结束时也跟着结束了
3、进程间的通讯1,进程队列Queue--先进先出
Queue(maxsize=-1),maxsize=-1默认队列长度没有最大值,maxsize=5表示队列长度最大值为5
1)put(obj, block=True, timeout=None)
- obj 添加进队列的值,可以添加任意类型的值
- block 默认为True,当队列满时,继续添加则发生堵塞,直到队列get()值出去;block=False,队列满时继续添加不堵塞,但会抛出queue.Full异常
- timeout 堵塞超时,当队列满时,继续添加发生堵塞,堵塞超时timeout秒,超时则会抛出queue.Full异常
1 >>> from multiprocessing import Process, Queue 2 >>> q = Queue(3) # 创建进程队列,队列最大长度为3 3 >>> q.put("haha") # 往队列添加字符串 4 >>> q.put([]) # 往队列添加列表 5 >>> q.put({}) # 往队列添加字典 6 >>> q.put(5) # 往队列添加数字 7 _ # 满时继续添加,发生堵塞(光标会一直卡在这) 8 9 >>> q.put(5,block=False) # 往队列添加数字,不堵塞添加 10 Traceback (most recent call last): 11 File "<stdin>", line 1, in <module> 12 File "C:D_ProgramPythonPython37libmultiprocessingqueues.py", line 83, in put 13 raise Full 14 queue.Full 15 16 >>> q.put(5, timeout=3) # 往队列添加数字,并设置超时3秒 17 Traceback (most recent call last): 18 File "<stdin>", line 1, in <module> 19 File "C:D_ProgramPythonPython37libmultiprocessingqueues.py", line 83, in put 20 raise Full 21 queue.Full
2)get(block=True, timeout=None) 使用方法与put一样的用法,返回先添加的值
1 >>> q.get() 2 'haha' 3 >>> q.get() 4 [] 5 >>> q.get() 6 {} 7 >>> q.get() 8 _ # 为空时继续取值,发生堵塞(光标会一直卡在这) 9 10 >>> q.get(timeout=3) 11 Traceback (most recent call last): 12 File "<stdin>", line 1, in <module> 13 File "C:D_ProgramPythonPython37libmultiprocessingqueues.py", line 105, in get 14 raise Empty 15 _queue.Empty
3)qsize() 返回当前队列的长度
1 >>> from multiprocessing import Queue 2 >>> q = Queue(3) 3 >>> q.put("haha") 4 >>> q.qsize() 5 1 6 >>> q.put(100) 7 >>> q.qsize() 8 2 9 >>> q.get() 10 'haha' 11 >>> q.qsize() 12 1
4)put_nowait(obj) 相当于put(obj, block=False)
5)get_nowait() 相当于get(block=False)
6)empty() 判断队列是否为空,True:空,False:不为空
7)full() 判断队列是否已满,True:已满,False:未满
8)close() 关闭队列,关闭后,将不能添加或取出值
9)通过队列实现多个进程间的通讯
1 from multiprocessing import Process, Queue 2 import time 3 4 def write_queue(q): 5 for i in range(5): 6 q.put(i) # 往队列添加值 7 print("put %d" % i) 8 9 10 def read_queue(q): 11 time.sleep(1) # 确保write_queue中队列已经有值 12 while not q.empty(): 13 s = q.get() # 取出队列的值 14 print("get %s" % s) 15 16 17 if __name__ == "__main__": 18 q = Queue(5) 19 p1 = Process(target=write_queue, args=(q,)) # 创建子进程 20 p2 = Process(target=read_queue, args=(q,)) # 创建子进程 21 p1.start() # 开启子进程 22 p2.start() # 开启子进程 23 24 25 # 输出结果 26 put 0 27 put 1 28 put 2 29 put 3 30 put 4 31 get 0 32 get 1 33 get 2 34 get 3 35 get 4
1 from multiprocessing import Process, Queue 2 import time 3 4 class WriteProcess(Process): 5 def __init__(self, q): 6 super().__init__() 7 self.q = q 8 9 def run(self): 10 print("子进程WriteProcess-->") 11 for i in range(5): 12 self.q.put(i) 13 print("put %d" % i) 14 15 16 class ReadProcess(Process): 17 def __init__(self, q): 18 super().__init__() 19 self.q = q 20 21 def run(self): 22 time.sleep(1) # 确保write_queue中队列已经有值 23 print("子进程ReadProcess-->") 24 while not self.q.empty(): 25 s = self.q.get() 26 print("get %s" % s) 27 28 29 if __name__ == "__main__": 30 q = Queue(5) 31 p1 = WriteProcess(q) # 创建子进程 32 p2 = ReadProcess(q) # 创建子进程 33 p1.start() # 开启子进程 34 p2.start() # 开启子进程 35 36 37 # 输出结果 38 子进程WriteProcess--> 39 put 0 40 put 1 41 put 2 42 put 3 43 put 4 44 子进程ReadProcess--> 45 get 0 46 get 1 47 get 2 48 get 3 49 get 4
4、进程间的通讯2,管道通讯 Pipe
1)Pipe(duplex=True)
Pipe是一个函数,返回元组(Connection(), Connection()). 即返回管道的两端。默认duplex=True为全双工模式,duplex=Fasle中第一个Connection只能接收信息,第二个Connection只能发送消息
2)Connection常用方法
- send(obj) 将对象obj发送到管道另一端,发送的数据必须是可序列化的对象。
- recv() 从管道的另一端接收send()发送的数据。没有数据可接收,将发生堵塞。
- send_bytes(buffer, offset=-1, size=-1) 发送字节缓冲区,buffer是支持支持字节缓冲的任意对象,offset为buffer的字节偏移量(可以当初下标),size为要发送的字节数。
- recv_bytes(maxlength=-1) 接收send_bytes()发送的一次数据,maxlength指定接收长度,超出这个长度则抛出将引发IOError异常,没有数据可接收,将发生堵塞。
- poll([timeout]) 返回 True/False,判断管道内是否有数据可以接收,True:数据可接收。timeout为堵塞的时间秒,timeout=None时一直堵塞,直到有数据可以接收
- close() 关闭链接,关闭链接后将不能继续使用管道,当不再使用管道时可将其关闭
1 from multiprocessing import Process, Pipe 2 import time 3 4 class MyProcess1(Process): 5 def __init__(self, con): 6 super().__init__() 7 self.con = con 8 9 def run(self): 10 self.con.send("12345") 11 print("MyProcess1--send-->", "12345") 12 msg = self.con.recv_bytes().decode(encoding="utf-8") 13 print("MyProcess1--recv_bytes-->", msg) 14 15 16 class MyProcess2(Process): 17 def __init__(self, con): 18 super().__init__() 19 self.con = con 20 21 def run(self): 22 msg = self.con.recv() 23 print("MyProcess2--recv-->", msg) 24 self.con.send_bytes("哈哈".encode("UTF-8")) 25 print("MyProcess2--send_bytes-->", "哈哈") 26 27 28 if __name__ == "__main__": 29 con1, con2 = Pipe() 30 p1 = MyProcess1(con1) # 创建子进程 31 p2 = MyProcess2(con2) # 创建子进程 32 p1.start() # 开启子进程 33 p2.start() # 开启子进程 34 35 36 # 输出结果 37 MyProcess1--send--> 12345 38 MyProcess2--recv--> 12345 39 MyProcess2--send_bytes--> 哈哈 40 MyProcess1--recv_bytes--> 哈哈
5、进程间的数据共享,Manager
Manager提供多进程间的数据共享,Manager内的主要方法有 dict(mapping_or_sequence), list(sequence), Value(typecode, value), Array(typecode, sequence)
1 from multiprocessing import Process, Manager 2 3 def fn(dic, li, a): 4 dic['count'] -= 1 5 print("子进程-->a", a.value) # a 为Value对象,通过a.value将值取出、修改 6 print("子进程-->li", li) 7 8 9 if __name__ == '__main__': 10 with Manager() as manager: 11 dic = manager.dict({'count': 1000}) # 字典 12 li = manager.list([1, 2, 3, 4]) # 列表 13 a = manager.Value("int", 100) # 传值 14 p = Process(target=fn, args=(dic, li, a)) 15 p.start() 16 p.join() 17 print("主进程-->dic", dic) # 打印子进程修改后的内容 18 19 20 # 输出结果 21 子进程-->a 100 22 子进程-->li [1, 2, 3, 4] 23 主进程-->dic {'count': 0}
6、多进程同步问题
当多个进程同时访问同一个资源时就得涉及到同步的问题,如:向控制台打印数据,必须要确保同一时刻只能有一个进程在打印数据,否则将会出现乱序的效果;多进程同时对一个文件进行读写等
1 from multiprocessing import Process, Lock 2 import time 3 4 def fn(lock): 5 # ------子进程的逻辑----- 6 # lock.acquire() # 未加锁 7 for i in range(3): 8 print("12", end="") 9 time.sleep(0.005) # 为了看到效果,添加的延迟 10 print("34") 11 # lock.release() 12 13 14 if __name__ == "__main__": 15 lock = Lock() 16 for i in range(3): 17 p = Process(target=fn, args=(lock,)) # 创建子进程 18 p.start() # 开启子进程 19 20 21 # 输出结果 22 121234 23 1234 24 1234 25 121234 26 1234 27 34 28 1234 29 34 30 1234
1 from multiprocessing import Process, Lock 2 import time 3 4 def fn(lock): 5 # ------子进程的逻辑----- 6 lock.acquire() # 加锁 7 for i in range(3): 8 print("12", end="") 9 time.sleep(0.005) # 为了看到效果,添加的延迟 10 print("34") 11 lock.release() # 释放锁 12 13 14 if __name__ == "__main__": 15 lock = Lock() 16 for i in range(3): 17 p = Process(target=fn, args=(lock,)) # 创建子进程 18 p.start() # 开启子进程 19 20 21 # 输出结果 22 1234 23 1234 24 1234 25 1234 26 1234 27 1234 28 1234 29 1234 30 1234
7、进程池Pool
当任务不多时可以可以自己手动创建进程,但任务量大且任务量多的时候手动创建进程的工作量就会比较大,而多次创建进程消耗较大的资源,这个时候进程池就是一种比较好的解决方法。
在程序一开始就创建好进程池指定进程的数量,当任务来临的时候就从进程池里拿出一个进程,任务完成后进程不会被销毁,而是继续回到进程池等等下一个任务。
当任务数量大于进程数时,将按照任务添加的先后顺序执行任务,未执行的任务将在任务列表中等待。进程池中的进程用法跟上边的一致。
Pool()创建进程池的时候可以指定进程池里的进程数量,不指定的将默认为cpu的核心数。
主要方法介绍
apply_async(func, args=(), kwds={}, callback=None, error_callback=None) func:任务执行的方法入口;callback:任务执行完毕后执行的回调函数,回调函数执行在父进程里边,即调用apply_async的进程;error:任务执行出错后执行的回调函数
apply(func, args=(), kwds={})
close() 关闭进程池,关闭后将不能再继续添加任务
join() 等所有进程结束后再退出
1 from multiprocessing import Process, Pool 2 import time 3 import os 4 5 def fn(i): 6 # ------子进程的逻辑----- 7 print("进程pid--> %5d,参数--> %2d" % (os.getpid(), i)) 8 time.sleep(0.01) 9 10 11 if __name__ == "__main__": 12 pool = Pool(4) 13 for i in range(12): 14 # pool.apply(fn, (i,)) # 同步方式 15 pool.apply_async(fn, (i,)) # 异步方式 16 pool.close() 17 pool.join() 18 19 20 # 输出结果 21 进程pid--> 1176,参数--> 0 22 进程pid--> 1556,参数--> 1 23 进程pid--> 8864,参数--> 2 24 进程pid--> 1176,参数--> 3 25 进程pid--> 1556,参数--> 4 26 进程pid--> 9552,参数--> 5 27 进程pid--> 8864,参数--> 6 28 进程pid--> 1556,参数--> 7 29 进程pid--> 1176,参数--> 8 30 进程pid--> 9552,参数--> 9 31 进程pid--> 8864,参数--> 10 32 进程pid--> 1176,参数--> 11
1 from multiprocessing import Process, Pool 2 import time 3 import os 4 5 def fn(i): 6 # ------子进程的逻辑----- 7 print("进程pid--> %5d,参数--> %2d" % (os.getpid(), i)) 8 time.sleep(1) 9 10 11 if __name__ == "__main__": 12 pool = Pool(4) 13 for i in range(12): 14 pool.apply(fn, (i,)) # 同步方式 15 # pool.apply_async(fn, (i,)) # 异步方式 16 pool.close() 17 pool.join() 18 19 20 # 输出结果 21 进程pid--> 15056,参数--> 0 22 进程pid--> 14380,参数--> 1 23 进程pid--> 2312,参数--> 2 24 进程pid--> 14360,参数--> 3 25 进程pid--> 15056,参数--> 4 26 进程pid--> 14380,参数--> 5 27 进程pid--> 2312,参数--> 6 28 进程pid--> 14360,参数--> 7
1 from multiprocessing import Process, Pool 2 import time 3 import os 4 5 def fn(i): 6 # ------子进程的逻辑----- 7 time.sleep(1) 8 print("子进程pid--> %5d,参数--> %2d" % (os.getpid(), i)) 9 if i == 2: 10 raise ValueError("出错") 11 return i 12 13 14 def fc(args): 15 print("callback--> pid:%5d, type(args)%s, args:%s" % (os.getpid(), type(args), args)) 16 17 18 def fce(args): 19 print("error_callback--> pid:%5d, type(args)%s, args:%s" % (os.getpid(), type(args), args)) 20 21 22 if __name__ == "__main__": 23 print("主进程pid:%d", os.getpid()) 24 pool = Pool(2) 25 for i in range(4): 26 # pool.apply(fn, (i,)) # 同步方式 27 pool.apply_async(fn, (i,), callback=fc, error_callback=fce) # 异步方式 28 pool.close() 29 pool.join() 30 31 32 # 输出结果进程 33 主进程pid:%d 7944 34 子进程pid--> 9368,参数--> 0 35 callback--> pid: 7944, type(args)<class 'int'>, args:0 36 子进程pid--> 13724,参数--> 1 37 callback--> pid: 7944, type(args)<class 'int'>, args:1 38 子进程pid--> 9368,参数--> 2 39 error_callback--> pid: 7944, type(args)<class 'ValueError'>, args:出错 40 子进程pid--> 13724,参数--> 3 41 callback--> pid: 7944, type(args)<class 'int'>, args:3
1 from multiprocessing import Process, Pool, Manager 2 import time 3 import os 4 5 def fn(i, li): 6 # ------子进程的逻辑----- 7 li[i] = i + 10 8 print("子进程中-->", li[i]) 9 10 11 if __name__ == "__main__": 12 with Manager() as manager: 13 pool = Pool(2) 14 li = manager.list([1, 2, 3, 4]) # 列表 15 for i in range(4): 16 pool.apply_async(fn, (i, li)) # 异步方式 17 pool.close() 18 pool.join() 19 print("主进程-->", li) 20 21 22 # 输出结果 23 子进程中--> 10 24 子进程中--> 12 25 子进程中--> 11 26 子进程中--> 13 27 主进程--> [10, 11, 12, 13]
在这里要说一下同步方式和异步方式的区别,同步方式即一个一个任务的执行,即使进程池有多个进程也会等上一个任务做完才执行下一个任务,达不到并发效果,一般不用 apply()
注意点:① close()必须在join()前边;② 主进程默认不会等待进程池中的进程,必须要在主进程中添加close,join;③ 进程池间的通讯与进程的用法一致