multiprocessing模块介绍
multiprocessing
是一个支持使用与 threading
模块类似的 API 来产生进程的包。 multiprocessing
包同时提供了本地和远程并发操作,通过使用子进程而非线程有效地绕过了 全局解释器锁。 因此,multiprocessing
模块允许程序员充分利用给定机器上的多个处理器。 它在 Unix 和 Windows 上均可运行。
multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
一、Process类的介绍
在 multiprocessing
中,通过创建一个 Process
对象然后调用它的 start()
方法来生成进程
简单实例:
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
创建进程的类:
Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,可用来开启一个子进程
强调:
1. 需要使用关键字的方式来指定参数
2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
参数介绍:
group参数未使用,值始终为None
target表示调用对象,即子进程要执行的任务
args表示调用对象的位置参数元组,args=(1,2,'mike',)
kwargs表示调用对象的字典,kwargs={'name':'mike','age':18}
name为子进程的名称
方法介绍:
p.start()
:# 启动进程,并调用该子进程中的p.run()
p.run()
:# 进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
p.terminate()
: # 强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
p.is_alive()
:# 如果p仍然运行,返回True
p.join([timeout])
:# 主进程等待p终止(强调:是主进程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间。
属性介绍
p.daemon
:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
p.name
:进程的名称
p.pid
:进程的pid
multiprocessing
支持进程之间的两种通信通道:队列和管道
1、队列(Quene)
class multiprocessing.
Queue
([maxsize])
返回一个使用一个管道和少量锁和信号量实现的共享队列实例。当一个进程将一个对象放进队列中时,一个写入线程会启动并将对象从缓冲区写入管道中。
一旦超时,将抛出标准库 queue
模块中常见的异常 queue.Empty
和 queue.Full
。
方法:
qsize
()-----返回队列大致长度,返回数字不可靠
empty
()-----如果队列是空的,返回 True
,反之返回 False。状态不可靠
full()-----如果队列是满的,返回 True
,反之返回 False
。状态不可靠
put
(obj[, block[, timeout]])
将 obj 放入队列。如果可选参数 block 是 True
(默认值) 而且 timeout 是 None
(默认值), 将会阻塞当前进程,直到有空的缓冲槽。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的缓冲槽时抛出 queue.Full
异常。反之 (block 是 False
时),仅当有可用缓冲槽时才放入对象,否则抛出 queue.Full
异常 (在这种情形下 timeout 参数会被忽略)。
put_nowait
(obj)-----相当于put(obj, False)
。
get
([block[, timeout]])-
从队列中取出并返回对象。如果可选参数 block 是
True
(默认值) 而且 timeout 是None
(默认值), 将会阻塞当前进程,直到队列中出现可用的对象。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的对象时抛出queue.Empty
异常。反之 (block 是False
时),仅当有可用对象能够取出时返回,否则抛出queue.Empty
异常 (在这种情形下 timeout 参数会被忽略)。在 3.8 版更改: 如果队列已经关闭,会抛出
ValueError
而不是OSError
。
get_nowait
()-----相当于get(False)
。
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get()) # prints "[42, None, 'hello']"
p.join()
队列是线程和进程安全的。
2、管道(Pipe)
multiprocessing.
Pipe
([duplex])
返回一对 Connection
对象 (conn1, conn2)
, 分别表示管道的两端。
如果 duplex 被置为 True
(默认值),那么该管道是双向的。如果 duplex 被置为 False
,那么该管道是单向的,即 conn1
只能用于接收消息,而 conn2
仅能用于发送消息。
from multiprocessing import Process, Pipe
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()
返回的两个连接对象 Pipe()
表示管道的两端。每个连接对象都有 send()
和 recv()
方法(相互之间的)。
请注意,如果两个进程(或线程)同时尝试读取或写入管道的 同一 端,则管道中的数据可能会损坏。
当然,在不同进程中同时使用管道的不同端的情况下不存在损坏的风险。
三、进程间同步
对于所有在 threading
存在的同步原语,multiprocessing
中都有类似的等价物。例如,可以使用锁来确保一次只有一个进程打印到标准输出:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
不使用锁的情况下,来自于多进程的输出很容易产生混淆。
四、服务进程
由 Manager()
返回的管理器对象控制一个服务进程,该进程保存Python对象并允许其他进程使用代理操作它们。
Manager()
返回的管理器支持类型: list
、 dict
、 Namespace
、 Lock
、 RLock
、 Semaphore
、 BoundedSemaphore
、 Condition
、 Event
、 Barrier
、 Queue
、 Value
和 Array
。
from multiprocessing import Process, Manager
def f(d, l):
d[1] = '1'
d['2'] = 2
d[0.25] = None
l.reverse()
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(10))
p = Process(target=f, args=(d, l))
p.start()
p.join()
print(d)
print(l)
将打印
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
使用服务进程的管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络由不同计算机上的进程共享。但是,它们比使用共享内存慢
五、使用工作进程
Pool
类表示一个工作进程池。它具有允许以几种不同方式将任务分配到工作进程的方法。
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
请注意,进程池的方法只能由创建它的进程使用。
六、连接(Connection)对象
class multiprocessing.connection.
Connection
send
(obj)-----将一个对象发送到连接的另一端,可以用recv()
读取。recv
()------返回一个由另一端使用send()
发送的对象。fileno
()-----返回由连接对象使用的描述符或者句柄。close
()-----关闭连接对象。poll
([timeout])-----返回连接对象中是否有可以读取的数据。send_bytes
(buffer[, offset[, size]])-----从一个 bytes-like object (字节类对象)对象中取出字节数组并作为一条完整消息发送。recv_bytes
([maxlength])-----以字符串形式返回一条从连接对象另一端发送过来的字节数据。此方法在接收到数据前将一直阻塞。ecv_bytes_into
(buffer[, offset])-----将一条完整的字节数据消息读入 buffer 中并返回消息的字节数。 此方法在接收到数据前将一直阻塞。- 七、class
multiprocessing.
Lock一旦一个进程或者线程拿到了锁,后续的任何其他进程或线程的其他请求都会被阻塞直到锁被释放。
方法:acquire
(block=True, timeout=None)-----获得锁,阻塞或非阻塞的。
如果 block 参数被设为 True
( 默认值 ) , 对该方法的调用在锁处于释放状态之前都会阻塞,然后将锁设置为锁住状态并返回 True
。
如果 block 参数被设置成 False
,方法的调用将不会阻塞。 如果锁当前处于锁住状态,将返回 False
; 否则将锁设置成锁住状态,并返回 True
。
方法:release
()---释放锁,可以在任何进程、线程使用,并不限于锁的拥有者。
八、管理器--管理器提供了一种创建共享数据的方法,从而可以在不同进程中共享,甚至可以通过网络跨机器共享数据。管理器维护一个用于管理 共享对象 的服务。其他进程可以通过代理访问这些共享对象。
1、multiprocessing.
Manager
()---返回一个已启动的 SyncManager
管理器对象,这个对象可以用于在不同进程中共享数据。返回的管理器对象对应了一个已经启动的子进程,并且拥有一系列方法可以用于创建共享对象、返回对应的代理。
2、class multiprocessing.managers.
BaseManager
([address[, authkey]])---创建一个 BaseManager 对象。
一旦创建,应该及时调用 start()
或者 get_server().serve_forever()
以确保管理器对象对应的管理进程已经启动。
address 是管理器服务进程监听的地址。如果 address 是 None
,则允许和任意主机的请求建立连接。
authkey 是认证标识,用于检查连接服务进程的请求合法性。如果 authkey 是 None
, 则会使用 current_process().authkey
, 否则,就使用 authkey , 需要保证它必须是 byte 类型的字符串。
方法:tart
([initializer[, initargs]])-----为管理器开启一个子进程,如果 initializer 不是 None
, 子进程在启动时将会调用 initializer(*initargs)
方法:get_server
()----返回一个 Server
对象,它是管理器在后台控制的真实的服务。 Server
对象拥有 serve_forever()
方法。
方法:connect
()----将本地管理器对象连接到一个远程管理器进程:
方法:shutdown
()-----停止管理器的进程。这个方法只能用于已经使用 start()
启动的服务进程。它可以被多次调用。
方法:register
(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])-----一个 classmethod,可以将一个类型或者可调用对象注册到管理器类。
属性:address----
管理器所用的地址。
九、进程池------可以创建一个进程池,它将使用 Pool
类执行提交给它的任务。
class multiprocessing.pool.
Pool
([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
一个进程池对象,它控制可以提交作业的工作进程池。它支持带有超时和回调的异步结果,以及一个并行的 map 实现。
map
(func, iterable[, chunksize])-----内置 map()
函数的并行版本 (但它只支持一个 iterable 参数,对于多个可迭代对象请参阅 starmap()
)。 它会保持阻塞直到获得结果。