zoukankan      html  css  js  c++  java
  • Python标准模块--multiprocessing

    1 模块简介

    multiprocessing模块在Python2.6中引入。最初的multiprocessing是由Jesse Noller和Richard Oudkerk在PEP 371中定义。就像你可以在threading模块中使用多个线程一样,multiprocessing模块允许你使用多个进程。当你使用多个进程时,你可以避免GIL锁,并充分利用机器的多处理器。

    multiprocessing库包括一些没有在threading模块中出现的API。例如,你可以使用Pool类在多个输入上并行执行一个函数。我们将在后面的章节提到Pool。我们先从multiprocessing中的Process类开始。

    2 模块使用

    2.1 开始使用Multiprocessing

    Process类与threading中的Thread类非常相似。让我们尝试着创建多个进程,这些进程都会调用同一个函数,让我们看看它们是如何工作的,

    import os
    
    from multiprocessing import Process
    
    def doubler(number):
        result = number * 2
        proc = os.getpid()
        print("{0} double to {1} by process id:{2}
    ".format(number,result,proc))
    
    if __name__ == "__main__":
        numbers = [5,10,15,20,25]
        procs = []
        for index,number in enumerate(numbers):
            proc = Process(target = doubler,args = (number,))
            procs.append(proc)
            proc.start()
        for proc in procs:
            proc.join()
    

    这个例子中,我们引入Process,然后创建一个doubler函数。在doubler函数中,我们将传入的数乘以2。我们也使用Python的os模块用于获取当前进程的ID(或者为pid)。这将会告诉我们哪个进程正在调用函数。在代码的底部,我们创建了多个线程,并且启动它们。最后一个循环就是在每个进程上调用join()方法,这个将会告诉Python需要等待进程终止。如果你需要终止终止一个进程,你可以调用terminate()方法。

    当你运行这段代码时,你将会看到与以下结果相似的信息,

    5 double to 10 by process id:6725
    
    10 double to 20 by process id:6726
    20 double to 40 by process id:6728
    15 double to 30 by process id:6727
    
    
    
    25 double to 50 by process id:6729
    

    有时候,拥有人类可读的名字的进程会更加方便。幸运的是,Process类允许你获取进程的名字,实例如下,

    import os
    
    from multiprocessing import Process,current_process
    
    def doubler(number):
        result = number * 2
        proc_name = current_process().name
        print("{0} double to {1} by process id:{2}
    ".format(number,result,proc_name))
    
    if __name__ == "__main__":
        numbers = [5,10,15,20,25]
        procs = []
        proc = Process(target = doubler, args = (5,))
        for index,number in enumerate(numbers):
            proc = Process(target = doubler,args = (number,))
            procs.append(proc)
            proc.start()
        proc = Process(target = doubler,name = "Test",args = (2,))
        proc.start()
        procs.append(proc)
    
        proc = Process(target = doubler,args = (3,))
        proc.start()
        procs.append(proc)
    
        for proc in procs:
            proc.join()
    

    在这个例子中,我们另外引入了current_process。current_process与threading模块中的current_thread很相似。我们用它获取当前正在调用函数的进程的名字。你将会注意到,前面5个进程,我们没有设置名字。对于第6个进程,我们设置该进程名字为"Test",让我们看一下输出,

    5 double to 10 by process id:Process-2
    10 double to 20 by process id:Process-3
    
    
    15 double to 30 by process id:Process-4
    
    25 double to 50 by process id:Process-6
    
    2 double to 4 by process id:Test
    
    20 double to 40 by process id:Process-5
    
    3 double to 6 by process id:Process-8
    

    输出显示了multiprocessing模块给每个进程赋予一个编号作为它的名字的一部分。当我们指定一个线程的名字时,数字就不会再加入到名字中。由于第6个进程本来的编号是7,但是由于它已经指定进程名字,因此对于最后一个线程,跳过数字7,编号就为8。

    2.2 进程锁

    multiprocessing模块支持进程锁的方式与threading模块支持线程锁的方式相同。你所需要做的就是引入Lock,获取它,处理任务,然后释放。实例如下,

    from multiprocessing import Process,Lock
    
    def printer(item,lock):
        lock.acquire()
        try:
            print(item)
        finally:
            lock.release()
    
    if __name__ == "__main__":
        lock = Lock()
        items = ['tango','foxtrot',10]
        for item in items:
            p = Process(target = printer,args = (item,lock))
            p.start()
    

    在这里,我们首先创建一个简单的将输入打印出来的函数。为了阻止进程之间的互相干扰,我们使用了一个Lock对象。这段代码遍历列表中的三个元素,给每个元素都创建一个进程。每个进程在调用函数时,传入一个列表中的元素。由于我们使用了进程锁,下一个进程将会等待线程锁释放后才能执行。

    2.3 日志

    进程中输出日志与线程中输出日志有所不同。原因就是Python的logging库没有使用进程共享锁,所以你可能会得到多个进程混合的日志信息。让我们尝试在前面的例子中加入日志模块,如下所示,

    import logging
    import multiprocessing
    from multiprocessing import Process,Lock
    
    def printer(item,lock):
        lock.acquire()
        try:
            print(item)
        finally:
            lock.release()
    
    if __name__ == "__main__":
        lock = Lock()
        items = ['tango','foxtrot',10]
        multiprocessing.log_to_stderr()
        logger = multiprocessing.get_logger()
        logger.setLevel(logging.INFO)
        for item in items:
            p = Process(target = printer,args = (item,lock))
            p.start()
    

    最简单的方式就是将日志输出到标准错误流上。我们调用log_to_stderr()函数来完成。然后我们调用get_logger方法获取一个logger对象,并且设置日志等级为INFO。剩余的代码和上面的例子是一样的。这里,我并没有调用join()方法。父线程会默认调用join()方法。

    当你运行这段代码时,你将会看到与以下结果相似的信息,

    tango
    [INFO/Process-1] child process calling self.run()
    foxtrot
    [INFO/Process-1] process shutting down
    [INFO/Process-1] process exiting with exitcode 0
    [INFO/Process-2] child process calling self.run()
    [INFO/Process-2] process shutting down
    [INFO/MainProcess] process shutting down
    [INFO/Process-2] process exiting with exitcode 0
    [INFO/MainProcess] calling join() for process Process-2
    [INFO/MainProcess] calling join() for process Process-1
    [INFO/MainProcess] calling join() for process Process-3
    [INFO/Process-3] child process calling self.run()
    10
    [INFO/Process-3] process shutting down
    [INFO/Process-3] process exiting with exitcode 0
    

    2.4 Pool

    Pool类用于表示一个工作进程池。它允许你卸载工作进程中的任务。让我们看一个简单的实例,

    from multiprocessing import Pool
    
    def doubler(number):
        return number * 2
    
    if __name__ == "__main__":
        numbers = [5,10,20]
        pool = Pool(processes = 3)
        print(pool.map(doubler,numbers))
    

    这里,我们首先创建一个Pool实例,并且告诉它创建三个工作进程。然后我们使用map方法,将函数和可迭代对象与每个进程一一建立对映。最后,我们将结果打印出来,也就是 [10,20,40]

    你也可以通过使用apply_async方法来获取进程池中进程的结果。

    from multiprocessing import Pool
    
    def doubler(number):
        return number * 2
    
    if __name__ == "__main__":
        numbers = [5,10,20]
        pool = Pool(processes = 3)
        result = pool.apply_async(doubler,(25,))
        print(result.get(timeout = 1))
    

    get函数允许我们直接访问进程的结果。你将会注意到我们设置了超时,以防当我们在调用函数时出现意外。我们并不希望程序莫名奇妙地被阻塞了。

    2.5 进程通信

    multiprocessing模块主要有两种方法来实现进程间通信,分别是队列和管道。队列实现方式是线程和进程安全。让我们看一看基于队列实现的进程间通信的实例,

    from multiprocessing import Process,Queue
    
    sentine1 = -1
    
    def creator(data,q):
        print("Creating data putting it on the queue")
        for item in data:
            q.put(item)
    
    def my_consumer(q):
        while True:
            data = q.get()
            print("data found to be processed:{}".format(data))
            processed = data * 2
            print(processed)
            if data is sentine1:
                break
    
    if __name__ == "__main__":
        q = Queue()
        data = [5,10,13,-1]
        process_one = Process(target = creator,args = (data,q))
        process_two = Process(target = my_consumer, args = (q,))
        process_one.start()
        process_two.start()
    
        q.close()
        q.join_thread()
    
        process_one.join()
        process_two.join()
    

    这里,我们首先引入Queue和Process。然后我们创建两个函数,第一个函数生产数据,并将数据添加到队列;第二个函数消费数据并处理它。添加数据到队列通过队列的put()方法实现,从队列中获取数据是通过队列的get()方法实现。代码块中的最后一部分是创建队列对象,两个进程并且启动进程。你将会注意到我们实在进程上调用join()方法,而不是在队列上。

    2.6 总结

    我们在本文中讲解了很多技术点。你已经学习了如何在常规函数上使用multiprocessing模块,使用队列进行进程间通信,对进程命名等等。在本文中,Python文档中仍然还有很多知识没有涉及到,一定要继续钻研。在此期间,你已经了解到如何通过Python利用你手中计算机的处理能力。

    3 Reference

    Python 201

  • 相关阅读:
    Dubbo
    支持微服务架构落地的Java框架
    thinkphp6的主要特性
    thinkphp5的主要特性
    RPC
    HTTP1.0 HTTP1.1 HTTP2.0 主要特性对比
    RabbitMQ 生产环境配置详解
    分布式AKF拆分原则
    通过Hystrix了解分布式接口级的高可用
    Python中使用grpc与consul
  • 原文地址:https://www.cnblogs.com/zhbzz2007/p/6063097.html
Copyright © 2011-2022 走看看