zoukankan      html  css  js  c++  java
  • 异步进程 multiprocessing 模板

     1 import os
     2 from multiprocessing import Pool, Manager
     3 
     4 
     5 def handle_task(arg1, arg2):
     6     lock.acquire()
     7     # get from queuing_task_lst
     8     # enqueue running_task_dt
     9     proc_id = os.getpid()
    10     lock.release()
    11 
    12 
    13 def async_deal_task_queue():
    14     # query
    15     queuing_task_lst.append(123)
    16 
    17 
    18 def init(mgr_lock):
    19     global lock
    20     lock = mgr_lock
    21 
    22 
    23 if __name__ == '__main__':
    24     manager = Manager()
    25     lock = manager.Lock()
    26     queuing_task_lst = manager.list()
    27     running_task_dt = manager.dict()
    28 
    29     proc_pool = Pool(processes=10, initializer=init, initargs=(lock,))
    30     for i in range(0, 10):
    31         try:
    32             proc_pool.apply_async(handle_task, args=(queuing_task_lst, running_task_dt))
    33         except KeyboardInterrupt as err:
    34             proc_pool.terminate()
    35         except Exception as e:
    36             proc_pool.terminate()
    37     async_deal_task_queue()
    38     try:
    39         proc_pool.close()
    40         proc_pool.join()
    41     except Exception:
    42         pass
    43     finally:
    44         pass

    > 说明:

    上述代码中,handle_task进程和async_deal_task_queue进程异步执行
    async_deal_task_queue作为生产者,往全局队列 queuing_task_lst 放入生产的产品
    handle_task 从 queuing_task_lst 中拿到产品进行消费
    消费方式是一次性将生产的产品存多个,然后再一个一个进行处理
    处理过程中使用了全局锁,通过 Pool的参数 initializer 及 initargs 来实现

    > 细节技术:

    manager.dict() 不同于一般意义上python 的 dict, 他的取法有如下特殊点:
    取key只能用 run_task_dt.keys()
    获取value用 tid_lst = run_task_dt[key]
    赋值能用 run_task_dt[queue_name] = value, 可变对象记得deepcopy
    manager要使用 multiprocessing下的manager

    还可以参考: https://thief.one/2016/11/24/Multiprocessing%E5%AD%90%E8%BF%9B%E7%A8%8B%E8%BF%94%E5%9B%9E%E5%80%BC/

  • 相关阅读:
    Go语言TCP/UDP Socket编程
    Go目录
    Go语言获取项目当前路径
    Mysql写入记录出现 Incorrect string value: 'xB4xE7xB1xCAxBCxC7‘错误?(写入中文)
    Erlang的Web库和框架
    erlang 资源
    Erlang基础2
    Erlang语言基础总结
    angular修改端口号port
    npm ERR! Cannot read property 'resolve' of undefined
  • 原文地址:https://www.cnblogs.com/zhanghaibin16/p/13322056.html
Copyright © 2011-2022 走看看