zoukankan      html  css  js  c++  java
  • Python 进程之间共享数据

      最近遇到多进程共享数据的问题,到网上查了有几篇博客写的蛮好的,记录下来方便以后查看。

    一、Python multiprocessing 跨进程对象共享 

      在mp库当中,跨进程对象共享有三种方式,第一种仅适用于原生机器类型,即python.ctypes当中的类型,这种在mp库的文档当中称为shared memory 方式,即通过共享内存共享对象;另外一种称之为server process , 即有一个服务器进程负责维护所有的对象,而其他进程连接到该进程,通过代理对象操作服务器进程当中的对象;最后一种在mp文档当中没有单独提出,但是在其 中多次提到,而且是mp库当中最重要的一种共享方式,称为inheritance ,即继承,对象在 父进程当中创建,然后在父进程是通过multiprocessing.Process创建子进程之后,子进程自动继承了父进程当中的对象,并且子进程对这 些对象的操作都是反映到了同一个对象。

      这三者共享方式各有特色,在这里进行一些简单的比较。

      首先是共享方式所应对的对象类型,看这个表:

    共享方式 支持的类型
    Shared memory ctypes当中的类型,通过RawValue,RawArray等包装类提供
    Inheritance 系统内核对象,以及基于这些对象实现的对象。包括Pipe, Queue, JoinableQueue, 同步对象(Semaphore, Lock, RLock, Condition, Event等等)
    Server process 所有对象,可能需要自己手工提供代理对象(Proxy)

      这个表总结了三种不同的共享方式所支持的类型,下面一个个展开讨论。

      其中最单纯简单的就是shared memory这种方式,只有ctypes当中的数据类型可以通过这种方式共享。由于mp库本身缺少命名的机制,即在一个进程当中创建的对象,无法在另外一 个进程当中通过名字来引用,因此,这种共享方式依赖于继承,对象应该由父进程创建,然后由子进程引用。关于这种机制的例子,可以参见Python文档 当中的例子 Synchronization types like locks, conditions and queues,参考其中的test_sharedvalues函数。

      然后是继承方式。首先关于继承方式需要有说明,继承本质上并不是一种对象共享的机制,对象共享只是其副作用。子进程从父进程继承来的对象并不一定是 共享的。继承本质上是父进程fork出的子进程自动继承父进程的内存状态和对象描述符。因此,实际上子进程复制 了一份 父进程的对象,只不过,当这个对象包装了一些系统内核对象的描述符的时候,拷贝这个对象(及其包装的描述符)实现了对象的共享。因此,在上面的表当中,只 有系统内核对象,和基于这些对象实现的对象,才能够通过继承来共享。通过继承共享的对象在linux平台上没有任何限制,但是在Windows上面由于没 有fork的实现,因此有一些额外的限制条件 ,因此,在Windows上面,继承方式是几乎无法用的。

      最后就是Server Process这种方式。这种方式可以支持的类型比另外两种都多,因为其模型是这样的:

    server process模型

      server process模型

      在这个模型当中,有一个manager进程,负责管理实际的对象。真正的对象也是在manager进程的内存空间当中。所有需要访问该对象的进程都 需要先连接到该管理进程,然后获取到对象的一个代理对象(Proxy object),通常情况下,这个代理对象提供了实际对象的公共函 数 的代理,将函数参数进行pickle,然后通过连接传送到管理进程当中,管理进程将参数unpickle之后,转发给相应的实际对象 的函数,返回值(或者异常)同样经过管理进程pickle之后,通过连接传回到客户进程,再由proxy对象进行unpickle,返回给调用者或者抛出 异常。

      很明显,这个模型是一个典型的RPC(远程过程调用)的模型。因为每个客户进程实际上都是在访问manager进程当中的对象,因此完全可以通过这 个实现对象共享。

      manager和proxy之间的连接可以是基于socket的网络连接,也可以是unix pipe。如果是使用基于socket的连接方式,在使用proxy之前,需要调用manager对象的connect函数与远程的manager进程建 立连接。由于manager进程会打开端口接收该连接,因此必要的身份验证是需要的,否则任何人都可以连上manager弄乱你的共享对象。mp库通过 authkey的方式来进行身份验证。

      在实现当中,manager进程通过multiprocessing.Manager类或者BaseManager的子类实现。 BaseManager提供了函数register注册一个函数来获取共享对象的proxy。这个函数会被客户进程调用,然后在manager进程当中执 行。这个函数可以返回一个共享的对象(对所有的调用返回同一个对象),或者可以为每一个调用创建一个新的对象,通过前者就可以实现多个进程共享一个对象。 关于这个的用法可以参考Python文档 当中的例子“Demonstration of how to create and use customized managers and proxies”。

    典型的导出一个共享对象的代码是:

    1. ObjectType object_  
    2. class ObjectManager(multiprocessing.managers.BaseManager): pass  
    3. ObjectManager.register("object", lambda: object_)  

      注意上面介绍proxy对象的时候,我提到的“公共函数”四个字。每个proxy对象只会导出实际对象的公共函数。这里面有两个含义,一个是“公 共”,即所有非下划线开头的成员,另一个是“函数”,即所有callable的成员。这就带来一些限制,一是无法导出属性,二是无法导出一些公共的特殊函 数,例如__get__, __next__等等。对于这个mp库有一套处理,即自定义proxy对象。首先是BaseManager的register可以提供一个 proxy_type作为第三个参数,这个参数指定了哪些成员需要被导出。详细的使用方法可以参见文档当中的第一个例子。

      另外manager还有一些细节的问题需要注意。由于Proxy对象不是线程安全的,因此如果需要在一个多线程程序当中使用proxy,mp库会为 每个线程创建一个proxy对象,而每个proxy对象都会对server process创建一个连接,而manager那边对于每个连接都创建一个单独的线程来为其服务。这样带来的问题就是,如果客户进程有很多线程,很容易会 导致manager进程的fd数目达到ulimit的限制,即使没有达到限制,也会因为manager进程当中有太多线程而严重影响manager的性 能。解决方案可以是一个进程内cache,只有一个单独的线程可以创建proxy对象访问共享对象,其余线程只能访问该进程当中的cache。

      一旦manager因为达到ulimit限制或者其他异常,manager会直接退出,遗憾的是,这时候已经建立的proxy会试图重新连接 manager – 但是它已经不存在了。这个会导致客户进程hang在对proxy的函数调用上,这个时候,目前除了杀掉进程没有找到别的办法。

      另外proxy使用socket的方式比较tricky,因此和内置的socket库有很多冲突,比如 socket.setdefaulttimeout(Python Issue 6056 )。在setdefaulttimeout调用了之后,进程当中所有通过socket模块建立的socket都是被设置为unblock模式的,但是mp 库并不知道这一点,而且它总是假设socket都是block模式的,于是,一旦调用了setdefaulttimeout,所有对于proxy的函数调 用都会抛出OSError,错误代码为11,错误原因是非常有误导性的“Resource temporarily unavailable”,实际上就是EAGAIN。这个错误可以通过我提供的一个patch 来补救(这个patch当中还包含其他的一些修复,所以请自行查看并修改该patch)。

      由于以上的一些原因,server process模式作为一个对象的共享模式,能够提供最为灵活的共享方式,但是也有最多的问题。这个在使用过程当中就靠自己去衡量了。目前我们的系统对于 数据可靠性方面要求不高,丢失数据是可以接受的,但是也只用这种模式来维护统计值,不敢用来维护更多的东西。

    二、Python多进程写入同一文件

      最近用python的正则表达式处理了一些文本数据,需要把结果写到文件里面,但是由于文件比较大,所以运行起来花费的时间很长。但是打开任务管理器发现CPU只占用了25%,上网找了一下原因发现是由于一个叫GIL的存在,使得Python在同一时间只能运行一个线程,所以只占用了一个CPU,由于我的电脑是4核的,所以CPU利用率就是25%了。

      既然多线程没有什么用处,那就可以使用多进程来处理,毕竟多进程是可以不受GIL影响的。Python提供了一个multiprocessing的多进程库,但是多进程也有一些问题,比如,如果进程都需要写入同一个文件,那么就会出现多个进程争用资源的问题,如果不解决,那就会使文件的内容顺序杂乱。这就需要涉及到锁了,但是加锁一般会造成程序的执行速度下降,而且如果进程在多处需要向文件输出,也不好把这些代码整个都锁起来,如果都锁起来,那跟单进程还有什么区别。有一个解决办法就是把向文件的输出都整合到一块去,在这一块集中加个锁,这样问题就不大了。不过还有一种更加优雅的解决方式:使用multiprocessing库的回调函数功能。

      具体思路跟把文件输出集中在一起也差不多,就是把进程需要写入文件的内容作为返回值返回给惠和的回调函数,使用回调函数向文件中写入内容。这样做在windows下面还有一个好处,在windows环境下,python的多进程没有像linux环境下的多进程一样,linux环境下的multiprocessing库是基于fork函数,父进程fork了一个子进程之后会把自己的资源,比如文件句柄都传递给子进程。但是在windows环境下没有fork函数,所以如果你在父进程里打开了一个文件,在子进程中写入,会出现ValueError: I/O operation on closed file这样的错误,而且在windows环境下最好加入if __name__ == '__main__'这样的判断,以避免一些可能出现的RuntimeError或者死锁。

    下面是代码:

    from multiprocessing import Pool
    import time
    
    
    def mycallback(x):
        list1.append(x)
    def sayHi(num):
    
        return num
    
    if __name__ == '__main__':
        
        pool = Pool(4)
        list1 = []
        for i in range(4):
            pool.apply_async(sayHi, (i,), callback=mycallback)
            # print(x)
        pool.close()
        pool.join()
        
        print(list1)

    三、Python 进程之间共享数据(全局变量)

    进程之间共享数据(数值型):

    import multiprocessing  
      
    def  func(num):  
        num.value=10.78  #子进程改变数值的值,主进程跟着改变  
      
    if  __name__=="__main__":  
        num=multiprocessing.Value("d",10.0) # d表示数值,主进程与子进程共享这个value。(主进程与子进程都是用的同一个value)  
        print(num.value)  
      
        p=multiprocessing.Process(target=func,args=(num,))  
        p.start()  
        p.join()  
      
        print(num.value)  

    进程之间共享数据(数组型):

    import multiprocessing  
      
    def  func(num):  
        num[2]=9999   #子进程改变数组,主进程跟着改变  
      
    if  __name__=="__main__":  
        num=multiprocessing.Array("i",[1,2,3,4,5])   #主进程与子进程共享这个数组  
        print(num[:])  
      
        p=multiprocessing.Process(target=func,args=(num,))  
        p.start()   
        p.join()  
      
        print(num[:])  

    进程之间共享数据(dict,list):

    import multiprocessing  
      
    def func(mydict,mylist):  
        mydict["index1"]="aaaaaa"   #子进程改变dict,主进程跟着改变  
        mydict["index2"]="bbbbbb"  
        mylist.append(11)        #子进程改变List,主进程跟着改变  
        mylist.append(22)  
        mylist.append(33)  
      
    if __name__=="__main__":  
        with multiprocessing.Manager() as MG:   #重命名  
            mydict=multiprocessing.Manager().dict()   #主进程与子进程共享这个字典  
            mylist=multiprocessing.Manager().list(range(5))   #主进程与子进程共享这个List  
      
            p=multiprocessing.Process(target=func,args=(mydict,mylist))  
            p.start()  
            p.join()  
      
            print(mylist)  
            print(mydict)  

    四、参考链接:

    1、http://blog.ftofficer.com/2009/12/python-multiprocessing-2-object-sharing-across-process/

    2、http://blog.csdn.net/Q_AN1314/article/details/51923022

    3、http://blog.csdn.net/houyanhua1/article/details/78244288

  • 相关阅读:
    rs
    stm32f767 usoc3
    stm32f767 RTT 日志
    stm32f767 标准库 工程模板
    stm32f767 HAL 工程模板
    docker tab 补全 linux tab 补全
    docker anconda 依赖 下载 不了
    docker run 常用 指令
    linux scp 命令
    Dockerfile 常用参数说明
  • 原文地址:https://www.cnblogs.com/xiaxuexiaoab/p/8558519.html
Copyright © 2011-2022 走看看