1. multiprocessing.Pool
```python
from multiprocessing.pool import Pool

def gen_row():
    ...
    return rows

def main(rows):
    i = 1
    for row in rows:
        i += 1
        ...
    print(i)

if __name__ == "__main__":
    rows = gen_row()
    with Pool(4) as p:
        p.map(main, (rows[:100000], rows[100000:200000], rows[200000:300000], rows[300000:]))
```
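This prints four values of i. If the second argument to map() is an iterable of length 1, only one i is printed.
With Pool(2), passing in the same four slices of rows still prints four values of i.
The results with and without Pool turned out different, which puzzled me at first. The reason is that map() calls main() once per slice, and each call runs in a worker process with its own local i, so each printed value is the row count of one slice plus one, not a running total. So Pool cannot be used to operate on one shared object; when the arguments are independent of each other, as in p.map(f, [1, 2, 3, 4]), the results are guaranteed to be consistent.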
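A minimal sketch of how to get one total instead of four partial counts, assuming the rows fit in memory: each worker returns the count for its slice and the parent adds the counts up. `count_rows`, `total_rows` and the `range()` stand-in data are hypothetical placeholders for `main()` and `gen_row()`.

```python
from multiprocessing import Pool

def count_rows(chunk):
    # Runs in a worker process and sees only its own chunk.
    # Hypothetical stand-in for main(): it returns the count instead of printing.
    n = 0
    for _row in chunk:
        n += 1
    return n

def total_rows(rows, n_chunks=4, processes=4):
    # Split rows into contiguous slices (the last one may be shorter).
    size = max(1, -(-len(rows) // n_chunks))  # ceiling division, at least 1
    chunks = [rows[k:k + size] for k in range(0, len(rows), size)]
    with Pool(processes) as p:
        partial = p.map(count_rows, chunks)  # one count per chunk, in input order
    return partial, sum(partial)

if __name__ == "__main__":
    rows = list(range(350000))  # hypothetical stand-in for gen_row()
    partial, total = total_rows(rows)
    print(partial, total)
```

The per-chunk counts still come back separately, but because they are return values rather than prints inside the workers, the parent can combine them into a single result.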
The following example from the official documentation shows where Pool pays off:
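```python
from multiprocessing import Pool

def f(x):
    return x*x

# The __main__ guard is required when the start method is "spawn"
# (the default on Windows and macOS), so workers can import this module safely.
if __name__ == '__main__':
    with Pool() as p:
        p.map(f, range(10000000))
```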
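A small sketch around the same example showing two things the one-liner does not: Pool.map() returns results in input order, just like the built-in map(), and its optional chunksize argument controls how many items are sent to a worker per task, which matters for a cheap function like f over a long range. The sizes chosen here are arbitrary.

```python
from multiprocessing import Pool

def f(x):
    return x * x

def squares(n, chunksize=None):
    with Pool() as p:
        # map() blocks until every chunk is done and preserves input order
        return p.map(f, range(n), chunksize=chunksize)

if __name__ == "__main__":
    result = squares(100000, chunksize=10000)
    assert result == [x * x for x in range(100000)]
    print(result[:5])  # [0, 1, 4, 9, 16]
```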
After some more thought, here is the next version:
```python
import json
import MySQLdb
from multiprocessing.pool import Pool

def gen_row():
    db = MySQLdb.connect(host='192.168.1.205', user='root', passwd='123456', db='kaqu')
    c = db.cursor()
    c.execute("select params from t1")
    rows = c.fetchall()
    return rows

def main(row):
    # for row in rows:  (no loop needed: map() passes in one row per call)
    try:
        params = json.loads(row[0])  # parse the JSON once
        latitude = float(params['latitude'])
        longitude = float(params['longitude'])
        if not (latitude == 5e-324 or latitude == 0.0):
            print(latitude, longitude)
    except Exception:
        pass

if __name__ == "__main__":
    rows = gen_row()
    with Pool(2) as p:
        p.map(main, rows)  # rows can be passed in directly here
```
The strange part:
```shell
# Pool(2)
time python test2.py >> all.log
wc -l all.log     # -> 383696 all.log

# Pool(4)
time python test2.py >> all2.log
wc -l all2.log    # -> 384183 all2.log

# Pool(5)
wc -l all.log     # -> 383881 all.log

# Pool(10)
wc -l all.log     # -> 383966 all.log
```
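The result is different on every run!!! The system is a 4-core VM. I took the Pool(4) figure as the reliable one, so size the pool according to the number of CPU cores!!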
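PS: The differences above were caused by not waiting for the worker processes to finish; adding close() and join() fixes it, as in the examples below. More precisely, map() already blocks until every task is done, but leaving the with block calls terminate(), which can kill a worker before it flushes its buffered print() output (stdout is block-buffered when redirected to a file), so a varying number of lines goes missing. close() followed by join() lets the workers exit normally and flush their output.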
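A sketch of test2.py with close() and join() applied, plus one further change: the workers return the parsed coordinates and only the parent prints, so the output no longer depends on each worker flushing its own stdout. The MySQL query is replaced by a few hypothetical in-memory rows shaped like (params_json,) so the sketch runs without a database; `parse_row`, `collect` and `SAMPLE_ROWS` are hypothetical names, and the bare except is narrowed to the errors bad rows can raise.

```python
import json
from multiprocessing.pool import Pool

# Hypothetical stand-in for gen_row(): each row is a 1-tuple holding
# the JSON text of the params column.
SAMPLE_ROWS = [
    ('{"latitude": "31.23", "longitude": "121.47"}',),
    ('{"latitude": "0.0", "longitude": "0.0"}',),
    ('{"latitude": "5e-324", "longitude": "1.0"}',),
    ('not json',),
    ('{"longitude": "1.0"}',),
]

def parse_row(row):
    """Return (latitude, longitude), or None for rows the original skips."""
    try:
        params = json.loads(row[0])
        latitude = float(params['latitude'])
        longitude = float(params['longitude'])
    except (ValueError, KeyError, TypeError):  # bad JSON, missing key, None
        return None
    if latitude == 5e-324 or latitude == 0.0:
        return None
    return latitude, longitude

def collect(rows, processes=2):
    with Pool(processes) as p:
        results = p.map(parse_row, rows)
        p.close()  # no more tasks will be submitted
        p.join()   # wait for the workers to exit on their own
    return [r for r in results if r is not None]

if __name__ == "__main__":
    for latitude, longitude in collect(SAMPLE_ROWS):
        print(latitude, longitude)  # printed by the parent process only
```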
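Summary:
Following the official example, Pool() can be created without its first argument (processes); the number of worker processes then defaults to the number of CPUs.
Example: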
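```python
import multiprocessing as mul
import time

def f(number):
    time.sleep(1)
    return number + 1

if __name__ == '__main__':
    sequence = list(range(4))
    p = mul.Pool()
    print(p.map(f, sequence))  # [1, 2, 3, 4]
    p.close()  # no more tasks will be submitted
    p.join()   # wait for the worker processes to exit
```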
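The one-second sleep is what makes the pool visible: the four calls to f run in separate worker processes at the same time, so map() returns after roughly one second instead of four. A small timing sketch, assuming four workers (set explicitly with Pool(4) rather than relying on the CPU count); `timed_map` is a hypothetical helper and the exact elapsed time will vary by machine.

```python
import time
from multiprocessing import Pool

def f(number):
    time.sleep(1)  # simulate one second of blocking work
    return number + 1

def timed_map(n_tasks=4, processes=4):
    start = time.perf_counter()
    with Pool(processes) as p:
        result = p.map(f, range(n_tasks))
    return result, time.perf_counter() - start

if __name__ == "__main__":
    result, elapsed = timed_map()
    print(result)                    # [1, 2, 3, 4]
    print('%.2f seconds' % elapsed)  # roughly 1 s rather than 4 s
```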
Example 2:
```python
from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(8):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')
```
Output:
```text
Parent process 11756.
Waiting for all subprocesses done...
Run task 0 (11048)...
Run task 1 (13032)...
Run task 2 (6736)...
Run task 3 (8884)...
Task 3 runs 0.50 seconds.
Run task 4 (8884)...
Task 2 runs 1.03 seconds.
Run task 5 (6736)...
Task 1 runs 1.19 seconds.
Run task 6 (13032)...
Task 0 runs 2.86 seconds.
Run task 7 (11048)...
Task 4 runs 2.69 seconds.
Task 6 runs 2.22 seconds.
Task 5 runs 2.76 seconds.
Task 7 runs 2.10 seconds.
All subprocesses done.
```
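As you can see, only 4 child processes were created (PIDs 11048, 13032, 6736 and 8884), because the pool was set to Pool(4); tasks 4 to 7 ran in workers that had already finished an earlier task.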
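The same observation can be checked in code rather than by reading PIDs off the output. In this sketch each task returns os.getpid(), and AsyncResult.get() collects the return values in the parent; the original example discards the AsyncResult objects, so an exception raised inside a task would go unnoticed. `task` and `run` are hypothetical stand-ins for long_time_task and the main block, with shorter sleeps.

```python
import os
import random
import time
from multiprocessing import Pool

def task(name):
    time.sleep(random.random() * 0.5)  # shorter than the original 0-3 s
    return name, os.getpid()

def run(n_tasks=8, processes=4):
    with Pool(processes) as p:
        pending = [p.apply_async(task, args=(i,)) for i in range(n_tasks)]
        p.close()
        p.join()
        # get() returns the task's value, or re-raises the task's exception
        results = [r.get() for r in pending]
    return results

if __name__ == "__main__":
    results = run()
    pids = {pid for _name, pid in results}
    print('tasks: %d, worker processes used: %d' % (len(results), len(pids)))
```

With 8 tasks and Pool(4), the set of PIDs never has more than 4 entries, because the pool reuses its workers instead of starting a new process per task.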
PS: Based on Liao Xuefeng's Python tutorial.