Preface
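After reading PrefetchedIter (MXNet), I spent some time planning to speed up iterators with multithreading, then decided it was not really necessary. Recently, though, I seem to have run into a few cases that need it.
PrefetchedIter uses the thread module, but after some reading I found that the community tends to recommend multiprocessing instead. The main reason is the interpreter's GIL, which makes multithreading close to useless on multiple cores for CPU-bound tasks; see the blog post by a more experienced author for details.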
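The GIL point can be checked with a small experiment. The following is a minimal sketch, not code from PrefetchedIter: it assumes Python 3 and the standard `concurrent.futures` module, and the names `countdown` and `timed_map` as well as the loop size are made up for illustration. It runs the same pure-Python busy loop in a thread pool and in a process pool and prints the wall-clock time of each.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def countdown(n):
    """Pure-Python busy loop: CPU-bound, holds the GIL while it runs."""
    while n > 0:
        n -= 1
    return n


def timed_map(executor_cls, n, workers):
    """Run `workers` copies of countdown(n) in the given pool.

    Returns (elapsed seconds, list of results).
    """
    start = time.time()
    with executor_cls(max_workers=workers) as pool:
        results = list(pool.map(countdown, [n] * workers))
    return time.time() - start, results


if __name__ == '__main__':
    n, workers = 2000000, 4  # illustrative sizes, not taken from the post
    for cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        elapsed, _ = timed_map(cls, n, workers)
        print('%s: %.2f s' % (cls.__name__, elapsed))
```

On a multi-core machine running CPython, the thread pool would be expected to take about as long as running the four loops one after another, while the process pool can spread them across cores. The exact numbers depend on the machine and the interpreter version.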
Code
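The program is a simple test that compares, on a CPU-bound task, the performance of several multiprocessing worker processes against a single process. A Queue is used to pass the data from the workers back to the main process.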
import multiprocessing as mtp
from multiprocessing import Queue, Process
import time, os
import numpy as np
import mxnet as mx

numP = 4    # number of worker processes
it = 3      # number of objects each worker enqueues
n = 40000   # size of the busy loop (20000 in the first run)

def f(Q, n):
    for _ in range(it):
        # CPU-bound busy loop, roughly n^2/2 steps. n is counted down to 0
        # during the first pass, so later passes go straight to the enqueue.
        while n > 0:
            j = n
            n -= 1
            while j > 0:
                j -= 1
        if Q is not None:
            Q.put(mx.nd.random.uniform(shape=(10, 4)))
            print('enqueue from pid: %d' % os.getpid())
    print('pid:%d exits' % os.getpid())

if __name__ == '__main__':
    Q = Queue(numP)
    plist = []
    for i in range(numP):
        plist.append(Process(target=f, args=(Q, n)))
    t0 = time.time()
    for p in plist:
        p.start()
    # collect every object produced by the workers
    for i in range(numP * it):
        data = Q.get()
    t1 = time.time()
    for p in plist:
        p.join()
    # n=20000: [4 process(es) with 3 iteration(s), 12 object(s)] time elapsed: 13.219889 s, 0.9077 object(s)/sec
    # n=40000: [4 process(es) with 3 iteration(s), 12 object(s)] time elapsed: 44.230299 s, 0.2713 object(s)/sec
    print('[%d process(es) with %d iteration(s), %d object(s)] time elapsed: %f s, %.4f object(s)/sec' % (numP, it, numP * it, t1 - t0, (numP * it) / (t1 - t0)))

    # test single process...
    t0 = time.time()
    f(Q, n)  # does not block because it <= numP, the queue capacity
    for i in range(it):
        Q.get()
    t1 = time.time()
    # n=20000: [single process with 3 iteration(s), 3 object(s)] time elapsed: 7.293080 s, 0.4113 object(s)/sec
    # n=40000: [single process with 3 iteration(s), 3 object(s)] time elapsed: 28.916437 s, 0.1037 object(s)/sec
    print('[single process with %d iteration(s), %d object(s)] time elapsed: %f s, %.4f object(s)/sec' % (it, 1 * it, t1 - t0, it / (t1 - t0)))
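Four processes deliver roughly twice the throughput of a single one. More precisely, as the task gets heavier (n: \(20000 \rightarrow 40000\)), the speedup rises from \(\frac{0.9077}{0.4113} = 2.207\) to \(\frac{0.2713}{0.1037} = 2.616\). The figures are taken from the comments in the code.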
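The arithmetic can be reproduced with a few lines of Python. This is only a sketch: the throughput values are copied from the timing comments in the code, while the `rates` table, the `speedup` helper and the per-process efficiency (speedup divided by the 4 processes) are additions made here for illustration.

```python
# Throughput in objects/sec, copied from the timing comments in the code.
rates = {
    20000: {'multi': 0.9077, 'single': 0.4113},
    40000: {'multi': 0.2713, 'single': 0.1037},
}

NUM_PROCESSES = 4  # numP in the benchmark


def speedup(rate):
    """Ratio of multi-process throughput to single-process throughput."""
    return rate['multi'] / rate['single']


for n in sorted(rates):
    s = speedup(rates[n])
    # Efficiency is the fraction of the ideal 4x speedup actually achieved.
    print('n=%d: speedup %.3f, efficiency %.3f' % (n, s, s / NUM_PROCESSES))
```

An efficiency well below 1 in both runs is consistent with the observation above that four processes give about two times, not four times, the throughput.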