由于和线程相似,这里的前几个示例都是从线程示例中修改的。
内容目录
- multiprocessing基础
- 可导入的目标函数
- 确定当前进程
- 守护进程 Daemon
- 等待进程 join()
- 终止进程 terminate()
- 进程退出状态
- 调试 log_to_stderr
- 子类化进程
1.multiprocessing基础
生成进程的最简单的方法是用目标函数实例化一个进程对象,并调用start()来让它开始工作。
import multiprocessing
def worker():
"""worker function"""
print('Worker')
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker)
jobs.append(p)
p.start()
运行结果:
Worker
Worker
Worker
Worker
Worker
传递参数:
import multiprocessing
def worker(num):
"""thread worker function"""
print('Worker:', num)
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
运行结果:
Worker: 1
Worker: 0
Worker: 3
Worker: 2
Worker: 4
2.可导入的目标函数
# main.py
import multiprocessing
import multiprocessing_import_worker
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(
target=multiprocessing_import_worker.worker,
)
jobs.append(p)
p.start()
其中worker()函数在multiprocessing_import_worker.py中定义:
# multiprocessing_import_worker.py
def worker():
"""worker function"""
print('Worker')
return
运行结果:
Worker
Worker
Worker
Worker
Worker
3.确定当前进程
import multiprocessing
import time
def worker():
name = multiprocessing.current_process().name
print(name, 'Starting')
time.sleep(2)
print(name, 'Exiting')
def my_service():
name = multiprocessing.current_process().name
print(name, 'Starting')
time.sleep(3)
print(name, 'Exiting')
if __name__ == '__main__':
service = multiprocessing.Process(
name='my_service',
target=my_service,
)
worker_1 = multiprocessing.Process(
name='worker 1',
target=worker,
)
worker_2 = multiprocessing.Process( # default name
target=worker,
)
worker_1.start()
worker_2.start()
service.start()
运行结果:可以看到默认的进程名字对应于Process-3, 和线程很类似。
worker 1 Starting
Process-3 Starting
my_service Starting
worker 1 Exiting
Process-3 Exiting
my_service Exiting
4.守护进程 Daemon
默认情况下主进程会在子进程全部执行完毕后才退出,如果子进程设置为守护进程,便不再阻塞主进程退出。
为了将进程标记为守护进程只需将daemon属性设置为True。默认情况下,进程不是守护进程。
import multiprocessing
import time
import sys
def daemon():
p = multiprocessing.current_process()
print('Starting:', p.name, p.pid)
sys.stdout.flush()
time.sleep(2)
print('Exiting :', p.name, p.pid)
sys.stdout.flush()
def non_daemon():
p = multiprocessing.current_process()
print('Starting:', p.name, p.pid)
sys.stdout.flush()
print('Exiting :', p.name, p.pid)
sys.stdout.flush()
if __name__ == '__main__':
d = multiprocessing.Process(
name='daemon',
target=daemon,
)
d.daemon = True
n = multiprocessing.Process(
name='non-daemon',
target=non_daemon,
)
n.daemon = False
d.start()
time.sleep(1)
n.start()
运行结果:可以看到d进程还没执行完主进程就退出了。
Starting: daemon 24852
Starting: non-daemon 23248
Exiting : non-daemon 23248
在主程序退出之前,守护进程会自动终止,这将避免留下孤儿进程。
5.等待进程 join()
要等到进程完成工作再退出,请使用join()方法。
import multiprocessing
import time
import sys
def daemon():
name = multiprocessing.current_process().name
print('Starting:', name)
time.sleep(2)
print('Exiting :', name)
def non_daemon():
name = multiprocessing.current_process().name
print('Starting:', name)
print('Exiting :', name)
if __name__ == '__main__':
d = multiprocessing.Process(
name='daemon',
target=daemon,
)
d.daemon = True
n = multiprocessing.Process(
name='non-daemon',
target=non_daemon,
)
n.daemon = False
d.start()
time.sleep(1)
n.start()
d.join()
n.join()
运行结果:
Starting: non-daemon
Exiting : non-daemon
Starting: daemon
Exiting : daemon
默认情况下,join()无限期阻塞。也可以使用超时参数。
6.终止进程 terminate()
在一个进程调用terminate()会杀死子进程
import multiprocessing
import time
def slow_worker():
print('Starting worker')
time.sleep(0.1)
print('Finished worker')
if __name__ == '__main__':
p = multiprocessing.Process(target=slow_worker)
print('BEFORE:', p, p.is_alive())
p.start()
print('DURING:', p, p.is_alive())
p.terminate()
print('TERMINATED:', p, p.is_alive())
p.join()
print('JOINED:', p, p.is_alive())
运行结果:在终止它之后,使用join()是很重要的,以便让进程管理代码有时间来更新对象的状态以反映终止。
BEFORE: <Process(Process-1, initial)> False
DURING: <Process(Process-1, started)> True
TERMINATED: <Process(Process-1, started)> True
JOINED: <Process(Process-1, stopped[SIGTERM])> False
7.进程退出状态
当进程退出时产生的状态码可以通过exitcode属性访问。允许的范围列在下面的表格中。
Exit Code | Meaning |
---|---|
== 0 | row 1 col 2 |
0 | the process had an error, and exited with that code
< 0 | the process had an error, and exited with that code
import multiprocessing
import sys
import time
def exit_error():
sys.exit(1)
def exit_ok():
return
def return_value():
return 1
def raises():
raise RuntimeError('There was an error!')
def terminated():
time.sleep(3)
if __name__ == '__main__':
jobs = []
funcs = [
exit_error,
exit_ok,
return_value,
raises,
terminated,
]
for f in funcs:
print('Starting process for', f.__name__)
j = multiprocessing.Process(target=f, name=f.__name__)
jobs.append(j)
j.start()
jobs[-1].terminate()
for j in jobs:
j.join()
print('{:>15}.exitcode = {}'.format(j.name, j.exitcode))
运行结果:
Starting process for exit_error
Starting process for exit_ok
Starting process for return_value
Starting process for raises
Starting process for terminated
Process raises:
exit_error.exitcode = 1
exit_ok.exitcode = 0
Traceback (most recent call last):
File "C:UsersAdministratorAppDataLocalProgramsPythonPython36libmultiprocessingprocess.py", line 258, in _bootstrap
self.run()
File "C:UsersAdministratorAppDataLocalProgramsPythonPython36libmultiprocessingprocess.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "E:MyPythonimage.py", line 19, in raises
raise RuntimeError('There was an error!')
RuntimeError: There was an error!
return_value.exitcode = 0
raises.exitcode = 1
terminated.exitcode = -15
exit(0):无错误退出
exit(1):有错误退出
退出代码是告诉解释器的(或操作系统)
8.调试 log_to_stderr
在调试并发性问题时,可以使用multiprocessing所提供的对象的内部构件。有一个方便的模块级函数为logtostderr()。它使用logging设置一个记录器对象,并添加一个处理程序,以便将日志消息发送到标准错误通道。
import multiprocessing
import logging
import sys
def worker():
print('Doing some work')
sys.stdout.flush()
if __name__ == '__main__':
multiprocessing.log_to_stderr(logging.DEBUG)
p = multiprocessing.Process(target=worker)
p.start()
p.join()
运行结果:认情况下,logging级别被设置为NOTSET不会产生任何消息。
Doing some work
[INFO/Process-1] child process calling self.run()
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers
若想直接操作日志请使用get_logger()
获取
import multiprocessing
import logging
import sys
def worker():
print('Doing some work')
sys.stdout.flush()
if __name__ == '__main__':
multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
p = multiprocessing.Process(target=worker)
p.start()
p.join()
运行结果:
[INFO/Process-1] child process calling self.run()
Doing some work
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
9.子类化进程
通过multiprocessing.Process
可以创建进程,也可以通过自定义子类创建进程。
import multiprocessing
class Worker(multiprocessing.Process):
def run(self):
print('In {}'.format(self.name))
return
if __name__ == '__main__':
jobs = []
for i in range(5):
p = Worker()
jobs.append(p)
p.start()
for j in jobs:
j.join()
运行结果:
In Worker-2
In Worker-4
In Worker-3
In Worker-1
In Worker-5