zoukankan      html  css  js  c++  java
  • python multiprocessing 使用

    如何在Pool中使用Queue,Stack Overflow的回答,戳这里

    其实吧官方文档看一遍应该就大部分都懂了。

    需要注意的是:在使用多进程的时候,我们的进程函数的传入参数必须是pickle-able的,也就是参数必须可以被pickle保存下来,multiprocessing.Queue对象不能传递给pool.apply_*()等函数,需要使用multiprocessing.Manager().Queue()产生的对象

    贴一下代码

    # -*- coding: UTF-8 -*-
    from multiprocessing import Process, Pool, Queue, Manager, JoinableQueue
    import time
    import os
    
    res = []
    
    
    def put_task():
        msg = []
        for i in xrange(50):
            time.sleep(0.1)
            msg.append(str(os.getpid()))
        return ','.join(msg)
    
    
    def collect_results(result):
        res.append(result)
    
    
    def take_task(queue):
        while 1:
            print(queue.get(True))
    
    
    def task_put(name, que):
        for i in range(10):
            time.sleep(1)
            que.put("%d is done" % name)
    
    
    def task_take_queue(que, n):
        i = 0
        while i < n:
            print(que.get(True))
            i += 1
    
    
    def consumer(input_q):
    
        while True:
            item = input_q.get(True)
            # 处理项目
            print item  # 此处替换为有用的工作
            # 发出信号通知任务完成
            input_q.task_done()
    
    
    def producer(output_q):
        sequence = [1, 2, 3, 4]  # range(5)[1:5]
        for item in sequence:
            # 将项目放入队列
            time.sleep(1)
            output_q.put(item)
            # 建立进程
    
    
    def method_1():
        pool = Pool()
        res = pool.map_async(put_task, range(5))
        pool.close()
        pool.join()
        from pprint import pprint
        pprint(res.get())
    
    
    def method_2():
        pool = Pool()
        pool.apply_async(put_task, callback=collect_results)
        pool.apply_async(put_task, callback=collect_results)
        pool.apply_async(put_task, callback=collect_results)
        pool.close()
        pool.join()
        from pprint import pprint
        pprint(res)
    
    
    def method_3():
        pool = Pool(processes=10)
        m = Manager()
        q = m.Queue()
        for i in range(5):
            pool.apply_async(task_put, (i, q))
        pool.apply_async(task_take_queue, (q, 50))
        pool.close()
        pool.join()
    
    
    def method_4():
        q = JoinableQueue()
    
        # 运行使用者进程
        cons_p = Process(target=consumer, args=(q,))
        cons_p.daemon = True  # 定义该进程为后台运行 True - When a process exits, it attempts to terminate all of its daemonic child processes.
        cons_p.start()
        # 生产项目,sequence代表要发送给使用者的项目序列
        # 在时间中,这可能是生成器的输出或通过一些其他方式生产出来
    
        producer(q)
        # 等待所有项目被处理
        q.join()
    
    
    if __name__ == '__main__':
        method_4()


     1 import multiprocessing
     2 import os
     3 import time
     4 
     5 
     6 def pool_init(q):
     7     global queue  # make queue global in workers
     8     queue = q
     9 
    10 
    11 def task():
    12     # can use `queue` here if you like
    13     for i in range(5):
    14         time.sleep(1)
    15         queue.put(os.getpid())
    16 
    17 
    18 def take_task():
    19     while 1:
    20         print(queue.get(True))
    21 
    22 
    23 def run(pool):
    24     tasks = []
    25     tasks.append(pool.apply_async(take_task))
    26     for i in range(os.cpu_count()):
    27         tasks.append(pool.apply_async(task))
    28     for t in tasks:
    29         print(t.get(), )
    30 
    31 
    32 if __name__ == '__main__':
    33     queue = multiprocessing.Queue()
    34     pool = multiprocessing.Pool(initializer=pool_init, initargs=(queue,))
    35     run(pool)
    36     pool.close()
    37     pool.join()
    View Code
    
    
    
     
  • 相关阅读:
    【Spring-Security】Re01 入门上手
    【JDBC】Extra03 PostgreSQL-JDBC
    【JDBC】Extra02 SqlServer-JDBC
    【JDBC】Extra01 Oracle-JDBC
    【Oracle】Windiws-11G 安装
    【Hibernate】Re08 加载策略配置
    【Hibernate】Re07 关系映射处理
    【Hibernate】Re01.6 HQL
    【Hibernate】Re01.5 API
    【Quartz】
  • 原文地址:https://www.cnblogs.com/chen-kh/p/7436362.html
Copyright © 2011-2022 走看看