zoukankan      html  css  js  c++  java
  • 【转载】 源码分析multiprocessing的Value Array共享内存原理

    原文地址:

    http://xiaorui.cc/archives/3290

    =========================================================

    当第一次使用python多进程模块(multiprocessing)的Value Array做数据共享,就觉得一定是基于mmap实现的。 当然python文档中也说明是共享内存的方式了。 mmap只是提供了文件映射内存到进程地址空间的的一种方法,通过这方法你是可以读写数据的.    

          直接去读写数据会让人烦恼的,对于上层应用不应该对他的数据格式进行解析,我向mmap里面flush一条数据 “521+我是峰云” 这样的组合,我怎么读? 如果数据是这么写进去的,我是需要约定的数据格式, 像http那样。   有什么解决方法?  首先想到的是json,pickle序列化.  那么multiprocessing Value是如何解决的?  他用ctypes内置的基本数据结构实现的,这里准确说 C的数据结构,我们只是用ctypes引用使用而已。 ctypes可以很好的切分内存,转换成可用的数据结构。 

    该文章写的有些乱,欢迎来喷 ! 另外文章后续不断更新中,请到原文地址查看更新. http://xiaorui.cc/?p=3290

    额外说一下,multiprocessing 的Value和Array实现的方法大同小异,只是选用不同的ctypes数据类型而已.  另外multiprocessing官方有两种方法提供给我们,一个是共享内存的方式,另一个是Manager 网络的方式,因为借助了网络,Manager的数据类型要比共享内存要多。

    对于我们多进程应用来说,我们只需要找到文件映射的内存空间,进一步找到数据结构就可以实现数据共享了。 

    multiprocessing Value使用方法:

    #xiaorui.cc
    from multiprocessing import Process, Value
    running_status = Value('d', True)

    我们说下 ctypes用法, ctypes大多是用来调用C库的,当然你也可以使用它的基本数据类型。

    下面简单说说,怎么通过调用ctypes类型的指针函数来创建指针实例:

    from ctypes import *  
    i = c_int(1)  
    i.value = 521

    创建以整形数组

    #xiaorui.cc
    import ctypes
    int_array = ctypes.c_int * 10
    a = int_array(10,33,55)

     详细的ctypes用法,请到官方查看 , https://docs.python.org/2/library/ctypes.html

    接着粗略的聊聊multiprocessing共享内存的实现方法.

    multiprocessing提前设定的ctypes映射表,这样对你来说只需要传递符号就可以了。 

    typecode_to_type = {
        'c': ctypes.c_char,  'u': ctypes.c_wchar,
        'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
        'h': ctypes.c_short, 'H': ctypes.c_ushort,
        'i': ctypes.c_int,   'I': ctypes.c_uint,
        'l': ctypes.c_long,  'L': ctypes.c_ulong,
        'f': ctypes.c_float, 'd': ctypes.c_double
        }

    下面这个Value就是我们用来多进程共享数据的函数,下面我们一步步的看看他是如何实现的.

    #xiaorui.cc
    
    def Value(typecode_or_type, *args, **kwds):
        lock = kwds.pop('lock', None)
        ...
        还是会调用RawValue
        obj = RawValue(typecode_or_type, *args)
        if lock is False:
            return obj
        if lock in (True, None):
            lock = RLock()
        if not hasattr(lock, 'acquire'):
            raise AttributeError("'%r' has no method 'acquire'" % lock)
        返回值是synchronized对象
        return synchronized(obj, lock)
    
    def RawArray(typecode_or_type, size_or_initializer):
        type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
        if isinstance(size_or_initializer, (int, long)):
            type_ = type_ * size_or_initializer
            obj = _new_value(type_)
            ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
            return obj
        else:
            type_ = type_ * len(size_or_initializer)
            result = _new_value(type_)  #申请共享内存空间
            result.__init__(*size_or_initializer)
            return result
    
    
    def RawValue(typecode_or_type, *args):
        通过符号获取相应的ctypes对象
        type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
        obj = _new_value(type_)
        ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
        ...
    
    
    def _new_value(type_):
        size = ctypes.sizeof(type_)
        通过heap.py 的 BufferWrapper类申请内存空间
        wrapper = heap.BufferWrapper(size)
        return rebuild_ctype(type_, wrapper, None)

    python2.7/multiprocessing/heap.py

    #xiaorui.cc
    class BufferWrapper(object):
    
        _heap = Heap()
    
        def __init__(self, size):
            assert 0 <= size < sys.maxint  #一定要非负整数
            block = BufferWrapper._heap.malloc(size)    #调用_heap.malloc申请空间
            self._state = (block, size)
            Finalize(self, BufferWrapper._heap.free, args=(block,))
    
    
    class Heap(object):
    
        def malloc(self, size):
            assert 0 <= size < sys.maxint
            if os.getpid() != self._lastpid:
                self.__init__()                     # 通过self._lastpid判断是否需要重新初始化
            self._lock.acquire()
            self._free_pending_blocks()
            try:
                size = self._roundup(max(size,1), self._alignment)
                (arena, start, stop) = self._malloc(size)
                
                ...
    
                return block
            finally:
                self._lock.release()
    
        def _malloc(self, size):
            i = bisect.bisect_left(self._lengths, size)
            if i == len(self._lengths):
                length = self._roundup(max(self._size, size), mmap.PAGESIZE)  #计算申请空间的大小
                self._size *= 2
                arena = Arena(length)    # 调用Arena类
    
    
    class Arena(object):
    
        def __init__(self, size):
            self.buffer = mmap.mmap(-1, size)     #Value共享内存的方式果然mmap来实现
            self.size = size
            self.name = None

    这里mmap传递-1是个什么概念,一般咱们都是传递文件描述符的。

    Only -1 is accepted on Unix: on my 64-bit Ubuntu box with Python 2.6.5,
    mmap.mmap(0, 256) fails with errno=19 (No such device) and mmap.mmap(-1, 256) works fine. And, To map anonymous memory, -1 should be passed as the fileno along with the length.

    参数fd为即将映射到进程空间的文件描述字,一般由open()返回,同时,fd可以指定为-1,此时须指定flags参数中的MAP_ANON,表明进行的是匿名映射(不涉及具体的文件名,避免了文件的创建及打开,很显然只能用于具有亲缘关系的

    下面的代码通过mmap ctypes实现了一个简单的数据共享, 这样对于我们来说,可以像操作python对象那样操作映射的内存地址

    代码引用地址:
    https://blog.schmichael.com/2011/05/15/sharing-python-data-between-processes-using-mmap/

    a.py 设置mmap,调用ctypes创建一个c_int对象 。 a.py在共享内存中一共创建了两个数据结构,先是用c_int变量i,然后使用struct.calcsize拿到 变量 i 的数据大小,然后用from_buffer加上offset申请对象。这样能保证内存连续性。

    import ctypes
    import mmap
    import os
    import struct
    
    
    def main():
        fd = os.open('/tmp/mmaptest', os.O_CREAT | os.O_TRUNC | os.O_RDWR)
    
        assert os.write(fd, 'x00' * mmap.PAGESIZE) == mmap.PAGESIZE
    
        buf = mmap.mmap(fd, 0, mmap.MAP_SHARED, mmap.PROT_WRITE)
    
        i = ctypes.c_int.from_buffer(buf)
    
        i.value = 10
    
        # Before we create a new value, we need to find the offset of the next free
        # memory address within the mmap
        offset = struct.calcsize(i._type_)
    
        # The offset should be uninitialized ('x00')
        assert buf[offset] == 'x00'
    
        # Now ceate a string containing 'foo' by first creating a c_char array
        s_type = ctypes.c_char * len('foo')
    
        # Now create the ctypes instance
        s = s_type.from_buffer(buf, offset)
    
        s.raw = 'foo'
    
        print 'Changing i'
        i.value *= i.value
    
        print 'Changing s'
        s.raw = 'bar'
    
        new_i = 111
        i.value = int(new_i)
    
    if __name__ == '__main__':
        main()

    b.py , 可以接受a.py扔过去的值, 流程一样.

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import mmap
    import os
    import struct
    import time
    
    def main():
        # Open the file for reading
        fd = os.open('/tmp/mmaptest', os.O_RDONLY)
    
        # Memory map the file
        buf = mmap.mmap(fd, 0, mmap.MAP_SHARED, mmap.PROT_READ)
    
        i = None
        s = None
    
        new_i, = struct.unpack('i', buf[:4])
        new_s, = struct.unpack('3s', buf[4:7])
    
        if i != new_i or s != new_s:
            print 'i: %s => %d' % (i, new_i)
            print 's: %s => %s' % (s, new_s)
            i = new_i
            s = new_s
    
    if __name__ == '__main__':
        main()

    对于multiprocessing共享内存方式来通信就说这么多了,上面的描述讲解还是比较粗略,如果大家想刨根问底,可以看 multiprocessing sharedctypes.py heap.py forking.py模块。 下次跟大家聊聊manger的实现。

    本博客是博主个人学习时的一些记录,不保证是为原创,个别文章加入了转载的源地址还有个别文章是汇总网上多份资料所成,在这之中也必有疏漏未加标注者,如有侵权请与博主联系。
  • 相关阅读:
    请求转发和请求重定向的区别
    查看电脑连过的WiFi密码
    linux mysql不能远程登录
    map的遍历方法
    ________________springbootのMybatis
    ________________springbootのTest
    ________________springbootの自定义starter
    ________________springbootのAOP
    ________________springbootのjdbc、事物
    ________________初学springboot14
  • 原文地址:https://www.cnblogs.com/devilmaycry812839668/p/15074631.html
Copyright © 2011-2022 走看看