zoukankan      html  css  js  c++  java
  • python自带库--multiprocessing库

    multiprocessing模块介绍

    multiprocessing 是一个支持使用与 threading 模块类似的 API 来产生进程的包。 multiprocessing 包同时提供了本地和远程并发操作,通过使用子进程而非线程有效地绕过了 全局解释器锁。 因此,multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。 它在 Unix 和 Windows 上均可运行。

    multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

    一、Process类的介绍

    在 multiprocessing 中,通过创建一个 Process 对象然后调用它的 start() 方法来生成进程

    简单实例:

    from multiprocessing import Process
    def f(name):
        print('hello', name)
    if __name__ == '__main__':
        p = Process(target=f, args=('bob',))
        p.start()
        p.join()

    创建进程的类:

    Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,可用来开启一个子进程

    强调:

    1. 需要使用关键字的方式来指定参数 

    2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

    参数介绍:

    group参数未使用,值始终为None

    target表示调用对象,即子进程要执行的任务

    args表示调用对象的位置参数元组,args=(1,2,'mike',)

    kwargs表示调用对象的字典,kwargs={'name':'mike','age':18}

    name为子进程的名称

    方法介绍:

    p.start() :# 启动进程,并调用该子进程中的p.run() 

    p.run() :# 进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法

    p.terminate() : # 强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁

    p.is_alive() :# 如果p仍然运行,返回True

    p.join([timeout]) :# 主进程等待p终止(强调:是主进程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间。

    属性介绍

    p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置

    p.name:进程的名称

    p.pid:进程的pid

    二、在进程之间交换对象

    multiprocessing 支持进程之间的两种通信通道:队列和管道

    1、队列(Quene)

    class multiprocessing.Queue([maxsize])

    返回一个使用一个管道和少量锁和信号量实现的共享队列实例。当一个进程将一个对象放进队列中时,一个写入线程会启动并将对象从缓冲区写入管道中。

    一旦超时,将抛出标准库 queue 模块中常见的异常 queue.Empty 和 queue.Full

    方法:

    qsize()-----返回队列大致长度,返回数字不可靠

    empty()-----如果队列是空的,返回 True ,反之返回 False。状态不可靠

    full()-----如果队列是满的,返回 True ,反之返回 False 。状态不可靠

    put(obj[, block[, timeout]])

    将 obj 放入队列。如果可选参数 block 是 True (默认值) 而且 timeout 是 None (默认值), 将会阻塞当前进程,直到有空的缓冲槽。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的缓冲槽时抛出 queue.Full  异常。反之 (block 是 False 时),仅当有可用缓冲槽时才放入对象,否则抛出 queue.Full 异常 (在这种情形下 timeout 参数会被忽略)。

    put_nowait(obj)-----相当于 put(obj, False)
    get([block[, timeout]])

    从队列中取出并返回对象。如果可选参数 block 是 True (默认值) 而且 timeout 是 None (默认值), 将会阻塞当前进程,直到队列中出现可用的对象。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的对象时抛出 queue.Empty 异常。反之 (block 是 False 时),仅当有可用对象能够取出时返回,否则抛出 queue.Empty 异常 (在这种情形下 timeout 参数会被忽略)。

    在 3.8 版更改: 如果队列已经关闭,会抛出 ValueError 而不是 OSError 。

    get_nowait()-----相当于 get(False)
    from multiprocessing import Process, Queue
    
    def f(q):
        q.put([42, None, 'hello'])
    
    if __name__ == '__main__':
        q = Queue()
        p = Process(target=f, args=(q,))
        p.start()
        print(q.get())    # prints "[42, None, 'hello']"
        p.join()
    

        队列是线程和进程安全的。

    2、管道(Pipe)

    multiprocessing.Pipe([duplex])

    返回一对 Connection 对象 (conn1, conn2) , 分别表示管道的两端。

    如果 duplex 被置为 True (默认值),那么该管道是双向的。如果 duplex 被置为 False ,那么该管道是单向的,即 conn1 只能用于接收消息,而 conn2 仅能用于发送消息。

    from multiprocessing import Process, Pipe
    def f(conn):
        conn.send([42, None, 'hello'])
        conn.close()
    if __name__ == '__main__':
        parent_conn, child_conn = Pipe()
        p = Process(target=f, args=(child_conn,))
        p.start()
        print(parent_conn.recv())   # prints "[42, None, 'hello']"
        p.join()
    返回的两个连接对象 Pipe() 表示管道的两端。每个连接对象都有 send() 和 recv() 方法(相互之间的)。
    请注意,如果两个进程(或线程)同时尝试读取或写入管道
    的 同一 端,则管道中的数据可能会损坏。
    当然,在不同进程中同时使用管道的不同端的情况下不存在损坏的风险。

    三、进程间同步

    对于所有在 threading 存在的同步原语,multiprocessing 中都有类似的等价物。例如,可以使用锁来确保一次只有一个进程打印到标准输出:

    from multiprocessing import Process, Lock
    def f(l, i):
        l.acquire()
        try:
            print('hello world', i)
        finally:
            l.release()
    if __name__ == '__main__':
        lock = Lock()
        for num in range(10):
            Process(target=f, args=(lock, num)).start()
    

    不使用锁的情况下,来自于多进程的输出很容易产生混淆。

    四、服务进程

    由 Manager() 返回的管理器对象控制一个服务进程,该进程保存Python对象并允许其他进程使用代理操作它们。

    Manager() 返回的管理器支持类型: list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array 。

    from multiprocessing import Process, Manager
    def f(d, l):
        d[1] = '1'
        d['2'] = 2
        d[0.25] = None
        l.reverse()
    if __name__ == '__main__':
        with Manager() as manager:
            d = manager.dict()
            l = manager.list(range(10))
            p = Process(target=f, args=(d, l))
            p.start()
            p.join()
            print(d)
            print(l)
    

    将打印

    {0.25: None, 1: '1', '2': 2}
    [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
    

    使用服务进程的管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络由不同计算机上的进程共享。但是,它们比使用共享内存慢

    五、使用工作进程

    Pool 类表示一个工作进程池。它具有允许以几种不同方式将任务分配到工作进程的方法。

    from multiprocessing import Pool, TimeoutError
    import time
    import os
    def f(x):
        return x*x
    if __name__ == '__main__':
        # start 4 worker processes
        with Pool(processes=4) as pool:
            # print "[0, 1, 4,..., 81]"
            print(pool.map(f, range(10)))
            # print same numbers in arbitrary order
            for i in pool.imap_unordered(f, range(10)):
                print(i)
            # evaluate "f(20)" asynchronously
            res = pool.apply_async(f, (20,))      # runs in *only* one process
            print(res.get(timeout=1))             # prints "400"
            # evaluate "os.getpid()" asynchronously
            res = pool.apply_async(os.getpid, ()) # runs in *only* one process
            print(res.get(timeout=1))             # prints the PID of that process
    
            # launching multiple evaluations asynchronously *may* use more processes
            multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
            print([res.get(timeout=1) for res in multiple_results])
    
            # make a single worker sleep for 10 secs
            res = pool.apply_async(time.sleep, (10,))
            try:
                print(res.get(timeout=1))
            except TimeoutError:
                print("We lacked patience and got a multiprocessing.TimeoutError")
    
            print("For the moment, the pool remains available for more work")
    
        # exiting the 'with'-block has stopped the pool
        print("Now the pool is closed and no longer available")
    

    请注意,进程池的方法只能由创建它的进程使用。

    六、连接(Connection)对象

    class multiprocessing.connection.Connection

    send(obj)-----将一个对象发送到连接的另一端,可以用 recv() 读取。
    recv()------返回一个由另一端使用 send() 发送的对象。
    fileno()-----返回由连接对象使用的描述符或者句柄。
    close()-----关闭连接对象。
    poll([timeout])-----返回连接对象中是否有可以读取的数据。
    send_bytes(buffer[, offset[, size]])-----从一个 bytes-like object  (字节类对象)对象中取出字节数组并作为一条完整消息发送。
    recv_bytes([maxlength])-----以字符串形式返回一条从连接对象另一端发送过来的字节数据。此方法在接收到数据前将一直阻塞。
    ecv_bytes_into(buffer[, offset])-----将一条完整的字节数据消息读入 buffer 中并返回消息的字节数。 此方法在接收到数据前将一直阻塞。
    七、class multiprocessing.Lock一旦一个进程或者线程拿到了锁,后续的任何其他进程或线程的其他请求都会被阻塞直到锁被释放。
    方法:acquire(block=Truetimeout=None)-----获得锁,阻塞或非阻塞的。

    如果 block 参数被设为 True ( 默认值 ) , 对该方法的调用在锁处于释放状态之前都会阻塞,然后将锁设置为锁住状态并返回 True 。

    如果 block 参数被设置成 False ,方法的调用将不会阻塞。 如果锁当前处于锁住状态,将返回 False ; 否则将锁设置成锁住状态,并返回 True 。

    方法:release()---释放锁,可以在任何进程、线程使用,并不限于锁的拥有者。

    八、管理器--管理器提供了一种创建共享数据的方法,从而可以在不同进程中共享,甚至可以通过网络跨机器共享数据。管理器维护一个用于管理 共享对象 的服务。其他进程可以通过代理访问这些共享对象。

    1、multiprocessing.Manager()---返回一个已启动的 SyncManager 管理器对象,这个对象可以用于在不同进程中共享数据。返回的管理器对象对应了一个已经启动的子进程,并且拥有一系列方法可以用于创建共享对象、返回对应的代理。

    2、class multiprocessing.managers.BaseManager([address[, authkey]])---创建一个 BaseManager 对象。

    一旦创建,应该及时调用 start() 或者 get_server().serve_forever() 以确保管理器对象对应的管理进程已经启动。

    address 是管理器服务进程监听的地址。如果 address 是 None ,则允许和任意主机的请求建立连接。

    authkey 是认证标识,用于检查连接服务进程的请求合法性。如果 authkey 是 None, 则会使用 current_process().authkey , 否则,就使用 authkey , 需要保证它必须是 byte 类型的字符串。

    方法:tart([initializer[, initargs]])-----为管理器开启一个子进程,如果 initializer 不是 None , 子进程在启动时将会调用 initializer(*initargs)

    方法:get_server()----返回一个 Server  对象,它是管理器在后台控制的真实的服务。 Server  对象拥有 serve_forever() 方法。

    方法:connect()----将本地管理器对象连接到一个远程管理器进程:

    方法:shutdown()-----停止管理器的进程。这个方法只能用于已经使用 start() 启动的服务进程。它可以被多次调用。

    方法:register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])-----一个 classmethod,可以将一个类型或者可调用对象注册到管理器类。

    属性:address----管理器所用的地址。

    九、进程池------可以创建一个进程池,它将使用 Pool 类执行提交给它的任务。

    class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

    一个进程池对象,它控制可以提交作业的工作进程池。它支持带有超时和回调的异步结果,以及一个并行的 map 实现。

    map(funciterable[, chunksize])-----内置 map() 函数的并行版本 (但它只支持一个 iterable 参数,对于多个可迭代对象请参阅 starmap())。 它会保持阻塞直到获得结果。

    参考文档:https://docs.python.org/zh-cn/3/library/multiprocessing.html#multiprocessing.managers.BaseManager.connect

  • 相关阅读:
    C++——文件的读写
    我以我血荐轩辕——记徐家福教授的演讲
    文件命名
    面向对象
    关于函数
    php跨域发送请求原理以及同步异步问题
    关于iframe
    关于url
    cookie
    call和apply
  • 原文地址:https://www.cnblogs.com/eihouwang/p/14197640.html
Copyright © 2011-2022 走看看