zoukankan      html  css  js  c++  java
  • 利用multiprocessing.managers开发跨进程生产者消费者模型

    研究了下multiprocessing.managers,略有收获,随笔一篇;

    核心思路是构造一个manager进程,这个进程可以通过unix socket或tcp socket与其它进程通信;因为利用了socket,所以通信的进程间不要求具备父子关系,甚至可以跨主机(通过tcp socket);

    通过manager进行数据结构共享,可以应用于很多的IPC场景;这里只做一个示例,在manager内维护一个队列,做为生产者消费者模型中的消息队列;

    # coding:utf-8
    
    import os
    import time
    import subprocess
    
    from multiprocessing import Process
    #from multiprocessing.managers import SyncManager
    from multiprocessing.managers import BaseManager
    from multiprocessing import JoinableQueue
    
    #父进程
    pid = os.getpid()
    print("parent pid %d" % pid)
    
    class TestManager(BaseManager):
      pass
    
    #用于共享的队列
    jqueue = JoinableQueue()
    
    #用于获取队列的方法
    def _get_queue():
        return jqueue
    
    #启动server(manager)
    def make_server(port, authkey):
        # 注册rpc,获取共享的队列(的代理,这段代码的实现很有意思,建议看看源码)
        TestManager.register("get_queue",callable=lambda : _get_queue())
        # 如果要走tcp socket,就用这一行
        #manager = TestManager(address=('', port), authkey=authkey)
        # 如果要走unix socket,就用这一行
        manager = TestManager(authkey=authkey)
        # 启动server(manager)进程
        manager.start()
        return manager
    
    # consumer进程的入口
    def do_consume(manager):
        print ("consumer pid %d" % os.getpid())
        queue=manager.get_queue()
        count = 0
        while True:
            item = queue.get(block=True)
            #print(item)
            #time.sleep(1)
            queue.task_done()
            if item is None:
                conn = queue._tls.connection
                break
            count += 1
        print ("done consuming %d" % count)
    
    #构造新的manager实例做为client连接manager server
    def make_client(address, authkey):
        # 注册rpc,用于非父子进程环境时,新构造的manager识别rpc方法
        TestManager.register("get_queue")
        manager = TestManager(address=address, authkey=authkey)
        manager.connect()
        return manager
    
    # producer进程的入口
    def do_produce(address, authkey):
        print ("producer pid %d" % os.getpid())
        client=make_client(address, authkey)
        queue=client.get_queue()
        for i in range(10000):
            queue.put(i, block=True)
        print ("done producing")
    
    # terminator进程的入口
    def do_terminate(address, authkey):
        client=make_client(address, authkey)
        queue=client.get_queue()
        queue.put(None, block=True)
    
    authkey = b'foo'
    manager=make_server(6666, authkey)
    address = manager._address
    # 查看manager的进程号
    print ("manager pid %d" % manager._process.ident)
    
    # 通过父子进程变量传递的方式,向consumer进程传递manager
    consumer = Process(target=do_consume, args=(manager, ))
    consumer.start()
    
    # 伪造非父子进程传递address和authkey的方式,向producer进程传递连接manager需要的信息
    producer = Process(target=do_produce, args=(address, authkey))
    producer.start()
    
    # 查看当前的进程树
    status, output = subprocess.getstatusoutput('pstree -p %d' % pid)
    print (output) 
    
    producer.join()
    
    # 伪造非父子进程传递address和authkey的方式,再启动一个terminator进程结束通信
    terminator = Process(target=do_terminate, args=(address, authkey))
    terminator.start()
    terminator.join()
    
    consumer.join()
    

      

    以上示例代码的过程如下:

    1. 构造manager server;
    2. 构造一个consumer进程,直接从父进程获取到manager对象;
    3. 再构造一个producer进程,通过传递address和authkey,新构造manager client,并连接manager server;
    4. producer获取到共享队列,生产消息;
    5. consumer获取到共享队列,消费消息;
    6. terminator(producer)生产一个空消息;
    7. consumer获取到空消息,消费结束;

    结果:

    parent pid 30460
    manager pid 30461
    consumer pid 30463
    producer pid 30464
    python(30460)-+-pstree(30465)
                  |-python(30461)-+-{python}(30462)
                  |               |-{python}(30472)
                  |               |-{python}(30474)
                  |               `-{python}(30475)
                  |-python(30463)
                  `-python(30464)
    done producing
    done consuming 10000
    

      

    可以看到共生成4个子进程,一个manager server、一个consumer、一个producer、还有一个pstree查看进程树;

  • 相关阅读:
    MySQL的数据库,数据表,数据的操作
    数组函数
    字符串函数,时间函数,数学函数,数组
    PHP函数
    php类型的相关函数,运算符,条件判断,循环
    PHP数据类型
    vector中erase用法注意事项
    C++11 右值引用&&
    vector中find和find_if的用法 以后再遍历剁手!
    在cocos2d中添加自己的shader教程
  • 原文地址:https://www.cnblogs.com/ZisZ/p/10770436.html
Copyright © 2011-2022 走看看