zoukankan      html  css  js  c++  java
  • python多进程multiprocessing Pool相关问题

    python多进程想必大部分人都用到过,可以充分利用多核CPU让代码效率更高效。

    我们看看multiprocessing.pool.Pool.map的官方用法

    map(func, iterable[, chunksize])
    A parallel equivalent of the map() built-in function (it supports only one iterable argument though). It blocks until the result is ready.
    
    This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.

    一、多参数传入如何变成一个参数

    map的用法,函数func只允许一个可迭代的参数传递进去。

    如果我们需要传递多个参数时怎么办呢,

    一种方法是把多个参数放入到一个list或者元祖里当做一个参数传入func中

    还有一种是使用偏函数,偏函数(Partial function)是通过将一个函数的部分参数预先绑定为某些值,从而得到一个新的具有较少可变参数的函数。在Python中,可以通过functools中的partial高阶函数来实现偏函数功能。偏函数partial的源码如下:

    def partial(func, *args, **keywords):
        """New function with partial application of the given arguments
        and keywords.
        """
        if hasattr(func, 'func'):
            args = func.args + args
            tmpkw = func.keywords.copy()
            tmpkw.update(keywords)
            keywords = tmpkw
            del tmpkw
            func = func.func
    
        def newfunc(*fargs, **fkeywords):
            newkeywords = keywords.copy()
            newkeywords.update(fkeywords)
            return func(*(args + fargs), **newkeywords)
        newfunc.func = func
        newfunc.args = args
        newfunc.keywords = keywords
        return newfunc

    使用方法也很简单,比如我们有一个func函数,里面要传入texts,lock, data三个参数,但是我们想要用多进程把data分别传入进去计算,那么我们就可以先用partial函数,将texts和lock先固定到函数里组成一个新的函数,然后新函数传入data一个参数就可以了

    from functools import partial
    def func(texts, lock, data):
        ......
    
    
    pt = partial(func, tests, lock)  
    
    # 新函数pt只需要传入一个参数data

    这我们就可以对pt函数套用pool.map函数并且只传入一个参数data里。

    二、多进程内存复制

    python对于多进程中使用的是copy on write机制,python 使用multiprocessing来创建多进程时,无论数据是否不会被更改,子进程都会复制父进程的状态(内存空间数据等)。所以如果主进程耗的资源较多时,不小心就会造成不必要的大量的内存复制,从而可能导致内存爆满的情况。

    进程的启动有spawn、fork、forkserver三种方式

    spawn:调用该方法,父进程会启动一个新的python进程,子进程只会继承运行进程对象run()方法所需的那些资源。特别地,子进程不会继承父进程中不必要的文件描述符和句柄。与使用forkforkserver相比,使用此方法启动进程相当慢。

               Available on Unix and Windows. The default on Windows.

    fork:父进程使用os.fork()来fork Python解释器。子进程在开始时实际上与父进程相同,父进程的所有资源都由子进程继承。请注意,安全创建多线程进程尚存在一定的问题。

              Available on Unix only. The default on Unix.

    forkserver:当程序启动并选择forkserverstart方法时,将启动服务器进程。从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它fork一个新进程。 fork服务器进程是单线程的,因此使用os.fork()是安全的。没有不必要的资源被继承。

             Available on Unix platforms which support passing file descriptors over Unix pipes.

          要选择以上某一种start方法,请在主模块的if __name__ == '__ main__'子句中使用mp.set_start_method()。并且mp.set_start_method()在一个程序中仅仅能使用一次。

    import multiprocessing as mp
    
    def foo(q):
        q.put('hello')
    
    if __name__ == '__main__':
        mp.set_start_method('spawn')
        q = mp.Queue()
        p = mp.Process(target=foo, args=(q,))
        p.start()
        print(q.get())
        p.join()

    设置maxtasksperchild=1,因此,每个任务完成后都不会重新生成进程,

    对pool.map的调用中指定chunksize = 1.这样iterable中的多个项目将不会从工作进程的感知捆绑到一个“任务”中:

    import multiprocessing
    import time
    import os
    
    def f(x):
        print("PID: %d" % os.getpid())
        time.sleep(x)
        complex_obj = 5 #more complex axtually
        return complex_obj
    
    if __name__ == '__main__':
        multiprocessing.set_start_method('spawn')
        pool = multiprocessing.Pool(4, maxtasksperchild=1)
        pool.map(f, [5]*30, chunksize=1)
        pool.close()

    三、多进程间通讯

    还有一种情况是,多进程间要相互之间通讯,比如我每一个进程的结果都要存入到texts这个list里。当然要把这个texts当做参数传入到函数里面,但是一般的list并不能共享给所有的进程,我们需要用multiprocessing.Manager().list()建立的list才可以用于进程间通讯,防止冲突,还要对texts加上锁,防止操作冲突。注意multiprocessing.Lock() 创建的锁不能传递,需要使用multiprocessing.Manager().Lock()来创建。multiprocessing.Manager()可创建字典,也可创建list,lock,它创建的变量可用于多进程间传递才不会出错。比如以下代码:

    texts = multiprocessing.Manager().list()
    lock = multiprocessing.Manager().Lock()
    pool = multiprocessing.Pool(processes=4)
    data = list(range(20))
    pt = partial(func, texts, lock)
    pool.map(pt, data)
    pool.close()
    pool.join()
  • 相关阅读:
    对OpenCV学习笔记(1)遗留问题的思考
    转:争论32bit/64bit的人都搞错了方向,需要分清楚IA64和x64
    Win8_64bit+VS2012下的OpenCV学习笔记(1)
    pikachu练习平台-不安全的文件下载
    pikachu练习平台-文件包含漏洞(Files Inclusion)
    pikachu练习平台-RCE(远程系统命令、代码执行)
    pikachu练习平台(SQL注入 )
    pikachu练习平台(CSRF(跨站请求伪造) )
    pikachu练习平台(XSS-漏洞测试案例(cookie的窃取和利用、钓鱼攻击、XSS获取键盘记录))
    pikachu练习平台(XSS(跨站脚本))
  • 原文地址:https://www.cnblogs.com/zongfa/p/11332556.html
Copyright © 2011-2022 走看看