zoukankan      html  css  js  c++  java
  • Python3的multiprocessing多进程-Queue、Pipe进程间通信

    Python3的multiprocessing多进程-Queue、Pipe进程间通信

    一、简介

    当使用多个进程时,通常使用消息传递来进行进程之间的通信,并避免必须使用任何同步原语(如锁)。对于传递消息,可以使用Pipe()(用于两个进程之间的连接)或队列Queue(允许多个生产者和消费者)。
    multiprocessing使用通常queue.Empty和 queue.Full异常来发出超时信号。它们在multiprocessing命名空间中不可用,因此需要从中导入它们 queue。

    1、Queue 用来在多个进程间通信。Queue 有两个方法,get 和 put:

    class multiprocessing.Queue([ maxsize ] )
    • put:放数据,Queue.put( )默认有block=True和timeout两个参数。当block=True时,写入是阻塞式的,阻塞时间由timeout确定。当队列q被(其他线程)写满后,这段代码就会阻塞,直至其他线程取走数据。Queue.put()方法加上 block=False 的参数,即可解决这个隐蔽的问题。但要注意,非阻塞方式写队列,当队列满时会抛出 exception Queue.Full 的异常
    • get:取数据(默认阻塞),Queue.get([block[, timeout]])获取队列,timeout等待时间
    from multiprocessing import Process, Queue
    import os, time, random
    # 写数据进程执行的代码:
    def _write(q,urls):
        print('Process(%s) is writing...' % os.getpid())
        for url in urls:
            q.put(url)
            print('Put %s to queue...' % url)
            time.sleep(random.random())
    # 读数据进程执行的代码:
    def _read(q):
        print('Process(%s) is reading...' % os.getpid())
        while True:
            url = q.get(True)
            print('Get %s from queue.' % url)
    if __name__=='__main__':
        # 父进程创建Queue,并传给各个子进程:
        q = Queue()
        _writer1 = Process(target=_write, args=(q,['url_1', 'url_2', 'url_3']))
        _writer2 = Process(target=_write, args=(q,['url_4','url_5','url_6']))
        _reader = Process(target=_read, args=(q,))
        # 启动子进程_writer,写入:
        _writer1.start()
        _writer2.start()
        # 启动子进程_reader,读取:
        _reader.start()
        # 等待_writer结束:
        _writer1.join()
        _writer2.join()
        # _reader进程里是死循环,无法等待其结束,只能强行终止:
        _reader.terminate()
    '''
    Process(7460) is writing...
    Put url_1 to queue...
    Process(13764) is writing...
    Put url_4 to queue...
    Process(13236) is reading...
    Get url_1 from queue.
    Get url_4 from queue.
    Put url_2 to queue...
    Get url_2 from queue.
    Put url_5 to queue...
    Get url_5 from queue.
    Put url_6 to queue...
    Get url_6 from queue.
    Put url_3 to queue...
    Get url_3 from queue.
    
    '''

    2、Pipe常用来在两个进程间通信,两个进程分别位于管道的两端。

    multiprocessing.Pipe([duplex])
    (con1, con2) = Pipe() 
    • con1管道的一端,负责存储,也可以理解为发送信息
    • con2管道的另一端,负责读取,也可以理解为接受信息
    from multiprocessing import Process, Pipe
    
    def send(pipe):
        pipe.send(['spam'] + [42, 'egg'])   # send 传输一个列表
        pipe.close()
    
    if __name__ == '__main__':
        (con1, con2) = Pipe()                            # 创建两个 Pipe 实例
        sender = Process(target=send, args=(con1, ))     # 函数的参数,args 一定是实例化之后的 Pipe 变量,不能直接写 args=(Pip(),)
        sender.start()                                   # Process 类启动进程
        print("con2 got: %s" % con2.recv())              # 管道的另一端 con2 从send收到消息
        con2.close()                                     # 关闭管道

    3、管道是可以同时发送和接受消息的:

    from multiprocessing import Process, Pipe
    
    def talk(pipe):
        pipe.send(dict(name='Bob', spam=42))            # 传输一个字典
        reply = pipe.recv()                             # 接收传输的数据
        print('talker got:', reply)
    
    if __name__ == '__main__':
        (parentEnd, childEnd) = Pipe()                  # 创建两个 Pipe() 实例,也可以改成 conf1, conf2
        child = Process(target=talk, args=(childEnd,))  # 创建一个 Process 进程,名称为 child
        child.start()                                   # 启动进程
        print('parent got:', parentEnd.recv())          # parentEnd 是一个 Pip() 管道,可以接收 child Process 进程传输的数据
        parentEnd.send({x * 2 for x in 'spam'})         # parentEnd 是一个 Pip() 管道,可以使用 send 方法来传输数据
        child.join()                                    # 传输的数据被 talk 函数内的 pip 管道接收,并赋值给 reply
        print('parent exit')
    
    '''
    parent got: {'name': 'Bob', 'spam': 42}
    talker got: {'ss', 'mm', 'pp', 'aa'}
    parent exit
    '''
  • 相关阅读:
    Java学习笔记(4)
    Idea常用功能汇总
    Java学习笔记(3)
    Java学习笔记(2)
    Java学习笔记(1)
    如何开发NPM包
    c#抓屏功能在DPI缩放后,截到的图片不完整的问题
    支持续传功能的ASP.NET WEB API文件下载服务
    ASP.NET MVC 阻止通过Url直接访问服务器上的静态文件
    VS2013/VS2015/VS2017通过oschina托管代码
  • 原文地址:https://www.cnblogs.com/lizm166/p/14658360.html
Copyright © 2011-2022 走看看