话不多说,直接上代码:
import time from functools import partial from concurrent.futures.process import ProcessPoolExecutor # 异步回调需要加锁吗? """ 疑问来源,在做数据分析时,为了提高程序速度,使用了进程池,并为了汇总每个进程处理的结果,使用到回调函数, 我的回调函数中需要对数据进行修改,因此不得不考虑数据安全问题。 按照我的想法:回调在进程执行完毕之后才发生,此时已经不在是并发了,那么就不会有数据安全问题,无需加锁。 但是稳妥起见,还是测试一波。 我建立了下面这个demo1,在回调的时候特地sleep2秒,然后计算程序运行总时间,根据打印结果,一共花费8秒多, 显然在处理回调函数pring_n的时候程序时同步执行的,那么自然无需加锁。 那么执行回调函数print_n的顺序是怎样的呢?是按照对应的进程结束时间调用的吗?根据源码注释: These callables are called in the order that they were added(回调函数按照他们被添加的顺序调用) 也就是说回调函数调用顺序跟他所在进程被添加顺序一致,而非按照他所在的进程的结束时间执行。 为了验证这一点,我做了demo2,将第二个进程(i为1)sleep更长时间,从结果来看,和demo1顺序一致, "1经过计算后的结果是1"并未在最后打印,证实了源码说法。 """ # demo1 # def count(): # for i in range(4): # yield i # # # def print_n(i, ret, obj): # print(f'{i}经过计算后的结果是{obj.result()}') # ret.append(obj.result()) # time.sleep(2) # # # def do_calc(i): # return i ** 2 # # # if __name__ == '__main__': # s = time.time() # l = Lock() # pool = ProcessPoolExecutor(4) # ret = [] # for i in range(4): # pool.submit(do_calc, i).add_done_callback(partial(print_n, i, ret)) # pool.shutdown(wait=True) # print('end', time.time() -s) # print('计算结果', ret) # demo2 def count(): for i in range(4): yield i def print_n(i, ret, obj): print(f'{i}经过计算后的结果是{obj.result()}') ret.append(obj.result()) if i == 1: time.sleep(4) def do_calc(i): return i ** 2 if __name__ == '__main__': s = time.time() pool = ProcessPoolExecutor(4) ret = [] for i in range(4): pool.submit(do_calc, i).add_done_callback(partial(print_n, i, ret)) pool.shutdown(wait=True) print('end', time.time() -s) print('计算结果', ret)
代码运行结果我没有打印出来,大家可以copy一下运行。
另外,python中很多并发库都支持回调机制,大家感兴趣可以试验一下,增强对并发的理解。