1、进程之间如何通信?如何共享变量?
2、为什么这段代码执行结果不是多进程?
from multiprocessing import Pool
from time import sleep
def f(x):
return x*x
def speak(word):
for i in range(10):
print('i am speak:',word,i)
#sleep(1)
if __name__=='__main__':
x=[1,2,3]
speak(x)
with Pool(1) as p:
print(p.map(f,x))
i am speak: [1, 2, 3] 0
i am speak: [1, 2, 3] 1
i am speak: [1, 2, 3] 2
i am speak: [1, 2, 3] 3
i am speak: [1, 2, 3] 4
i am speak: [1, 2, 3] 5
i am speak: [1, 2, 3] 6
i am speak: [1, 2, 3] 7
i am speak: [1, 2, 3] 8
i am speak: [1, 2, 3] 9
[1, 4, 9]
为什么不是呢?其实这段代码是多进程。主进程按照顺序来执行,到子进程这里时,主进程后面没有要执行的内容(可以在子进程后面加内容),如果在子进程后面添加任务,则可以通过执行时间来看到开了子进程。并且这段程序值开了一个子进程,每一个轮流来执行。
问题,1、如何不用进程池,来新建三个子进程?2、是否可以在不用进程池的情况下,同时新建三个子进程,同时来执行?
问题1代码:
#1、不用进程池,开启3个子进程,现在不知道如何获得子进程结果,暂时先打印结果 from multiprocessing import Pool from multiprocessing import Process import time def f(x): print(x*x) return x*x def speak(word): for i in range(3): print('i am speak:',word,i) time.sleep(2) if __name__=='__main__': startTime=time.time() x=[1,2,3] speak(x) # with Pool(3) as p: # print(p.map(f,x)) for i in x: p=Process(target=f,args=(i,)) p.start() endTime=time.time() print('useTime',endTime-startTime) #### i am speak: [1, 2, 3] 0 i am speak: [1, 2, 3] 1 i am speak: [1, 2, 3] 2 useTime 6.183837652206421 1 4 9
可见问题1比较简单,依次建三个子进程,执行时间也比较正常,与进程池Pool(1)用时接近;注意,不知道return的值到哪里去了。。。。,由于speak花费大量时间,不好看明白子进程执行时间,如果把speak()删掉(2.1)和放在后面(2.2),看看是什么情况。。。
2.1把speak()删掉,看看结果
from multiprocessing import Pool,Process import time def f(x): time.sleep(2) print(x*x) return x*x def speak(word): for i in range(3): print('i am speak:',word,i) time.sleep(2) if __name__=='__main__': startTime=time.time() x=[1,2,3] for i in x: p=Process(target=f,args=(i,)) p.start() endTime=time.time() print('useTime',endTime-startTime) ### useTime 0.09105944633483887 1 4 9
2.1.1这个问题引申出一个问题,如何看整个程序执行的时间??想到了,子进程时间长,那就在子进程那里再放个时间记录!
from multiprocessing import Pool,Process import time def f(x): time.sleep(2) print(x*x) return x*x def speak(word): for i in range(3): print('i am speak:',word,i) time.sleep(2) if __name__=='__main__': startTime=time.time() x=[1,2,3] for i in x: p=Process(target=f,args=(i,)) p.start() endTimeSub=time.time() print('usetimesub',endTimeSub-startTime) endTimeMain=time.time() print('useTime',endTimeMain-startTime) #### usetimesub 0.031021595001220703 usetimesub 0.06004023551940918 usetimesub 0.10607600212097168 useTime 0.1070716381072998 1 4 9
2.1.2这还真是越来越有意思了,居然出现这个结果,我想想看。。。。为啥会有意思呢?1、我觉得会先print 1 再出现usetimesub,可见调用函数慢一些!2、分析子进程什么时候开启,什么时候结束:有三种情况:1、子进程是一个开启并结束,再开启下一个;2、子进程同时全部开启,按照每个子进程执行完毕后自己结束;3、子进程按照先后顺序开启(主进程继续往后走,开启后就继续往后走,如果下面有新的子进程,新开启下一个),新开启的子进程对主进程几乎没什么影响。结论:从print结果来看,应该是情况3。如果在子进程后面的主进程中停顿2.2S,看看会不会子进程先执行完?
为什么不是1?如果是情况1,不管怎么修改,返回结果应该是类似如下结果:
usetimesub 1.0864052772521973
1
usetimesub 2.167843818664551
4
usetimesub 3.640583038330078
9
而不会出现这种结果。
usetimesub 0.03602457046508789
usetimesub 0.06504368782043457
usetimesub 0.1070716381072998
1
4
9
from multiprocessing import Pool,Process import time def f(x): time.sleep(2) print(x*x) return x*x def speak(word): for i in range(3): print('i am speak:',word,i) time.sleep(2) if __name__=='__main__': startTime=time.time() x=[1,2,3] for i in x: p=Process(target=f,args=(i,)) p.start() endTimeSub=time.time() print('usetimesub',endTimeSub-startTime) endTimeMain=time.time() time.sleep(2.2) print('useTime',endTimeMain-startTime) #### usetimesub 0.03602457046508789 usetimesub 0.06504368782043457 usetimesub 0.1070716381072998 1 4 9 useTime 0.1070716381072998
结果符合预期,主进程和子进程分开比较明显。
2.1.3如果在开启下一个子进程之前让主程序停顿1S,看看结果会是怎么样??
from multiprocessing import Pool,Process import time def f(x): time.sleep(2) print(x*x) return x*x def speak(word): for i in range(3): print('i am speak:',word,i) time.sleep(2) if __name__=='__main__': startTime=time.time() x=[1,2,3] for i in x: p=Process(target=f,args=(i,)) p.start() time.sleep(1) endTimeSub=time.time() print('usetimesub',endTimeSub-startTime) endTimeMain=time.time() time.sleep(2.2) print('useTime',endTimeMain-startTime) #### usetimesub 1.0864052772521973 1 usetimesub 2.167843818664551 4 usetimesub 3.640583038330078 9 useTime 3.640583038330078
结果比较符合预期!
问题2:是否可以在不用进程池的情况下,同时新建三个子进程,同时来执行?
从上面2.1新建三个子进程,与通过Pool建的三个子进程执行时间进行比较,看看两者执行时间差别怎么样。
Pool应该是同时新建三个子进程,2.1应该是分别新建三个子进程,按道理来说Pool应该比分别新建快一丢丢。
###Pool多进程
from multiprocessing import Pool import time def f(x): time.sleep(2) return x*x def speak(word): for i in range(3): print('i am speak:',word,i) time.sleep(2) if __name__=='__main__': startTime=time.time() x=[1,2,3] #speak(x) with Pool(3) as p: print(p.map(f,x)) endTimeSub=time.time() print('usetimesub',endTimeSub-startTime) time.sleep(2.2) endTimeMain=time.time() print('useTime',endTimeMain-startTime) #### [1, 4, 9] usetimesub 2.213435173034668 useTime 4.4474756717681885
##不用Pool,按顺序新开子进程 from multiprocessing import Pool,Process import time def f(x): time.sleep(2) print(x*x) return x*x def speak(word): for i in range(3): print('i am speak:',word,i) time.sleep(2) if __name__=='__main__': startTime=time.time() x=[1,2,3] for i in x: p=Process(target=f,args=(i,)) p.start() #time.sleep(1) endTimeSub=time.time() print('usetimesub',endTimeSub-startTime) time.sleep(2.2) endTimeMain=time.time() print('useTime',endTimeMain-startTime) #### usetimesub 0.06504273414611816 usetimesub 0.11357498168945312 usetimesub 0.1641085147857666 1 4 9 useTime 2.3644044399261475
结果出乎我的意料,Pool比不用Pool慢了快一倍,不知道这其中原因在哪里?难道在Pool.map调用花费时间?从时间结果来看,好像当调用Pool中的子进程,主进程停止了似的??试试不print,只调用看看
if __name__=='__main__': startTime=time.time() x=[1,2,3] #speak(x) with Pool(3) as p: #print(p.map(f,x)) p.map(f,x) endTimeSub=time.time() print('usetimesub',endTimeSub-startTime) time.sleep(2.2) endTimeMain=time.time() print('useTime',endTimeMain-startTime) #### usetimesub 2.2102527618408203 useTime 4.451737642288208
这个没啥影响,后面再看看Pool部分,看看能不能再改改。Pool的出现虽然没有提高执行效率,可能有利于子线程的管理,不然一堆子线程,也不知道执行怎么样,通过Pool来控制,一次只能生成几个子线程,避免程序崩溃。
2.3下面是一个关于进程池的问题。
from multiprocessing import Pool
import time
def f(x):
time.sleep(2)
return x*x
def speak(word):
for i in range(3):
print('i am speak:',word,i)
time.sleep(2)
if __name__=='__main__':
startTime=time.time()
x=[1,2,3]
#speak(x)
with Pool(1) as p:
print(p.map(f,x))
endTime=time.time()
print('useTime',endTime-startTime)
####
[1, 4, 9]
useTime 6.195619344711304
如果用Pool(3),看看会是多长时间。多进程、多线程一个重要关注点是执行时间。可见时间减少了接近2/3,开启了3个子进程。
#前面代码相同
if __name__=='__main__':
startTime=time.time()
x=[1,2,3]
#speak(x)
with Pool(3) as p:
print(p.map(f,x))
endTime=time.time()
print('useTime',endTime-startTime)
###
[1, 4, 9]
useTime 2.2474865913391113
测试进程池中同时开启2个子进程,看看结果是不是4S多左右。
if __name__=='__main__': startTime=time.time() x=[1,2,3] #speak(x) with Pool(2) as p: print(p.map(f,x)) endTime=time.time() print('useTime',endTime-startTime) #### [1, 4, 9] useTime 4.238807439804077
结果果然是4S多!!
2.4下面是直观感受主进程和子进程。
from multiprocessing import Process import os from time import sleep def info(title): print(title) print('module name:',__name__) print('parent process:',os.getppid()) print('process id:',os.getpid()) def f(name): info('function f') for i in range(10): print(i) sleep(1) #每次停顿1s,总共停顿10s print('hello',name) if __name__=='__main__': info('main line') p=Process(target=f,args=('bob',)) p.start() #p.join() info('main line') sleep(5) #停顿5s print('ok')
#执行结果 main line module name: __main__ parent process: 9176 process id: 11432 main line module name: __main__ parent process: 9176 process id: 11432 function f module name: __mp_main__ parent process: 11432 process id: 13180 0 1 2 3 4 ok 5 6 7 8 9 hello bob
3、一般情况下,python都是从上往下执行,如果主进程一直在执行,则不会到子进程中去。如果主进程负责分配,子进程负责执行,开了四个子进程,则分别执行,会涉及到变量的共享,信息的传递。
4、如果开了4个子进程,但是没有进程池或者相关功能的话,仅仅只用start和join的话,还是只会按照先后顺序来执行。把join注释掉,就好了。
join()方法:the method blocks until the process whose join() method is called terminates,即主进程要等调用jon的子进程执行完了,主进程才再执行。
4.1体会子进程的一些方法:
import multiprocessing,time,signal p=multiprocessing.Process(target=time.sleep,args=(1000,)) if __name__=='__main__': print(p,p.is_alive()) #<Process(Process-1, initial)> False p.start() print(p,p.is_alive()) #<Process(Process-1, started)> True p.terminate() #不太明白为何还是alive print(p,p.is_alive()) #<Process(Process-1, started)> True time.sleep(0.1) print(p,p.is_alive()) #<Process(Process-1, stopped[SIGTERM])> False print(p.exitcode==-signal.SIGTERM) #True
5、为什么要队列queue???队列和进程之间是什么关系?队列和线程之间是什么关系?队列是为了实现线程的安全吗?那进程池呢?
Queue是python标准库中的线程安全的队列(FIFO)实现,提供了一个适用于多线程编程的先进先出的数据结构,即队列,用来在生产者和消费者线程之间的信息传递,FIFO即First in First Out,先进先出。Queue提供了一个基本的FIFO容器,使用方法很简单,maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。
问题:1、一个队列是一个进程吗?如果要保证先进先出,那就只要按照顺序在进程中一个一个来执行了。2、指明了队列中能存放数据个数上限??队列中是存放数据的,而不是子进程个数?
from multiprocessing import Queue q=Queue() for i in range(5): q.put(i) while not q.empty(): print(q.get()) 0 1 2 3 4
6、进程之间交换对象,可以通过Queue和Pipe
When using multiple processes, one generally uses message passing for communication between processes and avoids having to use any synchronization primitives like locks. 当使用多进程时候,人们通常使用信息传递进行进程间通信,避免使用任何同步源语prmitives(如锁)。
For passing messages one can use Pipe() (for a connection between two processes) or a queue (which allows multiple producers and consumers).
通信可以使用管道Pipe()(在两个进程之间连接)、或者队列Queue(容许多个生产者和客户)
multiprocessing.Pipe([duplex])
Returns a pair (conn1, conn2) of Connection objects representing the ends of a pipe. 返回一对(conn1,conn2)表示管端连接对象。
If duplex is True (the default) then the pipe is bidirectional. If duplex is False then the pipe is unidirectional: conn1 can only be used for receiving messages and conn2 can only be used for sending messages.
如果双工为真(默认的),则管道是双向的。如果是假的,那么双管是单向的:conn1只能用于接收消息和conn2只能用于发送消息。
class multiprocessing.Queue([maxsize])
Returns a process shared queue implemented using a pipe and a few locks/semaphores. When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe.
返回一个进程共享队列的实现使用的管道和一些锁/信号灯。当一个进程首先将一个项放在队列上时,一个馈送线程就开始了,它将对象从缓冲区转移到管道中。
put(obj[, block[, timeout]])
Put obj into the queue. If the optional argument block is True (the default) and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Full exception if no free slot was available within that time. Otherwise (block is False), put an item on the queue if a free slot is immediately available, else raise the queue.Full exception (timeout is ignored in that case)
将对象放入队列。如果可选参数块为真(默认值)和超时值为空(默认值),则在必要时进行阻塞,直到空闲槽可用为止。如果超时是正数,它最多在超时时间内等待,如果在该时间内没有空闲槽可用,引发队列满了的异常queue.Full exception。否则(即有空槽了,不用阻塞了,放到队列中)。
正常话就是:所有默认情况下,把对象放入队列,如果队列中没有位置,等待队列中有位置,一有位置就把对象放进去。中间有个等待时间参数,时间内对象可以等待,超时就报出队列满了的异常。
get([block[, timeout]])
Remove and return an item from the queue. If optional args block is True (the default) and timeout is None (the default), block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Empty exception if no item was available within that time. Otherwise (block is False), return an item if one is immediately available, else raise the queue.Empty exception (timeout is ignored in that case).
从队列中删除并返回一个项。如果可选的参数块是真实的(默认)和超时没有(默认),如果有必要,直到项目块是可用的。如果超时是一个正数,它最在多超时时间内阻塞,超过该时间,引发队列空异常queue.Empty exception。
正常话:默认情况下,等待直到有一个项,从队列中删除并返回这个项。如果有时间参数,在时间参数内干这个事,没干成,返回空队列异常。
from multiprocessing import Pipe a,b=Pipe() a.send([1,'hello',None]) print(b.recv()) #[1, 'hello', None] b.send_bytes(b'thank you') print(a.recv_bytes()) #b'thank you'
7、在并行编程时,通常最好避免使用共享状态。这特别是在使用多进程时。
8、Pool出现的原因是什么?有必要吗?
我认为有必要,如果启用一个子进程,上面的就可以实现了,如果有多个子进程,好像不好办??可以通过Pool进程池来管理。
话说,主进程和子进程为啥好像实现不了呢???结果都是按照顺序来执行的。。。
新问题:Pool和Queue有啥区别呢?能不能只要一个??
17.2.2.9. Process Pools
One can create a pool of processes which will carry out tasks submitted to it with the Pool class.
可以创建一个进程池,这些进程将执行池类向它提交的任务。
class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.
一个进程池对象,它控制可以提交作业的工作流程池。它支持超时和异步结果的回调,有平行图的实现。
参数:
processes is the number of worker processes to use. If processes is None then the number returned by os.cpu_count() is used.
进程是要使用的进程的数量。如果进程是空,则通过操作系统返回的数量。cpu_count()使用
If initializer is not None then each worker process will call initializer(*initargs) when it starts.
如果初始化不是None,那么当它开始时每个工作进程将调用初始化(* initargs)。
maxtasksperchild is the number of tasks a worker process can complete before it will exit and be replaced with a fresh worker process, to enable unused resources to be freed. The default maxtasksperchild is None, which means worker processes will live as long as the pool.
maxtasksperchild是任务的可完成工作进程数,在它将退出,被一个新的工作进程所取代之前。使闲置的资源被释放。默认maxtasksperchild是空,这意味着工作进程将和池一起存在。
context can be used to specify the context used for starting the worker processes. Usually a pool is created using the function multiprocessing.Pool() or the Pool() method of a context object. In both cases context is set appropriately.
上下文可用于指定启动工作进程的上下文。通常一个池创建是使用multiprocessing.Pool()函数或者context对象的pool()方法。在这两种情况下,context都是适当设置的。
Note that the methods of the pool object should only be called by the process which created the pool.
注意,池对象的方法只能由创建池的进程调用。
8.1 http://www.jb51.net/article/91199.htm 按照这上面说的,Pool可以创建多个子进程的进程池,但是子进程中要通信,就要采用队列Queue。这应该是两者区别,感觉这里好啰嗦,本来很简单的事,搞得这么复杂!
9、子进程结束后,怎么办呢????1、是像函数那样返回值???2、子进程必须要与主进程或者其他子进程通信,为什么呢?如果不通信,那他存在的意义是什么?让你单独干一件事,不通信,不知道干了没有?不知道干完没有?(这个好像可以知道),不知道干的结果怎么样,所以,至少要反馈一下。还有,如果是循环执行的,那每一段时间都要启动运行一下。