zoukankan      html  css  js  c++  java
  • python类库32[多进程通信Queue+Pipe+Value+Array]

    多进程通信

    queue和pipe的区别: pipe用来在两个进程间通信。queue用来在多个进程间实现通信。 此两种方法为所有系统多进程通信的基本方法,几乎所有的语言都支持此两种方法。

    1)Queue & JoinableQueue

    queue用来在进程间传递消息,任何可以pickle-able的对象都可以在加入到queue。 

    multiprocessing.JoinableQueue 是 Queue的子类,增加了task_done()和join()方法。

     

    task_done()用来告诉queue一个task完成。一般地在调用get()获得一个task,在task结束后调用task_done()来通知Queue当前task完成。

    join() 阻塞直到queue中的所有的task都被处理(即task_done方法被调用)。

    代码:

    复制代码
    import multiprocessing
    import time

    class Consumer(multiprocessing.Process):
        
        def __init__(self, task_queue, result_queue):
            multiprocessing.Process.__init__(self)
            self.task_queue = task_queue
            self.result_queue = result_queue

        def run(self):
            proc_name = self.name
            while True:
                next_task = self.task_queue.get()
                if next_task is None:
                    # Poison pill means shutdown
                    print ('%s: Exiting' % proc_name)
                    self.task_queue.task_done()
                    break
                print ('%s: %s' % (proc_name, next_task))
                answer = next_task() # __call__()
                self.task_queue.task_done()
                self.result_queue.put(answer)
            return


    class Task(object):
        def __init__(self, a, b):
            self.a = a
            self.b = b
        def __call__(self):
            time.sleep(0.1) # pretend to take some time to do the work
            return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
        def __str__(self):
            return '%s * %s' % (self.a, self.b)


    if __name__ == '__main__':
        # Establish communication queues
        tasks = multiprocessing.JoinableQueue()
        results = multiprocessing.Queue()
        
        # Start consumers
        num_consumers = multiprocessing.cpu_count()
        print ('Creating %d consumers' % num_consumers)
        consumers = [ Consumer(tasks, results)
                      for i in range(num_consumers) ]
        for w in consumers:
            w.start()
        
        # Enqueue jobs
        num_jobs = 10
        for i in range(num_jobs):
            tasks.put(Task(i, i))
        
        # Add a poison pill for each consumer
        for i in range(num_consumers):
            tasks.put(None)

        # Wait for all of the tasks to finish
        tasks.join()
        
        # Start printing results
        while num_jobs:
            result = results.get()
            print ('Result:', result)
            num_jobs -= 1
    复制代码

     注意小技巧: 使用None来表示task处理完毕。

    运行结果:

    2) pipe

    pipe()返回一对连接对象,代表了pipe的两端。每个对象都有send()和recv()方法。

    代码:

    复制代码
    from multiprocessing import Process, Pipe

    def f(conn):
        conn.send([42, None, 'hello'])
        conn.close()

    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=f, args=(child_conn,))
        p.start()
        p.join()
        print(parent_conn.recv())   # prints "[42, None, 'hello']"
    复制代码

    3)Value + Array

    Value + Array 是python中共享内存 映射文件的方法,速度比较快。

    复制代码
    from multiprocessing import Process, Value, Array

    def f(n, a):
        n.value = n.value + 1
        for i in range(len(a)):
            a[i] = a[i] * 10

    if __name__ == '__main__':
        num = Value('i', 1)
        arr = Array('i', range(10))

        p = Process(target=f, args=(num, arr))
        p.start()
        p.join()

        print(num.value)
        print(arr[:])
        
        p2 = Process(target=f, args=(num, arr))
        p2.start()
        p2.join()

        print(num.value)
        print(arr[:])

    # the output is :
    # 2
    # [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]
    # 3
    # [0, 100, 200, 300, 400, 500, 600, 700, 800, 900]
    复制代码

    参考: 
    The Python Standard Library By Example

    http://www.doughellmann.com/PyMOTW/multiprocessing/communication.html 

    完! 

  • 相关阅读:
    Vue.js_础学习之DOM操作
    node REPL
    node npm
    Vue.js_getter and setter
    tomcat+nginx+redis实现均衡负载以及session共享
    深入浅出微服务框架dubbo(一):基础篇
    Linux下安装zip解压功能
    Linux下查看CPU型号,内存大小,硬盘空间的命令
    Linux查看系统信息命令
    MyBatis自动生成代码之generatorConfig配置文件及其详细解读
  • 原文地址:https://www.cnblogs.com/L-H-R-X-hehe/p/3815349.html
Copyright © 2011-2022 走看看