1 import os
2 from multiprocessing import Pool, Manager
3
4
5 def handle_task(arg1, arg2):
6 lock.acquire()
7 # get from queuing_task_lst
8 # enqueue running_task_dt
9 proc_id = os.getpid()
10 lock.release()
11
12
13 def async_deal_task_queue():
14 # query
15 queuing_task_lst.append(123)
16
17
18 def init(mgr_lock):
19 global lock
20 lock = mgr_lock
21
22
23 if __name__ == '__main__':
24 manager = Manager()
25 lock = manager.Lock()
26 queuing_task_lst = manager.list()
27 running_task_dt = manager.dict()
28
29 proc_pool = Pool(processes=10, initializer=init, initargs=(lock,))
30 for i in range(0, 10):
31 try:
32 proc_pool.apply_async(handle_task, args=(queuing_task_lst, running_task_dt))
33 except KeyboardInterrupt as err:
34 proc_pool.terminate()
35 except Exception as e:
36 proc_pool.terminate()
37 async_deal_task_queue()
38 try:
39 proc_pool.close()
40 proc_pool.join()
41 except Exception:
42 pass
43 finally:
44 pass
> 说明:
上述代码中,handle_task进程和async_deal_task_queue进程异步执行
async_deal_task_queue作为生产者,往全局队列 queuing_task_lst 放入生产的产品
handle_task 从 queuing_task_lst 中拿到产品进行消费
消费方式是一次性将生产的产品存多个,然后再一个一个进行处理
处理过程中使用了全局锁,通过 Pool的参数 initializer 及 initargs 来实现
> 细节技术:
manager.dict() 不同于一般意义上python 的 dict, 他的取法有如下特殊点:
取key只能用 run_task_dt.keys()
获取value用 tid_lst = run_task_dt[key]
赋值能用 run_task_dt[queue_name] = value, 可变对象记得deepcopy
manager要使用 multiprocessing下的manager
还可以参考: https://thief.one/2016/11/24/Multiprocessing%E5%AD%90%E8%BF%9B%E7%A8%8B%E8%BF%94%E5%9B%9E%E5%80%BC/